81: Switch from tab to 4-space indentation

This commit is contained in:
Jason Gerard DeRose 2008-08-08 17:11:29 +00:00
parent f656e31a7e
commit 8e46824815
10 changed files with 1279 additions and 1279 deletions

194
ipa
View File

@ -31,54 +31,54 @@ from ipalib.startup import api
TAB_WIDTH = 2
def _(msg):
"""
Dummy gettext function for testing.
"""
return msg
"""
Dummy gettext function for testing.
"""
return msg
class row(object):
def __init__(self, tab, c1, c2=None):
assert type(tab) is int
assert type(c1) in (str, int)
assert type(c2) is str or c2 is None
self.tab = tab
self.c1 = c1
self.c2 = c2
def __init__(self, tab, c1, c2=None):
assert type(tab) is int
assert type(c1) in (str, int)
assert type(c2) is str or c2 is None
self.tab = tab
self.c1 = c1
self.c2 = c2
def __len__(self):
return len(str(self.c1))
def __len__(self):
return len(str(self.c1))
def pretty_print(self, just):
tab = ' ' * (self.tab * TAB_WIDTH)
if self.c2 is None:
print '%s%s' % (tab, self.c1)
else:
if type(self.c1) is int:
c1 = str(self.c1).rjust(just)
else:
c1 = self.c1.ljust(just)
print '%s%s %s' % (tab, c1, self.c2)
def pretty_print(self, just):
tab = ' ' * (self.tab * TAB_WIDTH)
if self.c2 is None:
print '%s%s' % (tab, self.c1)
else:
if type(self.c1) is int:
c1 = str(self.c1).rjust(just)
else:
c1 = self.c1.ljust(just)
print '%s%s %s' % (tab, c1, self.c2)
def pretty_print(rows):
rows = tuple(rows)
def get_lengths():
yield 0
for r in rows:
if r.c2 is not None:
yield len(r)
max_len = max(get_lengths())
for r in rows:
r.pretty_print(max_len)
rows = tuple(rows)
def get_lengths():
yield 0
for r in rows:
if r.c2 is not None:
yield len(r)
max_len = max(get_lengths())
for r in rows:
r.pretty_print(max_len)
def print_commands():
print 'Commands:'
m = api.max_cmd_len
for cmd in api.cmd:
print ' %s %s' % (str(cmd).ljust(m), cmd.get_doc(_))
print 'Commands:'
m = api.max_cmd_len
for cmd in api.cmd:
print ' %s %s' % (str(cmd).ljust(m), cmd.get_doc(_))
def print_help(cmd):
print 'Help on %s' % cmd
print 'Help on %s' % cmd
@ -94,79 +94,79 @@ def print_help(cmd):
def print_api():
def iter_api(tab):
for name in api:
ns = getattr(api, name)
yield row(
tab,
name,
repr(ns),
)
for i in ns:
yield row(
tab + 1,
i.name,
repr(i)
)
def iter_api(tab):
for name in api:
ns = getattr(api, name)
yield row(
tab,
name,
repr(ns),
)
for i in ns:
yield row(
tab + 1,
i.name,
repr(i)
)
def iter_obj(tab):
for obj in api.obj:
yield row(
tab,
obj.name,
repr(obj),
)
for (n, f) in [('mthd', '.%s()'), ('prop', '.%s')]:
ns = getattr(obj, n)
yield row(
tab + 1,
n,
repr(ns),
)
for attr in ns:
yield row(
tab + 2,
f % attr.name,
repr(attr),
)
def iter_obj(tab):
for obj in api.obj:
yield row(
tab,
obj.name,
repr(obj),
)
for (n, f) in [('mthd', '.%s()'), ('prop', '.%s')]:
ns = getattr(obj, n)
yield row(
tab + 1,
n,
repr(ns),
)
for attr in ns:
yield row(
tab + 2,
f % attr.name,
repr(attr),
)
def iter_summary(tab):
for name in api:
ns = getattr(api, name)
yield row(
tab,
len(ns),
name
)
def iter_summary(tab):
for name in api:
ns = getattr(api, name)
yield row(
tab,
len(ns),
name
)
def print_heading(h):
print '\n%s:' % h
print '-' * (len(h) + 1)
def print_heading(h):
print '\n%s:' % h
print '-' * (len(h) + 1)
tab = 1
print_heading('API Overview')
pretty_print(iter_api(tab))
tab = 1
print_heading('API Overview')
pretty_print(iter_api(tab))
print_heading('Object Details')
pretty_print(iter_obj(tab))
print_heading('Object Details')
pretty_print(iter_obj(tab))
print_heading('Summary')
pretty_print(iter_summary(tab))
print_heading('Summary')
pretty_print(iter_summary(tab))
if len(sys.argv) < 2:
print_commands()
print 'Usage: ipa COMMAND [OPTIONS]'
sys.exit(2)
print_commands()
print 'Usage: ipa COMMAND [OPTIONS]'
sys.exit(2)
name= sys.argv[1]
if name == '_api_':
print_api()
sys.exit()
print_api()
sys.exit()
elif name not in api.cmd:
print_commands()
print 'ipa: ERROR: unknown command %r' % name
sys.exit(2)
print_commands()
print 'ipa: ERROR: unknown command %r' % name
sys.exit(2)
api.cmd[name]()

View File

