Started moving some core classes and functions from plugable.py to new base.py module

This commit is contained in:
Jason Gerard DeRose 2008-12-30 00:45:48 -07:00
parent e14fc84dfc
commit 447c88a2bb
5 changed files with 370 additions and 30 deletions

172
ipalib/base.py Normal file
View File

@ -0,0 +1,172 @@
# Authors:
# Jason Gerard DeRose <jderose@redhat.com>
#
# Copyright (C) 2008 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Low-level functions and abstract base classes.
"""
import re
from constants import NAME_REGEX, NAME_ERROR
from constants import TYPE_ERROR, SET_ERROR, DEL_ERROR
class ReadOnly(object):
"""
Base class for classes that can be locked into a read-only state.
Be forewarned that Python does not offer true read-only attributes for
user-defined classes. Do *not* rely upon the read-only-ness of this
class for security purposes!
The point of this class is not to make it impossible to set or to delete
attributes after an instance is locked, but to make it impossible to do so
*accidentally*. Rather than simply telling our programmers something like,
"Don't set any attributes on this ``FooBar`` instance because doing so wont
be thread-safe", this class gives us a way to enforce it.
For example, before a `ReadOnly` instance is locked, you can set and delete
its attributes as normal:
>>> class Person(ReadOnly):
... pass
...
>>> p = Person()
>>> p.__islocked__() # Initially unlocked
False
>>> p.name = 'John Doe'
>>> p.phone = '123-456-7890'
>>> del p.phone
But after an instance is locked, you cannot set its attributes:
>>> p.__lock__() # This will lock the instance
>>> p.__islocked__()
True
>>> p.department = 'Engineering'
Traceback (most recent call last):
...
AttributeError: locked: cannot set Person.department to 'Engineering'
Nor can you deleted its attributes:
>>> del p.name
Traceback (most recent call last):
...
AttributeError: locked: cannot delete Person.name
However, as noted above, there are still obscure ways in which attributes
can be set or deleted on a locked `ReadOnly` instance. For example:
>>> object.__setattr__(p, 'department', 'Engineering')
>>> p.department
'Engineering'
>>> object.__delattr__(p, 'name')
>>> hasattr(p, 'name')
False
But again, the point is that a programmer would never employ the above
techniques as a mere accident.
"""
__locked = False
def __lock__(self):
"""
Put this instance into a read-only state.
After the instance has been locked, attempting to set or delete an
attribute will raise an AttributeError.
"""
assert self.__locked is False, '__lock__() can only be called once'
self.__locked = True
def __islocked__(self):
"""
Return True if instance is locked, otherwise False.
"""
return self.__locked
def __setattr__(self, name, value):
"""
If unlocked, set attribute named ``name`` to ``value``.
If this instance is locked, an AttributeError will be raised.
:param name: Name of attribute to set.
:param value: Value to assign to attribute.
"""
if self.__locked:
raise AttributeError(
SET_ERROR % (self.__class__.__name__, name, value)
)
return object.__setattr__(self, name, value)
def __delattr__(self, name):
"""
If unlocked, delete attribute named ``name``.
If this instance is locked, an AttributeError will be raised.
:param name: Name of attribute to delete.
"""
if self.__locked:
raise AttributeError(
DEL_ERROR % (self.__class__.__name__, name)
)
return object.__delattr__(self, name)
def check_name(name):
"""
Verify that ``name`` is suitable for a `NameSpace` member name.
This function will raise a ``ValueError`` if ``name`` does not match the
`constants.NAME_REGEX` regular expression. For example:
>>> check_name('MyName')
Traceback (most recent call last):
...
ValueError: name must match '^[a-z][_a-z0-9]*[a-z0-9]$'; got 'MyName'
Also, this function will raise a ``TypeError`` if ``name`` is not an
``str`` instance. For example:
>>> check_name(u'name')
Traceback (most recent call last):
...
TypeError: name: need a <type 'str'>; got u'name' (a <type 'unicode'>)
So that `check_name()` can be easily used within an assignment, ``name``
is returned unchanged if it passes the check. For example:
>>> n = check_name('name')
>>> n
'name'
:param name: Identifier to test.
"""
if type(name) is not str:
raise TypeError(
TYPE_ERROR % ('name', str, name, type(name))
)
if re.match(NAME_REGEX, name) is None:
raise ValueError(
NAME_ERROR % (NAME_REGEX, name)
)
return name

View File

@ -30,6 +30,8 @@ from types import NoneType
import os
from os import path
import sys
from base import check_name
from constants import CONFIG_SECTION
from constants import TYPE_ERROR, OVERRIDE_ERROR, SET_ERROR, DEL_ERROR
@ -204,11 +206,11 @@ class Env(object):
"""
Set ``key`` to ``value``.
"""
# FIXME: the key should be checked with check_name()
if self.__locked:
raise AttributeError(
SET_ERROR % (self.__class__.__name__, key, value)
)
check_name(key)
if key in self.__d:
raise AttributeError(OVERRIDE_ERROR %
(self.__class__.__name__, key, self.__d[key], value)
@ -263,6 +265,56 @@ class Env(object):
for key in sorted(self.__d):
yield key
def _merge(self, **kw):
"""
Merge variables in ``kw`` into environment.
Any variables in ``kw`` that have already been set will be skipped
(which means this method will not try to override them).
This method returns a (set, total) tuple contained the number of
variables actually set and the number of variables requested to be set.
For example:
>>> env = Env()
>>> env._merge(first=1, second=2)
(2, 2)
>>> env._merge(first=1, third=3)
(1, 2)
>>> env._merge(first=1, second=2, third=3)
(0, 3)
"""
i = 0
for (key, value) in kw.iteritems():
if key not in self:
self[key] = value
i += 1
return (i, len(kw))
def _merge_from_file(self, conf_file):
"""
Merge values from ``conf_file`` into this `Env`.
"""
if not path.isfile(conf_file):
return
parser = RawConfigParser()
try:
parser.read(conf_file)
except ParsingError:
return
if not parser.has_section(CONFIG_SECTION):
parser.add_section(CONFIG_SECTION)
items = parser.items(CONFIG_SECTION)
if len(items) == 0:
return
i = 0
for (key, value) in items:
if key not in self:
self[key] = value
i += 1
return (i, len(items))
def __doing(self, name):
if name in self.__done:
raise StandardError(
@ -344,9 +396,7 @@ class Env(object):
self.log = path.join(self.dot_ipa, 'log', name)
else:
self.log = path.join('/', 'var', 'log', 'ipa', name)
for (key, value) in defaults.iteritems():
if key not in self:
self[key] = value
self._merge(**defaults)
def _finalize(self, **lastchance):
"""
@ -370,26 +420,3 @@ class Env(object):
if key not in self:
self[key] = value
self.__lock__()
def _merge_from_file(self, conf_file):
"""
Merge values from ``conf_file`` into this `Env`.
"""
if not path.isfile(conf_file):
return
parser = RawConfigParser()
try:
parser.read(conf_file)
except ParsingError:
return
if not parser.has_section(CONFIG_SECTION):
parser.add_section(CONFIG_SECTION)
items = parser.items(CONFIG_SECTION)
if len(items) == 0:
return
i = 0
for (key, value) in items:
if key not in self:
self[key] = value
i += 1
return (i, len(items))