@ -22,128 +22,128 @@ All custom errors raised by `ipalib` package.
"""
class IPAError(Exception):
"""
Use this base class for your custom IPA errors unless there is a
specific reason to subclass from AttributeError, KeyError, etc.
"""
msg = None
"""
Use this base class for your custom IPA errors unless there is a
specific reason to subclass from AttributeError, KeyError, etc.
"""
msg = None
def __init__(self, *args, **kw):
self.args = args
self.kw = kw
def __init__(self, *args, **kw):
self.args = args
self.kw = kw
def __str__(self):
"""
Returns the string representation of this exception.
"""
if self.msg is None:
if len(self.args) == 1:
return unicode(self.args[0])
return unicode(self.args)
if len(self.args) > 0:
return self.msg % self.args
return self.msg % self.kw
def __str__(self):
"""
Returns the string representation of this exception.
"""
if self.msg is None:
if len(self.args) == 1:
return unicode(self.args[0])
return unicode(self.args)
if len(self.args) > 0:
return self.msg % self.args
return self.msg % self.kw
class ValidationError(IPAError):
msg = 'invalid %r value %r: %s'
msg = 'invalid %r value %r: %s'
def __init__(self, name, value, error):
self.name = name
self.value = value
self.error = error
super(ValidationError, self).__init__(name, value, error)
def __init__(self, name, value, error):
self.name = name
self.value = value
self.error = error
super(ValidationError, self).__init__(name, value, error)
class NormalizationError(ValidationError):
def __init__(self, name, value, type):
self.type = type
super(NormalizationError, self).__init__(name, value,
'not %r' % type
)
def __init__(self, name, value, type):
self.type = type
super(NormalizationError, self).__init__(name, value,
'not %r' % type
)
class RuleError(ValidationError):
def __init__(self, name, value, rule, error):
self.rule = rule
super(RuleError, self).__init__(name, value, error)
def __init__(self, name, value, rule, error):
self.rule = rule
super(RuleError, self).__init__(name, value, error)
class SetError(IPAError):
msg = 'setting %r, but NameSpace does not allow attribute setting'
msg = 'setting %r, but NameSpace does not allow attribute setting'
class RegistrationError(IPAError):
"""
Base class for errors that occur during plugin registration.
"""
"""
Base class for errors that occur during plugin registration.
"""
class NameSpaceError(RegistrationError):
msg = 'name %r does not re.match %r'
msg = 'name %r does not re.match %r'
class SubclassError(RegistrationError):
"""
Raised when registering a plugin that is not a subclass of one of the
allowed bases.
"""
msg = 'plugin %r not subclass of any base in %r'
"""
Raised when registering a plugin that is not a subclass of one of the
allowed bases.
"""
msg = 'plugin %r not subclass of any base in %r'
def __init__(self, cls, allowed):
self.cls = cls
self.allowed = allowed
def __init__(self, cls, allowed):
self.cls = cls
self.allowed = allowed
def __str__(self):
return self.msg % (self.cls, self.allowed)
def __str__(self):
return self.msg % (self.cls, self.allowed)
class DuplicateError(RegistrationError):
"""
Raised when registering a plugin whose exact class has already been
registered.
"""
msg = '%r at %d was already registered'
"""
Raised when registering a plugin whose exact class has already been
registered.
"""
msg = '%r at %d was already registered'
def __init__(self, cls):
self.cls = cls
def __init__(self, cls):
self.cls = cls
def __str__(self):
return self.msg % (self.cls, id(self.cls))
def __str__(self):
return self.msg % (self.cls, id(self.cls))
class OverrideError(RegistrationError):
"""
Raised when override=False yet registering a plugin that overrides an
existing plugin in the same namespace.
"""
msg = 'unexpected override of %s.%s with %r (use override=True if intended)'
"""
Raised when override=False yet registering a plugin that overrides an
existing plugin in the same namespace.
"""
msg = 'unexpected override of %s.%s with %r (use override=True if intended)'
def __init__(self, base, cls):
self.base = base
self.cls = cls
def __init__(self, base, cls):
self.base = base
self.cls = cls
def __str__(self):
return self.msg % (self.base.__name__, self.cls.__name__, self.cls)
def __str__(self):
return self.msg % (self.base.__name__, self.cls.__name__, self.cls)
class MissingOverrideError(RegistrationError):
"""
Raised when override=True yet no preexisting plugin with the same name
and base has been registered.
"""
msg = '%s.%s has not been registered, cannot override with %r'
"""
Raised when override=True yet no preexisting plugin with the same name
and base has been registered.
"""
msg = '%s.%s has not been registered, cannot override with %r'
def __init__(self, base, cls):
self.base = base
self.cls = cls
def __init__(self, base, cls):
self.base = base
self.cls = cls
def __str__(self):
return self.msg % (self.base.__name__, self.cls.__name__, self.cls)
def __str__(self):
return self.msg % (self.base.__name__, self.cls.__name__, self.cls)
class TwiceSetError(IPAError):
msg = '%s.%s cannot be set twice'
msg = '%s.%s cannot be set twice'

View File

@ -5,50 +5,50 @@
def get_label(self, _):
return _('Title') # Enum?
def get_label(self, _):
return _('Title') # Enum?
def get_label(self, _):
return _('First Name')
def get_label(self, _):
return _('First Name')
def get_label(self, _):
return _('Last Name')
def get_label(self, _):
return _('Last Name')
def get_label(self, _):
return _('Full Name') # Autofill
def get_label(self, _):
return _('Full Name') # Autofill
def get_label(self, _):
return _('Display Name') # Autofill
def get_label(self, _):
return _('Display Name') # Autofill
def get_label(self, _):
return _('Initials') # generated/ro?
def get_label(self, _):
return _('Initials') # generated/ro?
def get_label(self, _):
return _('Account Status') # Enum (active, inactive)
def get_label(self, _):
return _('Account Status') # Enum (active, inactive)
def get_label(self, _):
return _('Login')
def get_label(self, _):
return _('Login')
def get_label(self, _):
return _('Password')
def get_label(self, _):
return _('Password')
def get_label(self, _): # Same field as above, special interface
return _('Confirm Password')
def get_label(self, _): # Same field as above, special interface
return _('Confirm Password')
def get_label(self, _):
return _('UID') #ro
def get_label(self, _):
return _('UID') #ro
def get_label(self, _):
return _('GID') #ro
def get_label(self, _):
return _('GID') #ro
def get_label(self, _):
return _('Home Directory') #ro
def get_label(self, _):
return _('Home Directory') #ro
def get_label(self, _):
return _('Login Shell')
def get_label(self, _):
return _('Login Shell')
def get_label(self, _):
return _('GECOS')
def get_label(self, _):
return _('GECOS')
def get_label(self, _):
return _('')
def get_label(self, _):
return _('')

View File

@ -27,323 +27,323 @@ import errors
def to_cli(name):
"""
Takes a Python identifier and transforms it into form suitable for the
Command Line Interface.
"""
assert isinstance(name, str)
return name.replace('_', '-')
"""
Takes a Python identifier and transforms it into form suitable for the
Command Line Interface.
"""
assert isinstance(name, str)
return name.replace('_', '-')
def from_cli(cli_name):
"""
Takes a string from the Command Line Interface and transforms it into a
Python identifier.
"""
assert isinstance(cli_name, basestring)
return cli_name.replace('-', '_')
"""
Takes a string from the Command Line Interface and transforms it into a
Python identifier.
"""
assert isinstance(cli_name, basestring)
return cli_name.replace('-', '_')
def check_identifier(name):
"""
Raises errors.NameSpaceError if `name` is not a valid Python identifier
suitable for use in a NameSpace.
"""
regex = r'^[a-z][_a-z0-9]*[a-z0-9]$'
if re.match(regex, name) is None:
raise errors.NameSpaceError(name, regex)
"""
Raises errors.NameSpaceError if `name` is not a valid Python identifier
suitable for use in a NameSpace.
"""
regex = r'^[a-z][_a-z0-9]*[a-z0-9]$'
if re.match(regex, name) is None:
raise errors.NameSpaceError(name, regex)
class Plugin(object):
"""
Base class for all plugins.
"""
"""
Base class for all plugins.
"""
__api = None
__api = None
def __get_api(self):
"""
Returns the plugable.API instance passed to Plugin.finalize(), or
or returns None if finalize() has not yet been called.
"""
return self.__api
api = property(__get_api)
def __get_api(self):
"""
Returns the plugable.API instance passed to Plugin.finalize(), or
or returns None if finalize() has not yet been called.
"""
return self.__api
api = property(__get_api)
def finalize(self, api):
"""
After all the plugins are instantiated, the plugable.API calls this
method, passing itself as the only argument. This is where plugins
should check that other plugins they depend upon have actually be
loaded.
"""
assert self.__api is None, 'finalize() can only be called once'
assert api is not None, 'finalize() argument cannot be None'
self.__api = api
def finalize(self, api):
"""
After all the plugins are instantiated, the plugable.API calls this
method, passing itself as the only argument. This is where plugins
should check that other plugins they depend upon have actually be
loaded.
"""
assert self.__api is None, 'finalize() can only be called once'
assert api is not None, 'finalize() argument cannot be None'
self.__api = api
def __get_name(self):
"""
Returns the class name of this instance.
"""
return self.__class__.__name__
name = property(__get_name)
def __get_name(self):
"""
Returns the class name of this instance.
"""
return self.__class__.__name__
name = property(__get_name)
def __repr__(self):
"""
Returns a fully qualified <module><name> representation of the class.
"""
return '%s.%s()' % (
self.__class__.__module__,
self.__class__.__name__
)
def __repr__(self):
"""
Returns a fully qualified <module><name> representation of the class.
"""
return '%s.%s()' % (
self.__class__.__module__,
self.__class__.__name__
)
class ReadOnly(object):
"""
Base class for classes with read-only attributes.
"""
__slots__ = tuple()
"""
Base class for classes with read-only attributes.
"""
__slots__ = tuple()
def __setattr__(self, name, value):
"""
This raises an AttributeError anytime an attempt is made to set an
attribute.
"""
raise AttributeError('read-only: cannot set %s.%s' %
(self.__class__.__name__, name)
)
def __setattr__(self, name, value):
"""
This raises an AttributeError anytime an attempt is made to set an
attribute.
"""
raise AttributeError('read-only: cannot set %s.%s' %
(self.__class__.__name__, name)
)
def __delattr__(self, name):
"""
This raises an AttributeError anytime an attempt is made to delete an
attribute.
"""
raise AttributeError('read-only: cannot del %s.%s' %
(self.__class__.__name__, name)
)
def __delattr__(self, name):
"""
This raises an AttributeError anytime an attempt is made to delete an
attribute.
"""
raise AttributeError('read-only: cannot del %s.%s' %
(self.__class__.__name__, name)
)
class Proxy(ReadOnly):
__slots__ = (
'__base',
'__target',
'__name_attr',
'__public',
'name',
)
__slots__ = (
'__base',
'__target',
'__name_attr',
'__public',
'name',
)
def __init__(self, base, target, name_attr='name'):
if not inspect.isclass(base):
raise TypeError('arg1 must be a class, got %r' % base)
if not isinstance(target, base):
raise ValueError('arg2 must be instance of arg1, got %r' % target)
object.__setattr__(self, '_Proxy__base', base)
object.__setattr__(self, '_Proxy__target', target)
object.__setattr__(self, '_Proxy__name_attr', name_attr)
object.__setattr__(self, '_Proxy__public', base.__public__)
object.__setattr__(self, 'name', getattr(target, name_attr))
def __init__(self, base, target, name_attr='name'):
if not inspect.isclass(base):
raise TypeError('arg1 must be a class, got %r' % base)
if not isinstance(target, base):
raise ValueError('arg2 must be instance of arg1, got %r' % target)
object.__setattr__(self, '_Proxy__base', base)
object.__setattr__(self, '_Proxy__target', target)
object.__setattr__(self, '_Proxy__name_attr', name_attr)
object.__setattr__(self, '_Proxy__public', base.__public__)
object.__setattr__(self, 'name', getattr(target, name_attr))
# Check __public
assert type(self.__public) is frozenset
# Check __public
assert type(self.__public) is frozenset
# Check name
check_identifier(self.name)
# Check name
check_identifier(self.name)
def __iter__(self):
for name in sorted(self.__public):
yield name
def __iter__(self):
for name in sorted(self.__public):
yield name
def __getitem__(self, key):
if key in self.__public:
return getattr(self.__target, key)
raise KeyError('no proxy attribute %r' % key)
def __getitem__(self, key):
if key in self.__public:
return getattr(self.__target, key)
raise KeyError('no proxy attribute %r' % key)
def __getattr__(self, name):
if name in self.__public:
return getattr(self.__target, name)
raise AttributeError('no proxy attribute %r' % name)
def __getattr__(self, name):
if name in self.__public:
return getattr(self.__target, name)
raise AttributeError('no proxy attribute %r' % name)
def __call__(self, *args, **kw):
return self['__call__'](*args, **kw)
def __call__(self, *args, **kw):
return self['__call__'](*args, **kw)
def _clone(self, name_attr):
return self.__class__(self.__base, self.__target, name_attr)
def _clone(self, name_attr):
return self.__class__(self.__base, self.__target, name_attr)
def __repr__(self):
return '%s(%s, %r, %r)' % (
self.__class__.__name__,
self.__base.__name__,
self.__target,
self.__name_attr,
)
def __repr__(self):
return '%s(%s, %r, %r)' % (
self.__class__.__name__,
self.__base.__name__,
self.__target,
self.__name_attr,
)
class NameSpace(ReadOnly):
"""
A read-only namespace of (key, value) pairs that can be accessed
both as instance attributes and as dictionary items.
"""
"""
A read-only namespace of (key, value) pairs that can be accessed
both as instance attributes and as dictionary items.
"""
def __init__(self, proxies):
"""
NameSpace
"""
object.__setattr__(self, '_NameSpace__proxies', tuple(proxies))
object.__setattr__(self, '_NameSpace__d', dict())
for proxy in self.__proxies:
assert isinstance(proxy, Proxy)
assert proxy.name not in self.__d
self.__d[proxy.name] = proxy
assert not hasattr(self, proxy.name)
object.__setattr__(self, proxy.name, proxy)
def __init__(self, proxies):
"""
NameSpace
"""
object.__setattr__(self, '_NameSpace__proxies', tuple(proxies))
object.__setattr__(self, '_NameSpace__d', dict())
for proxy in self.__proxies:
assert isinstance(proxy, Proxy)
assert proxy.name not in self.__d
self.__d[proxy.name] = proxy
assert not hasattr(self, proxy.name)
object.__setattr__(self, proxy.name, proxy)
def __iter__(self):
"""
Iterates through the proxies in this NameSpace in the same order they
were passed in the contructor.
"""
for proxy in self.__proxies:
yield proxy
def __iter__(self):
"""
Iterates through the proxies in this NameSpace in the same order they
were passed in the contructor.
"""
for proxy in self.__proxies:
yield proxy
def __len__(self):
"""
Returns number of proxies in this NameSpace.
"""
return len(self.__proxies)
def __len__(self):
"""
Returns number of proxies in this NameSpace.
"""
return len(self.__proxies)
def __contains__(self, key):
"""
Returns True if a proxy named `key` is in this NameSpace.
"""
return key in self.__d
def __contains__(self, key):
"""
Returns True if a proxy named `key` is in this NameSpace.
"""
return key in self.__d
def __getitem__(self, key):
"""
Returns proxy named `key`; otherwise raises KeyError.
"""
if key in self.__d:
return self.__d[key]
raise KeyError('NameSpace has no item for key %r' % key)
def __getitem__(self, key):
"""
Returns proxy named `key`; otherwise raises KeyError.
"""
if key in self.__d:
return self.__d[key]
raise KeyError('NameSpace has no item for key %r' % key)
def __repr__(self):
return '%s(<%d proxies>)' % (self.__class__.__name__, len(self))
def __repr__(self):
return '%s(<%d proxies>)' % (self.__class__.__name__, len(self))
class Registrar(object):
def __init__(self, *allowed):
"""
`*allowed` is a list of the base classes plugins can be subclassed
from.
"""
self.__allowed = frozenset(allowed)
self.__d = {}
self.__registered = set()
assert len(self.__allowed) == len(allowed)
for base in self.__allowed:
assert inspect.isclass(base)
assert base.__name__ not in self.__d
self.__d[base.__name__] = {}
def __init__(self, *allowed):
"""
`*allowed` is a list of the base classes plugins can be subclassed
from.
"""
self.__allowed = frozenset(allowed)
self.__d = {}
self.__registered = set()
assert len(self.__allowed) == len(allowed)
for base in self.__allowed:
assert inspect.isclass(base)
assert base.__name__ not in self.__d
self.__d[base.__name__] = {}
def __findbase(self, cls):
"""
If `cls` is a subclass of a base in self.__allowed, returns that
base; otherwise raises SubclassError.
"""
assert inspect.isclass(cls)
found = False
for base in self.__allowed:
if issubclass(cls, base):
found = True
yield base
if not found:
raise errors.SubclassError(cls, self.__allowed)
def __findbase(self, cls):
"""
If `cls` is a subclass of a base in self.__allowed, returns that
base; otherwise raises SubclassError.
"""
assert inspect.isclass(cls)
found = False
for base in self.__allowed:
if issubclass(cls, base):
found = True
yield base
if not found:
raise errors.SubclassError(cls, self.__allowed)
def __call__(self, cls, override=False):
"""
Register the plugin `cls`.
"""
if not inspect.isclass(cls):
raise TypeError('plugin must be a class: %r' % cls)
def __call__(self, cls, override=False):
"""
Register the plugin `cls`.
"""
if not inspect.isclass(cls):
raise TypeError('plugin must be a class: %r' % cls)
# Raise DuplicateError if this exact class was already registered:
if cls in self.__registered:
raise errors.DuplicateError(cls)
# Raise DuplicateError if this exact class was already registered:
if cls in self.__registered:
raise errors.DuplicateError(cls)
# Find the base class or raise SubclassError:
for base in self.__findbase(cls):
sub_d = self.__d[base.__name__]
# Find the base class or raise SubclassError:
for base in self.__findbase(cls):
sub_d = self.__d[base.__name__]
# Check override:
if cls.__name__ in sub_d:
# Must use override=True to override:
if not override:
raise errors.OverrideError(base, cls)
else:
# There was nothing already registered to override:
if override:
raise errors.MissingOverrideError(base, cls)
# Check override:
if cls.__name__ in sub_d:
# Must use override=True to override:
if not override:
raise errors.OverrideError(base, cls)
else:
# There was nothing already registered to override:
if override:
raise errors.MissingOverrideError(base, cls)
# The plugin is okay, add to sub_d:
sub_d[cls.__name__] = cls
# The plugin is okay, add to sub_d:
sub_d[cls.__name__] = cls
# The plugin is okay, add to __registered:
self.__registered.add(cls)
# The plugin is okay, add to __registered:
self.__registered.add(cls)
def __getitem__(self, item):
"""
Returns a copy of the namespace dict of the base class named `name`.
"""
if inspect.isclass(item):
if item not in self.__allowed:
raise KeyError(repr(item))
key = item.__name__
else:
key = item
return dict(self.__d[key])
def __getitem__(self, item):
"""
Returns a copy of the namespace dict of the base class named `name`.
"""
if inspect.isclass(item):
if item not in self.__allowed:
raise KeyError(repr(item))
key = item.__name__
else:
key = item
return dict(self.__d[key])
def __contains__(self, item):
"""
Returns True if a base class named `name` is in this Registrar.
"""
if inspect.isclass(item):
return item in self.__allowed
return item in self.__d
def __contains__(self, item):
"""
Returns True if a base class named `name` is in this Registrar.
"""
if inspect.isclass(item):
return item in self.__allowed
return item in self.__d
def __iter__(self):
"""
Iterates through a (base, registered_plugins) tuple for each allowed
base.
"""
for base in self.__allowed:
sub_d = self.__d[base.__name__]
yield (base, tuple(sub_d[k] for k in sorted(sub_d)))
def __iter__(self):
"""
Iterates through a (base, registered_plugins) tuple for each allowed
base.
"""
for base in self.__allowed:
sub_d = self.__d[base.__name__]
yield (base, tuple(sub_d[k] for k in sorted(sub_d)))
class API(ReadOnly):
def __init__(self, *allowed):
keys = tuple(b.__name__ for b in allowed)
object.__setattr__(self, '_API__keys', keys)
object.__setattr__(self, 'register', Registrar(*allowed))
def __init__(self, *allowed):
keys = tuple(b.__name__ for b in allowed)
object.__setattr__(self, '_API__keys', keys)
object.__setattr__(self, 'register', Registrar(*allowed))
def __call__(self):
"""
Finalize the registration, instantiate the plugins.
"""
d = {}
def plugin_iter(base, classes):
for cls in classes:
if cls not in d:
d[cls] = cls()
plugin = d[cls]
yield Proxy(base, plugin)
def __call__(self):
"""
Finalize the registration, instantiate the plugins.
"""
d = {}
def plugin_iter(base, classes):
for cls in classes:
if cls not in d:
d[cls] = cls()
plugin = d[cls]
yield Proxy(base, plugin)
for (base, classes) in self.register:
ns = NameSpace(plugin_iter(base, classes))
assert not hasattr(self, base.__name__)
object.__setattr__(self, base.__name__, ns)
for plugin in d.values():
plugin.finalize(self)
assert plugin.api is self
for (base, classes) in self.register:
ns = NameSpace(plugin_iter(base, classes))
assert not hasattr(self, base.__name__)
object.__setattr__(self, base.__name__, ns)
for plugin in d.values():
plugin.finalize(self)
assert plugin.api is self
def __iter__(self):
for key in self.__keys:
yield key
def __iter__(self):
for key in self.__keys:
yield key

View File

@ -27,109 +27,109 @@ from run import api
# Hypothetical functional commands (not associated with any object):
class krbtest(public.cmd):
def get_doc(self, _):
return _('test your Kerberos ticket')
def get_doc(self, _):
return _('test your Kerberos ticket')
api.register(krbtest)
class discover(public.cmd):
def get_doc(self, _):
return _('discover IPA servers on network')
def get_doc(self, _):
return _('discover IPA servers on network')
api.register(discover)
# Register some methods for the 'user' object:
class user_add(public.mthd):
def get_doc(self, _):
return _('add new user')
def get_doc(self, _):
return _('add new user')
api.register(user_add)
class user_del(public.mthd):
def get_doc(self, _):
return _('delete existing user')
def get_doc(self, _):
return _('delete existing user')
api.register(user_del)
class user_mod(public.mthd):
def get_doc(self, _):
return _('edit existing user')
def get_doc(self, _):
return _('edit existing user')
api.register(user_mod)
class user_find(public.mthd):
def get_doc(self, _):
return _('search for users')
def get_doc(self, _):
return _('search for users')
api.register(user_find)
# Register some properties for the 'user' object:
class user_firstname(public.prop):
pass
pass
api.register(user_firstname)
class user_lastname(public.prop):
pass
pass
api.register(user_lastname)
class user_login(public.prop):
pass
pass
api.register(user_login)
# Register some methods for the 'group' object:
class group_add(public.mthd):
def get_doc(self, _):
return _('add new group')
def get_doc(self, _):
return _('add new group')
api.register(group_add)
class group_del(public.mthd):
def get_doc(self, _):
return _('delete existing group')
def get_doc(self, _):
return _('delete existing group')
api.register(group_del)
class group_mod(public.mthd):
def get_doc(self, _):
return _('edit existing group')
def get_doc(self, _):
return _('edit existing group')
api.register(group_mod)
class group_find(public.mthd):
def get_doc(self, _):
return _('search for groups')
def get_doc(self, _):
return _('search for groups')
api.register(group_find)
# Register some methods for the 'service' object
class service_add(public.mthd):
def get_doc(self, _):
return _('add new service')
def get_doc(self, _):
return _('add new service')
api.register(service_add)
class service_del(public.mthd):
def get_doc(self, _):
return _('delete existing service')
def get_doc(self, _):
return _('delete existing service')
api.register(service_del)
class service_mod(public.mthd):
def get_doc(self, _):
return _('edit existing service')
def get_doc(self, _):
return _('edit existing service')
api.register(service_mod)
class service_find(public.mthd):
def get_doc(self, _):
return _('search for services')
def get_doc(self, _):
return _('search for services')
api.register(service_find)
# And to emphasis that the registration order doesn't matter,
# we'll register the objects last:
class group(public.obj):
def get_doc(self, _):
return _('')
def get_doc(self, _):
return _('')
api.register(group)
class service(public.obj):
def get_doc(self, _):
return _('')
def get_doc(self, _):
return _('')
api.register(service)
class user(public.obj):
def get_doc(self, _):
return _('')
def get_doc(self, _):
return _('')
api.register(user)

View File

@ -30,255 +30,255 @@ import errors
RULE_FLAG = 'validation_rule'
def rule(obj):
assert not hasattr(obj, RULE_FLAG)
setattr(obj, RULE_FLAG, True)
return obj
assert not hasattr(obj, RULE_FLAG)
setattr(obj, RULE_FLAG, True)
return obj
def is_rule(obj):
return callable(obj) and getattr(obj, RULE_FLAG, False) is True
return callable(obj) and getattr(obj, RULE_FLAG, False) is True
class option(object):
"""
The option class represents a kw argument from a command.
"""
"""
The option class represents a kw argument from a command.
"""
__public__ = frozenset((
'normalize',
'validate',
'default',
'required',
'type',
))
__rules = None
__public__ = frozenset((
'normalize',
'validate',
'default',
'required',
'type',
))
__rules = None
# type = unicode, int, float # Set in subclass
# type = unicode, int, float # Set in subclass
def normalize(self, value):
"""
Returns the normalized form of `value`. If `value` cannot be
normalized, NormalizationError is raised, which is a subclass of
ValidationError.
def normalize(self, value):
"""
Returns the normalized form of `value`. If `value` cannot be
normalized, NormalizationError is raised, which is a subclass of
ValidationError.
The base class implementation only does type coercion, but subclasses
might do other normalization (e.g., a unicode option might strip
leading and trailing white-space).
"""
try:
return self.type(value)
except (TypeError, ValueError):
raise errors.NormalizationError(
self.__class__.__name__, value, self.type
)
The base class implementation only does type coercion, but subclasses
might do other normalization (e.g., a unicode option might strip
leading and trailing white-space).
"""
try:
return self.type(value)
except (TypeError, ValueError):
raise errors.NormalizationError(
self.__class__.__name__, value, self.type
)
def validate(self, value):
"""
Calls each validation rule and if any rule fails, raises RuleError,
which is a subclass of ValidationError.
"""
for rule in self.rules:
msg = rule(value)
if msg is not None:
raise errors.RuleError(
self.__class__.__name__,
value,
rule,
msg,
)
def validate(self, value):
"""
Calls each validation rule and if any rule fails, raises RuleError,
which is a subclass of ValidationError.
"""
for rule in self.rules:
msg = rule(value)
if msg is not None:
raise errors.RuleError(
self.__class__.__name__,
value,
rule,
msg,
)
def __get_rules(self):
"""
Returns the tuple of rule methods used for input validation. This
tuple is lazily initialized the first time the property is accessed.
"""
if self.__rules is None:
self.__rules = tuple(sorted(
self.__rules_iter(),
key=lambda f: getattr(f, '__name__'),
))
return self.__rules
rules = property(__get_rules)
def __get_rules(self):
"""
Returns the tuple of rule methods used for input validation. This
tuple is lazily initialized the first time the property is accessed.
"""
if self.__rules is None:
self.__rules = tuple(sorted(
self.__rules_iter(),
key=lambda f: getattr(f, '__name__'),
))
return self.__rules
rules = property(__get_rules)
def __rules_iter(self):
"""
Iterates through the attributes in this instance to retrieve the
methods implemented validation rules.
"""
for name in dir(self.__class__):
if name.startswith('_'):
continue
base_attr = getattr(self.__class__, name)
if is_rule(base_attr):
attr = getattr(self, name)
if is_rule(attr):
yield attr
def __rules_iter(self):
"""
Iterates through the attributes in this instance to retrieve the
methods implemented validation rules.
"""
for name in dir(self.__class__):
if name.startswith('_'):
continue
base_attr = getattr(self.__class__, name)
if is_rule(base_attr):
attr = getattr(self, name)
if is_rule(attr):
yield attr
def default(self, **kw):
"""
Returns a default or auto-completed value for this option. If no
default is available, this method should return None.
"""
return None
def default(self, **kw):
"""
Returns a default or auto-completed value for this option. If no
default is available, this method should return None.
"""
return None
class cmd(plugable.Plugin):
__public__ = frozenset((
'normalize',
'autofill',
'__call__',
'get_doc',
'opt',
__public__ = frozenset((
'normalize',
'autofill',
'__call__',
'get_doc',
'opt',
))
__opt = None
))
__opt = None
def get_doc(self, _):
"""
Returns the gettext translated doc-string for this command.
def get_doc(self, _):
"""
Returns the gettext translated doc-string for this command.
For example:
For example:
>>> def get_doc(self, _):
>>> return _('add new user')
"""
raise NotImplementedError('%s.get_doc()' % self.name)
>>> def get_doc(self, _):
>>> return _('add new user')
"""
raise NotImplementedError('%s.get_doc()' % self.name)
def get_options(self):
"""
Returns iterable with opt_proxy objects used to create the opt
NameSpace when __get_opt() is called.
"""
raise NotImplementedError('%s.get_options()' % self.name)
def get_options(self):
"""
Returns iterable with opt_proxy objects used to create the opt
NameSpace when __get_opt() is called.
"""
raise NotImplementedError('%s.get_options()' % self.name)
def __get_opt(self):
"""
Returns the NameSpace containing opt_proxy objects.
"""
if self.__opt is None:
self.__opt = plugable.NameSpace(self.get_options())
return self.__opt
opt = property(__get_opt)
def __get_opt(self):
"""
Returns the NameSpace containing opt_proxy objects.
"""
if self.__opt is None:
self.__opt = plugable.NameSpace(self.get_options())
return self.__opt
opt = property(__get_opt)
def normalize_iter(self, kw):
for (key, value) in kw.items():
if key in self.options:
yield (
key, self.options[key].normalize(value)
)
else:
yield (key, value)
def normalize_iter(self, kw):
for (key, value) in kw.items():
if key in self.options:
yield (
key, self.options[key].normalize(value)
)
else:
yield (key, value)
def normalize(self, **kw):
return dict(self.normalize_iter(kw))
def normalize(self, **kw):
return dict(self.normalize_iter(kw))
def validate(self, **kw):
for (key, value) in kw.items():
if key in self.options:
self.options.validate(value)
def validate(self, **kw):
for (key, value) in kw.items():
if key in self.options:
self.options.validate(value)
def default(self, **kw):
for opt in self.options:
if opt.name not in kw:
value = opt.default(**kw)
if value is not None:
kw[opt.name] = value
def default(self, **kw):
for opt in self.options:
if opt.name not in kw:
value = opt.default(**kw)
if value is not None:
kw[opt.name] = value
def __call__(self, **kw):
(args, kw) = self.normalize(*args, **kw)
(args, kw) = self.autofill(*args, **kw)
self.validate(*args, **kw)
self.execute(*args, **kw)
def __call__(self, **kw):
(args, kw) = self.normalize(*args, **kw)
(args, kw) = self.autofill(*args, **kw)
self.validate(*args, **kw)
self.execute(*args, **kw)
class obj(plugable.Plugin):
__public__ = frozenset((
'mthd',
'prop',
))
__mthd = None
__prop = None
__public__ = frozenset((
'mthd',
'prop',
))
__mthd = None
__prop = None
def __get_mthd(self):
return self.__mthd
mthd = property(__get_mthd)
def __get_mthd(self):
return self.__mthd
mthd = property(__get_mthd)
def __get_prop(self):
return self.__prop
prop = property(__get_prop)
def __get_prop(self):
return self.__prop
prop = property(__get_prop)
def finalize(self, api):
super(obj, self).finalize(api)
self.__mthd = self.__create_ns('mthd')
self.__prop = self.__create_ns('prop')
def finalize(self, api):
super(obj, self).finalize(api)
self.__mthd = self.__create_ns('mthd')
self.__prop = self.__create_ns('prop')
def __create_ns(self, name):
return plugable.NameSpace(self.__filter(name))
def __create_ns(self, name):
return plugable.NameSpace(self.__filter(name))
def __filter(self, name):
for i in getattr(self.api, name):
if i.obj_name == self.name:
yield i._clone('attr_name')
def __filter(self, name):
for i in getattr(self.api, name):
if i.obj_name == self.name:
yield i._clone('attr_name')
class attr(plugable.Plugin):
__obj = None
__obj = None
def __init__(self):
m = re.match('^([a-z]+)_([a-z]+)$', self.__class__.__name__)
assert m
self.__obj_name = m.group(1)
self.__attr_name = m.group(2)
def __init__(self):
m = re.match('^([a-z]+)_([a-z]+)$', self.__class__.__name__)
assert m
self.__obj_name = m.group(1)
self.__attr_name = m.group(2)
def __get_obj_name(self):
return self.__obj_name
obj_name = property(__get_obj_name)
def __get_obj_name(self):
return self.__obj_name
obj_name = property(__get_obj_name)
def __get_attr_name(self):
return self.__attr_name
attr_name = property(__get_attr_name)
def __get_attr_name(self):
return self.__attr_name
attr_name = property(__get_attr_name)
def __get_obj(self):
"""
Returns the obj instance this attribute is associated with, or None
if no association has been set.
"""
return self.__obj
obj = property(__get_obj)
def __get_obj(self):
"""
Returns the obj instance this attribute is associated with, or None
if no association has been set.
"""
return self.__obj
obj = property(__get_obj)
def finalize(self, api):
super(attr, self).finalize(api)
self.__obj = api.obj[self.obj_name]
def finalize(self, api):
super(attr, self).finalize(api)
self.__obj = api.obj[self.obj_name]
class mthd(attr, cmd):
__public__ = frozenset((
'obj',
'obj_name',
))
__public__ = frozenset((
'obj',
'obj_name',
))
class prop(attr):
__public__ = frozenset((
'obj',
'obj_name',
))
__public__ = frozenset((
'obj',
'obj_name',
))
def get_doc(self, _):
return _('prop doc')
def get_doc(self, _):
return _('prop doc')
class PublicAPI(plugable.API):
__max_cmd_len = None
__max_cmd_len = None
def __init__(self):
super(PublicAPI, self).__init__(cmd, obj, mthd, prop)
def __init__(self):
super(PublicAPI, self).__init__(cmd, obj, mthd, prop)
def __get_max_cmd_len(self):
if self.__max_cmd_len is None:
if not hasattr(self, 'cmd'):
return None
max_cmd_len = max(len(str(cmd)) for cmd in self.cmd)
object.__setattr__(self, '_PublicAPI__max_cmd_len', max_cmd_len)
return self.__max_cmd_len
max_cmd_len = property(__get_max_cmd_len)
def __get_max_cmd_len(self):
if self.__max_cmd_len is None:
if not hasattr(self, 'cmd'):
return None
max_cmd_len = max(len(str(cmd)) for cmd in self.cmd)
object.__setattr__(self, '_PublicAPI__max_cmd_len', max_cmd_len)
return self.__max_cmd_len
max_cmd_len = property(__get_max_cmd_len)

View File

@ -26,396 +26,396 @@ from ipalib import plugable, errors
def test_to_cli():
f = plugable.to_cli
assert f('initialize') == 'initialize'
assert f('user_add') == 'user-add'
f = plugable.to_cli
assert f('initialize') == 'initialize'
assert f('user_add') == 'user-add'
def test_from_cli():
f = plugable.from_cli
assert f('initialize') == 'initialize'
assert f('user-add') == 'user_add'
f = plugable.from_cli
assert f('initialize') == 'initialize'
assert f('user-add') == 'user_add'
def test_valid_identifier():
f = plugable.check_identifier
okay = [
'user_add',
'stuff2junk',
'sixty9',
]
nope = [
'_user_add',
'__user_add',
'user_add_',
'user_add__',
'_user_add_',
'__user_add__',
'60nine',
]
for name in okay:
f(name)
for name in nope:
raises(errors.NameSpaceError, f, name)
for name in okay:
raises(errors.NameSpaceError, f, name.upper())
f = plugable.check_identifier
okay = [
'user_add',
'stuff2junk',
'sixty9',
]
nope = [
'_user_add',
'__user_add',
'user_add_',
'user_add__',
'_user_add_',
'__user_add__',
'60nine',
]
for name in okay:
f(name)
for name in nope:
raises(errors.NameSpaceError, f, name)
for name in okay:
raises(errors.NameSpaceError, f, name.upper())
def test_Plugin():
cls = plugable.Plugin
assert type(cls.name) is property
cls = plugable.Plugin
assert type(cls.name) is property
api = 'the api instance'
p = plugable.Plugin()
assert read_only(p, 'name') == 'Plugin'
assert repr(p) == '%s.Plugin()' % plugable.__name__
assert read_only(p, 'api') is None
raises(AssertionError, p.finalize, None)
p.finalize(api)
assert read_only(p, 'api') is api
raises(AssertionError, p.finalize, api)
api = 'the api instance'
p = plugable.Plugin()
assert read_only(p, 'name') == 'Plugin'
assert repr(p) == '%s.Plugin()' % plugable.__name__
assert read_only(p, 'api') is None
raises(AssertionError, p.finalize, None)
p.finalize(api)
assert read_only(p, 'api') is api
raises(AssertionError, p.finalize, api)
class some_plugin(plugable.Plugin):
pass
p = some_plugin()
assert read_only(p, 'name') == 'some_plugin'
assert repr(p) == '%s.some_plugin()' % __name__
assert read_only(p, 'api') is None
raises(AssertionError, p.finalize, None)
p.finalize(api)
assert read_only(p, 'api') is api
raises(AssertionError, p.finalize, api)
class some_plugin(plugable.Plugin):
pass
p = some_plugin()
assert read_only(p, 'name') == 'some_plugin'
assert repr(p) == '%s.some_plugin()' % __name__
assert read_only(p, 'api') is None
raises(AssertionError, p.finalize, None)
p.finalize(api)
assert read_only(p, 'api') is api
raises(AssertionError, p.finalize, api)
def test_ReadOnly():
obj = plugable.ReadOnly()
names = ['not_an_attribute', 'an_attribute']
for name in names:
no_set(obj, name)
no_del(obj, name)
obj = plugable.ReadOnly()
names = ['not_an_attribute', 'an_attribute']
for name in names:
no_set(obj, name)
no_del(obj, name)
class some_ro_class(plugable.ReadOnly):
def __init__(self):
object.__setattr__(self, 'an_attribute', 'Hello world!')
obj = some_ro_class()
for name in names:
no_set(obj, name)
no_del(obj, name)
assert read_only(obj, 'an_attribute') == 'Hello world!'
class some_ro_class(plugable.ReadOnly):
def __init__(self):
object.__setattr__(self, 'an_attribute', 'Hello world!')
obj = some_ro_class()
for name in names:
no_set(obj, name)
no_del(obj, name)
assert read_only(obj, 'an_attribute') == 'Hello world!'
def test_Proxy():
cls = plugable.Proxy
assert issubclass(cls, plugable.ReadOnly)
cls = plugable.Proxy
assert issubclass(cls, plugable.ReadOnly)
# Setup:
class base(object):
__public__ = frozenset((
'public_0',
'public_1',
'__call__',
))
# Setup:
class base(object):
__public__ = frozenset((
'public_0',
'public_1',
'__call__',
))
def public_0(self):
return 'public_0'
def public_0(self):
return 'public_0'
def public_1(self):
return 'public_1'
def public_1(self):
return 'public_1'
def __call__(self, caller):
return 'ya called it, %s.' % caller
def __call__(self, caller):
return 'ya called it, %s.' % caller
def private_0(self):
return 'private_0'
def private_0(self):
return 'private_0'
def private_1(self):
return 'private_1'
def private_1(self):
return 'private_1'
class plugin(base):
name = 'user_add'
attr_name = 'add'
class plugin(base):
name = 'user_add'
attr_name = 'add'
# Test that TypeError is raised when base is not a class:
raises(TypeError, cls, base(), None)
# Test that TypeError is raised when base is not a class:
raises(TypeError, cls, base(), None)
# Test that ValueError is raised when target is not instance of base:
raises(ValueError, cls, base, object())
# Test that ValueError is raised when target is not instance of base:
raises(ValueError, cls, base, object())
# Test with correct arguments:
i = plugin()
p = cls(base, i)
assert read_only(p, 'name') == 'user_add'
assert list(p) == sorted(base.__public__)
# Test with correct arguments:
i = plugin()
p = cls(base, i)
assert read_only(p, 'name') == 'user_add'
assert list(p) == sorted(base.__public__)
# Test normal methods:
for n in xrange(2):
pub = 'public_%d' % n
priv = 'private_%d' % n
assert getattr(i, pub)() == pub
assert getattr(p, pub)() == pub
assert hasattr(p, pub)
assert getattr(i, priv)() == priv
assert not hasattr(p, priv)
# Test normal methods:
for n in xrange(2):
pub = 'public_%d' % n
priv = 'private_%d' % n
assert getattr(i, pub)() == pub
assert getattr(p, pub)() == pub
assert hasattr(p, pub)
assert getattr(i, priv)() == priv
assert not hasattr(p, priv)
# Test __call__:
value = 'ya called it, dude.'
assert i('dude') == value
assert p('dude') == value
assert callable(p)
# Test __call__:
value = 'ya called it, dude.'
assert i('dude') == value
assert p('dude') == value
assert callable(p)
# Test name_attr='name' kw arg
i = plugin()
p = cls(base, i, 'attr_name')
assert read_only(p, 'name') == 'add'
# Test name_attr='name' kw arg
i = plugin()
p = cls(base, i, 'attr_name')
assert read_only(p, 'name') == 'add'
# Test _clone():
i = plugin()
p = cls(base, i)
assert read_only(p, 'name') == 'user_add'
c = p._clone('attr_name')
assert isinstance(c, cls)
assert read_only(c, 'name') == 'add'
assert c is not p
assert c('whoever') == p('whoever')
# Test _clone():
i = plugin()
p = cls(base, i)
assert read_only(p, 'name') == 'user_add'
c = p._clone('attr_name')
assert isinstance(c, cls)
assert read_only(c, 'name') == 'add'
assert c is not p
assert c('whoever') == p('whoever')
def test_NameSpace():
cls = plugable.NameSpace
assert issubclass(cls, plugable.ReadOnly)
cls = plugable.NameSpace
assert issubclass(cls, plugable.ReadOnly)
class base(object):
__public__ = frozenset((
'plusplus',
))
class base(object):
__public__ = frozenset((
'plusplus',
))
def plusplus(self, n):
return n + 1
def plusplus(self, n):
return n + 1
class plugin(base):
def __init__(self, name):
self.name = name
class plugin(base):
def __init__(self, name):
self.name = name
def get_name(i):
return 'noun_verb%d' % i
def get_name(i):
return 'noun_verb%d' % i
def get_proxies(n):
for i in xrange(n):
yield plugable.Proxy(base, plugin(get_name(i)))
def get_proxies(n):
for i in xrange(n):
yield plugable.Proxy(base, plugin(get_name(i)))
cnt = 20
ns = cls(get_proxies(cnt))
cnt = 20
ns = cls(get_proxies(cnt))
# Test __len__
assert len(ns) == cnt
# Test __len__
assert len(ns) == cnt
# Test __iter__
i = None
for (i, proxy) in enumerate(ns):
assert type(proxy) is plugable.Proxy
assert proxy.name == get_name(i)
assert i == cnt - 1
# Test __iter__
i = None
for (i, proxy) in enumerate(ns):
assert type(proxy) is plugable.Proxy
assert proxy.name == get_name(i)
assert i == cnt - 1
# Test __contains__, __getitem__, getattr():
proxies = frozenset(ns)
for i in xrange(cnt):
name = get_name(i)
assert name in ns
proxy = ns[name]
assert proxy.name == name
assert type(proxy) is plugable.Proxy
assert proxy in proxies
assert read_only(ns, name) is proxy
# Test __contains__, __getitem__, getattr():
proxies = frozenset(ns)
for i in xrange(cnt):
name = get_name(i)
assert name in ns
proxy = ns[name]
assert proxy.name == name
assert type(proxy) is plugable.Proxy
assert proxy in proxies
assert read_only(ns, name) is proxy
# Test dir():
assert set(get_name(i) for i in xrange(cnt)).issubset(set(dir(ns)))
# Test dir():
assert set(get_name(i) for i in xrange(cnt)).issubset(set(dir(ns)))
# Test that KeyError, AttributeError is raised:
name = get_name(cnt)
assert name not in ns
raises(KeyError, getitem, ns, name)
raises(AttributeError, getattr, ns, name)
no_set(ns, name)
# Test that KeyError, AttributeError is raised:
name = get_name(cnt)
assert name not in ns
raises(KeyError, getitem, ns, name)
raises(AttributeError, getattr, ns, name)
no_set(ns, name)
def test_Registrar():
class Base1(object):
pass
class Base2(object):
pass
class Base3(object):
pass
class plugin1(Base1):
pass
class plugin2(Base2):
pass
class plugin3(Base3):
pass
class Base1(object):
pass
class Base2(object):
pass
class Base3(object):
pass
class plugin1(Base1):
pass
class plugin2(Base2):
pass
class plugin3(Base3):
pass
# Test creation of Registrar:
r = plugable.Registrar(Base1, Base2)
# Test creation of Registrar:
r = plugable.Registrar(Base1, Base2)
# Test __hasitem__, __getitem__:
for base in [Base1, Base2]:
assert base in r
assert base.__name__ in r
assert r[base] == {}
assert r[base.__name__] == {}
# Test __hasitem__, __getitem__:
for base in [Base1, Base2]:
assert base in r
assert base.__name__ in r
assert r[base] == {}
assert r[base.__name__] == {}
# Check that TypeError is raised trying to register something that isn't
# a class:
raises(TypeError, r, plugin1())
# Check that TypeError is raised trying to register something that isn't
# a class:
raises(TypeError, r, plugin1())
# Check that SubclassError is raised trying to register a class that is
# not a subclass of an allowed base:
raises(errors.SubclassError, r, plugin3)
# Check that SubclassError is raised trying to register a class that is
# not a subclass of an allowed base:
raises(errors.SubclassError, r, plugin3)
# Check that registration works
r(plugin1)
sub_d = r['Base1']
assert len(sub_d) == 1
assert sub_d['plugin1'] is plugin1
# Check that a copy is returned
assert sub_d is not r['Base1']
assert sub_d == r['Base1']
# Check that registration works
r(plugin1)
sub_d = r['Base1']
assert len(sub_d) == 1
assert sub_d['plugin1'] is plugin1
# Check that a copy is returned
assert sub_d is not r['Base1']
assert sub_d == r['Base1']
# Check that DuplicateError is raised trying to register exact class
# again:
raises(errors.DuplicateError, r, plugin1)
# Check that DuplicateError is raised trying to register exact class
# again:
raises(errors.DuplicateError, r, plugin1)
# Check that OverrideError is raised trying to register class with same
# name and same base:
orig1 = plugin1
class base1_extended(Base1):
pass
class plugin1(base1_extended):
pass
raises(errors.OverrideError, r, plugin1)
# Check that OverrideError is raised trying to register class with same
# name and same base:
orig1 = plugin1
class base1_extended(Base1):
pass
class plugin1(base1_extended):
pass
raises(errors.OverrideError, r, plugin1)
# Check that overriding works
r(plugin1, override=True)
sub_d = r['Base1']
assert len(sub_d) == 1
assert sub_d['plugin1'] is plugin1
assert sub_d['plugin1'] is not orig1
# Check that overriding works
r(plugin1, override=True)
sub_d = r['Base1']
assert len(sub_d) == 1
assert sub_d['plugin1'] is plugin1
assert sub_d['plugin1'] is not orig1
# Check that MissingOverrideError is raised trying to override a name
# not yet registerd:
raises(errors.MissingOverrideError, r, plugin2, override=True)
# Check that MissingOverrideError is raised trying to override a name
# not yet registerd:
raises(errors.MissingOverrideError, r, plugin2, override=True)
# Check that additional plugin can be registered:
r(plugin2)
sub_d = r['Base2']
assert len(sub_d) == 1
assert sub_d['plugin2'] is plugin2
# Check that additional plugin can be registered:
r(plugin2)
sub_d = r['Base2']
assert len(sub_d) == 1
assert sub_d['plugin2'] is plugin2
# Setup to test __iter__:
class plugin1a(Base1):
pass
r(plugin1a)
# Setup to test __iter__:
class plugin1a(Base1):
pass
r(plugin1a)
class plugin1b(Base1):
pass
r(plugin1b)
class plugin1b(Base1):
pass
r(plugin1b)
class plugin2a(Base2):
pass
r(plugin2a)
class plugin2a(Base2):
pass
r(plugin2a)
class plugin2b(Base2):
pass
r(plugin2b)
class plugin2b(Base2):
pass
r(plugin2b)
m = {
'Base1': set([plugin1, plugin1a, plugin1b]),
'Base2': set([plugin2, plugin2a, plugin2b]),
}
m = {
'Base1': set([plugin1, plugin1a, plugin1b]),
'Base2': set([plugin2, plugin2a, plugin2b]),
}
# Now test __iter__:
for (base, plugins) in r:
assert base in [Base1, Base2]
assert set(plugins) == m[base.__name__]
assert len(list(r)) == 2
# Now test __iter__:
for (base, plugins) in r:
assert base in [Base1, Base2]
assert set(plugins) == m[base.__name__]
assert len(list(r)) == 2
# Again test __hasitem__, __getitem__:
for base in [Base1, Base2]:
assert base in r
assert base.__name__ in r
d = dict((p.__name__, p) for p in m[base.__name__])
assert len(d) == 3
assert r[base] == d
assert r[base.__name__] == d
# Again test __hasitem__, __getitem__:
for base in [Base1, Base2]:
assert base in r
assert base.__name__ in r
d = dict((p.__name__, p) for p in m[base.__name__])
assert len(d) == 3
assert r[base] == d
assert r[base.__name__] == d
def test_API():
assert issubclass(plugable.API, plugable.ReadOnly)
assert issubclass(plugable.API, plugable.ReadOnly)
# Setup the test bases, create the API:
class base0(plugable.Plugin):
__public__ = frozenset((
'method',
))
# Setup the test bases, create the API:
class base0(plugable.Plugin):
__public__ = frozenset((
'method',
))
def method(self, n):
return n
def method(self, n):
return n
class base1(plugable.Plugin):
__public__ = frozenset((
'method',
))
class base1(plugable.Plugin):
__public__ = frozenset((
'method',
))
def method(self, n):
return n + 1
def method(self, n):
return n + 1
api = plugable.API(base0, base1)
r = api.register
assert isinstance(r, plugable.Registrar)
assert read_only(api, 'register') is r
api = plugable.API(base0, base1)
r = api.register
assert isinstance(r, plugable.Registrar)
assert read_only(api, 'register') is r
class base0_plugin0(base0):
pass
r(base0_plugin0)
class base0_plugin0(base0):
pass
r(base0_plugin0)
class base0_plugin1(base0):
pass
r(base0_plugin1)
class base0_plugin1(base0):
pass
r(base0_plugin1)
class base0_plugin2(base0):
pass
r(base0_plugin2)
class base0_plugin2(base0):
pass
r(base0_plugin2)
class base1_plugin0(base1):
pass
r(base1_plugin0)
class base1_plugin0(base1):
pass
r(base1_plugin0)
class base1_plugin1(base1):
pass
r(base1_plugin1)
class base1_plugin1(base1):
pass
r(base1_plugin1)
class base1_plugin2(base1):
pass
r(base1_plugin2)
class base1_plugin2(base1):
pass
r(base1_plugin2)
# Test API instance:
api() # Calling instance performs finalization
# Test API instance:
api() # Calling instance performs finalization
def get_base(b):
return 'base%d' % b
def get_base(b):
return 'base%d' % b
def get_plugin(b, p):
return 'base%d_plugin%d' % (b, p)
def get_plugin(b, p):
return 'base%d_plugin%d' % (b, p)
for b in xrange(2):
base_name = get_base(b)
ns = getattr(api, base_name)
assert isinstance(ns, plugable.NameSpace)
assert read_only(api, base_name) is ns
assert len(ns) == 3
for p in xrange(3):
plugin_name = get_plugin(b, p)
proxy = ns[plugin_name]
assert isinstance(proxy, plugable.Proxy)
assert proxy.name == plugin_name
assert read_only(ns, plugin_name) is proxy
assert read_only(proxy, 'method')(7) == 7 + b
for b in xrange(2):
base_name = get_base(b)
ns = getattr(api, base_name)
assert isinstance(ns, plugable.NameSpace)
assert read_only(api, base_name) is ns
assert len(ns) == 3
for p in xrange(3):
plugin_name = get_plugin(b, p)
proxy = ns[plugin_name]
assert isinstance(proxy, plugable.Proxy)
assert proxy.name == plugin_name
assert read_only(ns, plugin_name) is proxy
assert read_only(proxy, 'method')(7) == 7 + b

View File

@ -26,179 +26,179 @@ from ipalib import public, plugable, errors
def test_RULE_FLAG():
assert public.RULE_FLAG == 'validation_rule'
assert public.RULE_FLAG == 'validation_rule'
def test_rule():
flag = public.RULE_FLAG
rule = public.rule
def my_func():
pass
assert not hasattr(my_func, flag)
rule(my_func)
assert getattr(my_func, flag) is True
@rule
def my_func2():
pass
assert getattr(my_func2, flag) is True
flag = public.RULE_FLAG
rule = public.rule
def my_func():
pass
assert not hasattr(my_func, flag)
rule(my_func)
assert getattr(my_func, flag) is True
@rule
def my_func2():
pass
assert getattr(my_func2, flag) is True
def test_is_rule():
is_rule = public.is_rule
flag = public.RULE_FLAG
is_rule = public.is_rule
flag = public.RULE_FLAG
class no_call(object):
def __init__(self, value):
if value is not None:
assert value in (True, False)
setattr(self, flag, value)
class no_call(object):
def __init__(self, value):
if value is not None:
assert value in (True, False)
setattr(self, flag, value)
class call(no_call):
def __call__(self):
pass
class call(no_call):
def __call__(self):
pass
assert is_rule(call(True))
assert not is_rule(no_call(True))
assert not is_rule(call(False))
assert not is_rule(call(None))
assert is_rule(call(True))
assert not is_rule(no_call(True))
assert not is_rule(call(False))
assert not is_rule(call(None))
class test_option():
def cls(self):
return public.option
def cls(self):
return public.option
def sub(self):
rule = public.rule
class int_opt(self.cls()):
type = int
@rule
def rule_0(self, value):
if value == 0:
return 'cannot be 0'
@rule
def rule_1(self, value):
if value == 1:
return 'cannot be 1'
@rule
def rule_2(self, value):
if value == 2:
return 'cannot be 2'
return int_opt
def sub(self):
rule = public.rule
class int_opt(self.cls()):
type = int
@rule
def rule_0(self, value):
if value == 0:
return 'cannot be 0'
@rule
def rule_1(self, value):
if value == 1:
return 'cannot be 1'
@rule
def rule_2(self, value):
if value == 2:
return 'cannot be 2'
return int_opt
def test_class(self):
"""
Perform some tests on the class (not an instance).
"""
cls = self.cls()
#assert issubclass(cls, plugable.ReadOnly)
assert type(cls.rules) is property
def test_class(self):
"""
Perform some tests on the class (not an instance).
"""
cls = self.cls()
#assert issubclass(cls, plugable.ReadOnly)
assert type(cls.rules) is property
def test_normalize(self):
sub = self.sub()
i = sub()
# Test with values that can't be converted:
nope = (
'7.0'
'whatever',
object,
None,
)
for val in nope:
e = raises(errors.NormalizationError, i.normalize, val)
assert isinstance(e, errors.ValidationError)
assert e.name == 'int_opt'
assert e.value == val
assert e.error == "not <type 'int'>"
assert e.type is int
# Test with values that can be converted:
okay = (
7,
7.0,
7.2,
7L,
'7',
' 7 ',
)
for val in okay:
assert i.normalize(val) == 7
def test_normalize(self):
sub = self.sub()
i = sub()
# Test with values that can't be converted:
nope = (
'7.0'
'whatever',
object,
None,
)
for val in nope:
e = raises(errors.NormalizationError, i.normalize, val)
assert isinstance(e, errors.ValidationError)
assert e.name == 'int_opt'
assert e.value == val
assert e.error == "not <type 'int'>"
assert e.type is int
# Test with values that can be converted:
okay = (
7,
7.0,
7.2,
7L,
'7',
' 7 ',
)
for val in okay:
assert i.normalize(val) == 7
def test_rules(self):
"""
Test the rules property.
"""
o = self.sub()()
assert len(o.rules) == 3
def get_rule(i):
return getattr(o, 'rule_%d' % i)
rules = tuple(get_rule(i) for i in xrange(3))
assert o.rules == rules
def test_rules(self):
"""
Test the rules property.
"""
o = self.sub()()
assert len(o.rules) == 3
def get_rule(i):
return getattr(o, 'rule_%d' % i)
rules = tuple(get_rule(i) for i in xrange(3))
assert o.rules == rules
def test_validation(self):
"""
Test the validation method.
"""
o = self.sub()()
o.validate(9)
for i in xrange(3):
e = raises(errors.RuleError, o.validate, i)
assert e.error == 'cannot be %d' % i
assert e.value == i
def test_validation(self):
"""
Test the validation method.
"""
o = self.sub()()
o.validate(9)
for i in xrange(3):
e = raises(errors.RuleError, o.validate, i)
assert e.error == 'cannot be %d' % i
assert e.value == i
def test_cmd():
cls = public.cmd
assert issubclass(cls, plugable.Plugin)
cls = public.cmd
assert issubclass(cls, plugable.Plugin)
def test_obj():
cls = public.obj
assert issubclass(cls, plugable.Plugin)
cls = public.obj
assert issubclass(cls, plugable.Plugin)
def test_attr():
cls = public.attr
assert issubclass(cls, plugable.Plugin)
cls = public.attr
assert issubclass(cls, plugable.Plugin)
class api(object):
obj = dict(user='the user obj')
class api(object):
obj = dict(user='the user obj')
class user_add(cls):
pass
class user_add(cls):
pass
i = user_add()
assert read_only(i, 'obj_name') == 'user'
assert read_only(i, 'attr_name') == 'add'
assert read_only(i, 'obj') is None
i.finalize(api)
assert read_only(i, 'api') is api
assert read_only(i, 'obj') == 'the user obj'
i = user_add()
assert read_only(i, 'obj_name') == 'user'
assert read_only(i, 'attr_name') == 'add'
assert read_only(i, 'obj') is None
i.finalize(api)
assert read_only(i, 'api') is api
assert read_only(i, 'obj') == 'the user obj'
def test_mthd():
cls = public.mthd
assert issubclass(cls, public.attr)
assert issubclass(cls, public.cmd)
cls = public.mthd
assert issubclass(cls, public.attr)
assert issubclass(cls, public.cmd)
def test_prop():
cls = public.prop
assert issubclass(cls, public.attr)
cls = public.prop
assert issubclass(cls, public.attr)
def test_PublicAPI():
cls = public.PublicAPI
assert issubclass(cls, plugable.API)
cls = public.PublicAPI
assert issubclass(cls, plugable.API)
api = cls()
api = cls()
class cmd1(public.cmd):
pass
api.register(cmd1)
class cmd1(public.cmd):
pass
api.register(cmd1)
class cmd2(public.cmd):
pass
api.register(cmd2)
class cmd2(public.cmd):
pass
api.register(cmd2)
api()
api()

View File

@ -25,124 +25,124 @@ import tstutil
class Prop(object):
def __init__(self, *ops):
self.__ops = frozenset(ops)
self.__prop = 'prop value'
def __init__(self, *ops):
self.__ops = frozenset(ops)
self.__prop = 'prop value'
def __get_prop(self):
if 'get' not in self.__ops:
raise AttributeError('get prop')
return self.__prop
def __get_prop(self):
if 'get' not in self.__ops:
raise AttributeError('get prop')
return self.__prop
def __set_prop(self, value):
if 'set' not in self.__ops:
raise AttributeError('set prop')
self.__prop = value
def __set_prop(self, value):
if 'set' not in self.__ops:
raise AttributeError('set prop')
self.__prop = value
def __del_prop(self):
if 'del' not in self.__ops:
raise AttributeError('del prop')
self.__prop = None
def __del_prop(self):
if 'del' not in self.__ops:
raise AttributeError('del prop')
self.__prop = None
prop = property(__get_prop, __set_prop, __del_prop)
prop = property(__get_prop, __set_prop, __del_prop)
def test_yes_raised():
f = tstutil.raises
f = tstutil.raises
class SomeError(Exception):
pass
class SomeError(Exception):
pass
class AnotherError(Exception):
pass
class AnotherError(Exception):
pass
def callback1():
'raises correct exception'
raise SomeError()
def callback1():
'raises correct exception'
raise SomeError()
def callback2():
'raises wrong exception'
raise AnotherError()
def callback2():
'raises wrong exception'
raise AnotherError()
def callback3():
'raises no exception'
def callback3():
'raises no exception'
f(SomeError, callback1)
f(SomeError, callback1)
raised = False
try:
f(SomeError, callback2)
except AnotherError:
raised = True
assert raised
raised = False
try:
f(SomeError, callback2)
except AnotherError:
raised = True
assert raised
raised = False
try:
f(SomeError, callback3)
except tstutil.ExceptionNotRaised:
raised = True
assert raised
raised = False
try:
f(SomeError, callback3)
except tstutil.ExceptionNotRaised:
raised = True
assert raised
def test_no_set():
# Tests that it works when prop cannot be set:
tstutil.no_set(Prop('get', 'del'), 'prop')
# Tests that it works when prop cannot be set:
tstutil.no_set(Prop('get', 'del'), 'prop')
# Tests that ExceptionNotRaised is raised when prop *can* be set:
raised = False
try:
tstutil.no_set(Prop('set'), 'prop')
except tstutil.ExceptionNotRaised:
raised = True
assert raised
# Tests that ExceptionNotRaised is raised when prop *can* be set:
raised = False
try:
tstutil.no_set(Prop('set'), 'prop')
except tstutil.ExceptionNotRaised:
raised = True
assert raised
def test_no_del():
# Tests that it works when prop cannot be deleted:
tstutil.no_del(Prop('get', 'set'), 'prop')
# Tests that it works when prop cannot be deleted:
tstutil.no_del(Prop('get', 'set'), 'prop')
# Tests that ExceptionNotRaised is raised when prop *can* be set:
raised = False
try:
tstutil.no_del(Prop('del'), 'prop')
except tstutil.ExceptionNotRaised:
raised = True
assert raised
# Tests that ExceptionNotRaised is raised when prop *can* be set:
raised = False
try:
tstutil.no_del(Prop('del'), 'prop')
except tstutil.ExceptionNotRaised:
raised = True
assert raised
def test_read_only():
# Test that it works when prop is read only:
assert tstutil.read_only(Prop('get'), 'prop') == 'prop value'
# Test that it works when prop is read only:
assert tstutil.read_only(Prop('get'), 'prop') == 'prop value'
# Test that ExceptionNotRaised is raised when prop can be set:
raised = False
try:
tstutil.read_only(Prop('get', 'set'), 'prop')
except tstutil.ExceptionNotRaised:
raised = True
assert raised
# Test that ExceptionNotRaised is raised when prop can be set:
raised = False
try:
tstutil.read_only(Prop('get', 'set'), 'prop')
except tstutil.ExceptionNotRaised:
raised = True
assert raised
# Test that ExceptionNotRaised is raised when prop can be deleted:
raised = False
try:
tstutil.read_only(Prop('get', 'del'), 'prop')
except tstutil.ExceptionNotRaised:
raised = True
assert raised
# Test that ExceptionNotRaised is raised when prop can be deleted:
raised = False
try:
tstutil.read_only(Prop('get', 'del'), 'prop')
except tstutil.ExceptionNotRaised:
raised = True
assert raised
# Test that ExceptionNotRaised is raised when prop can be both set and
# deleted:
raised = False
try:
tstutil.read_only(Prop('get', 'del'), 'prop')
except tstutil.ExceptionNotRaised:
raised = True
assert raised
# Test that ExceptionNotRaised is raised when prop can be both set and
# deleted:
raised = False
try:
tstutil.read_only(Prop('get', 'del'), 'prop')
except tstutil.ExceptionNotRaised:
raised = True
assert raised
# Test that AttributeError is raised when prop can't be read:
raised = False
try:
tstutil.read_only(Prop(), 'prop')
except AttributeError:
raised = True
assert raised
# Test that AttributeError is raised when prop can't be read:
raised = False
try:
tstutil.read_only(Prop(), 'prop')
except AttributeError:
raised = True
assert raised

View File

@ -22,78 +22,78 @@ Utility functions for the unit tests.
"""
class ExceptionNotRaised(Exception):
"""
Exception raised when an *expected* exception is *not* raised during a
unit test.
"""
msg = 'expected %s'
"""
Exception raised when an *expected* exception is *not* raised during a
unit test.
"""
msg = 'expected %s'
def __init__(self, expected):
self.expected = expected
def __init__(self, expected):
self.expected = expected
def __str__(self):
return self.msg % self.expected.__name__
def __str__(self):
return self.msg % self.expected.__name__
def raises(exception, callback, *args, **kw):
"""
Tests that the expected exception is raised; raises ExceptionNotRaised
if test fails.
"""
raised = False
try:
callback(*args, **kw)
except exception, e:
raised = True
if not raised:
raise ExceptionNotRaised(exception)
return e
"""
Tests that the expected exception is raised; raises ExceptionNotRaised
if test fails.
"""
raised = False
try:
callback(*args, **kw)
except exception, e:
raised = True
if not raised:
raise ExceptionNotRaised(exception)
return e
def getitem(obj, key):
"""
Works like getattr but for dictionary interface. Uses this in combination
with raises() to test that, for example, KeyError is raised.
"""
return obj[key]
"""
Works like getattr but for dictionary interface. Uses this in combination
with raises() to test that, for example, KeyError is raised.
"""
return obj[key]
def no_set(obj, name, value='some_new_obj'):
"""
Tests that attribute cannot be set.
"""
raises(AttributeError, setattr, obj, name, value)
"""
Tests that attribute cannot be set.
"""
raises(AttributeError, setattr, obj, name, value)
def no_del(obj, name):
"""
Tests that attribute cannot be deleted.
"""
raises(AttributeError, delattr, obj, name)
"""
Tests that attribute cannot be deleted.
"""
raises(AttributeError, delattr, obj, name)
def read_only(obj, name, value='some_new_obj'):
"""
Tests that attribute is read-only. Returns attribute.
"""
# Test that it cannot be set:
no_set(obj, name, value)
"""
Tests that attribute is read-only. Returns attribute.
"""
# Test that it cannot be set:
no_set(obj, name, value)
# Test that it cannot be deleted:
no_del(obj, name)
# Test that it cannot be deleted:
no_del(obj, name)
# Return the attribute
return getattr(obj, name)
# Return the attribute
return getattr(obj, name)
def is_prop(prop):
return type(prop) is property
return type(prop) is property
class ClassChecker(object):
def new(self, *args, **kw):
return self.cls(*args, **kw)
def new(self, *args, **kw):
return self.cls(*args, **kw)
def get_sub(self):
raise NotImplementedError('get_sub()')
def get_sub(self):
raise NotImplementedError('get_sub()')