View File

@ -25,8 +25,14 @@ All constants centralised in one file.
# The parameter system treats all these values as None:
NULLS = (None, '', u'', tuple(), [])
# regular expression NameSpace member names must match:
NAME_REGEX = r'^[a-z][_a-z0-9]*[a-z0-9]$'
# Format for ValueError raised when name does not match above regex:
NAME_ERROR = 'name must match %r; got %r'
# Standard format for TypeError message:
TYPE_ERROR = '%s: need a %r; got %r (which is a %r)'
TYPE_ERROR = '%s: need a %r; got %r (a %r)'
# Stardard format for TypeError message when a callable is expected:
CALLABLE_ERROR = '%s: need a callable; got %r (which is a %r)'
@ -37,7 +43,7 @@ OVERRIDE_ERROR = 'cannot override %s.%s value %r with %r'
# Standard format for AttributeError message when a read-only attribute is
# already locked:
SET_ERROR = 'locked: cannot set %s.%s to %r'
DEL_ERROR = 'locked: cannot del %s.%s'
DEL_ERROR = 'locked: cannot delete %s.%s'
# Used for a tab (or indentation level) when formatting for CLI:
CLI_TAB = ' ' # Two spaces

View File

@ -0,0 +1,116 @@
# Authors:
# Jason Gerard DeRose <jderose@redhat.com>
#
# Copyright (C) 2008 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Test the `ipalib.base` module.
"""
from tests.util import ClassChecker, raises
from ipalib.constants import NAME_REGEX
from ipalib.constants import TYPE_ERROR, SET_ERROR, DEL_ERROR
from ipalib import base
class test_ReadOnly(ClassChecker):
"""
Test the `ipalib.base.ReadOnly` class
"""
_cls = base.ReadOnly
def test_lock(self):
"""
Test the `ipalib.base.ReadOnly.__lock__` method.
"""
o = self.cls()
assert o._ReadOnly__locked is False
o.__lock__()
assert o._ReadOnly__locked is True
e = raises(AssertionError, o.__lock__) # Can only be locked once
assert str(e) == '__lock__() can only be called once'
assert o._ReadOnly__locked is True # This should still be True
def test_islocked(self):
"""
Test the `ipalib.base.ReadOnly.__islocked__` method.
"""
o = self.cls()
assert o.__islocked__() is False
o.__lock__()
assert o.__islocked__() is True
def test_setattr(self):
"""
Test the `ipalib.base.ReadOnly.__setattr__` method.
"""
o = self.cls()
o.attr1 = 'Hello, world!'
assert o.attr1 == 'Hello, world!'
o.__lock__()
for name in ('attr1', 'attr2'):
e = raises(AttributeError, setattr, o, name, 'whatever')
assert str(e) == SET_ERROR % ('ReadOnly', name, 'whatever')
assert o.attr1 == 'Hello, world!'
def test_delattr(self):
"""
Test the `ipalib.base.ReadOnly.__delattr__` method.
"""
o = self.cls()
o.attr1 = 'Hello, world!'
o.attr2 = 'How are you?'
assert o.attr1 == 'Hello, world!'
assert o.attr2 == 'How are you?'
del o.attr1
assert not hasattr(o, 'attr1')
o.__lock__()
e = raises(AttributeError, delattr, o, 'attr2')
assert str(e) == DEL_ERROR % ('ReadOnly', 'attr2')
assert o.attr2 == 'How are you?'
def test_check_name():
"""
Test the `ipalib.base.check_name` function.
"""
f = base.check_name
okay = [
'user_add',
'stuff2junk',
'sixty9',
]
nope = [
'_user_add',
'__user_add',
'user_add_',
'user_add__',
'_user_add_',
'__user_add__',
'60nine',
]
for name in okay:
assert name is f(name)
e = raises(TypeError, f, unicode(name))
assert str(e) == TYPE_ERROR % ('name', str, unicode(name), unicode)
error = 'name must match %r; got %r'
for name in nope:
e = raises(ValueError, f, name)
assert str(e) == error % (NAME_REGEX, name)
for name in okay:
e = raises(ValueError, f, name.upper())
assert str(e) == error % (NAME_REGEX, name.upper())

View File

@ -21,7 +21,6 @@
Test the `ipalib.config` module.
"""
import types
import os
from os import path
import sys
@ -29,6 +28,7 @@ from tests.util import raises, setitem, delitem, ClassChecker
from tests.util import getitem, setitem, delitem
from tests.util import TempDir, TempHome
from ipalib.constants import TYPE_ERROR, OVERRIDE_ERROR, SET_ERROR, DEL_ERROR
from ipalib.constants import NAME_REGEX, NAME_ERROR
from ipalib import config, constants
@ -56,6 +56,13 @@ good_vars = (
)
bad_names = (
('CamelCase', 'value'),
('_leading_underscore', 'value'),
('trailing_underscore_', 'value'),
)
# Random base64-encoded data to simulate a misbehaving config file.
config_bad = """
/9j/4AAQSkZJRgABAQEAlgCWAAD//gAIT2xpdmVy/9sAQwAQCwwODAoQDg0OEhEQExgoGhgWFhgx
@ -179,6 +186,12 @@ class test_Env(ClassChecker):
e = raises(AttributeError, setattr, o, name, raw)
assert str(e) == SET_ERROR % ('Env', name, raw)
# Test that name is tested with check_name():
o = self.cls()
for (name, value) in bad_names:
e = raises(ValueError, setattr, o, name, value)
assert str(e) == NAME_ERROR % (NAME_REGEX, name)
def test_setitem(self):
"""
Test the `ipalib.config.Env.__setitem__` method.
@ -203,6 +216,12 @@ class test_Env(ClassChecker):
e = raises(AttributeError, o.__setitem__, key, raw)
assert str(e) == SET_ERROR % ('Env', key, raw)
# Test that name is tested with check_name():
o = self.cls()
for (key, value) in bad_names:
e = raises(ValueError, o.__setitem__, key, value)
assert str(e) == NAME_ERROR % (NAME_REGEX, key)
def test_getitem(self):
"""
Test the `ipalib.config.Env.__getitem__` method.