freeipa/tests/test_ipalib/test_plugable.py
2010-01-28 13:32:00 -05:00

518 lines
15 KiB
Python

# Authors:
# Jason Gerard DeRose <jderose@redhat.com>
#
# Copyright (C) 2008 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Test the `ipalib.plugable` module.
"""
import inspect
from tests.util import raises, no_set, no_del, read_only
from tests.util import getitem, setitem, delitem
from tests.util import ClassChecker, create_test_api
from ipalib import plugable, errors
class test_SetProxy(ClassChecker):
"""
Test the `ipalib.plugable.SetProxy` class.
"""
_cls = plugable.SetProxy
def test_class(self):
"""
Test the `ipalib.plugable.SetProxy` class.
"""
assert self.cls.__bases__ == (plugable.ReadOnly,)
def test_init(self):
"""
Test the `ipalib.plugable.SetProxy.__init__` method.
"""
okay = (set, frozenset, dict)
fail = (list, tuple)
for t in okay:
self.cls(t())
raises(TypeError, self.cls, t)
for t in fail:
raises(TypeError, self.cls, t())
raises(TypeError, self.cls, t)
def test_SetProxy(self):
"""
Test container emulation of `ipalib.plugable.SetProxy` class.
"""
def get_key(i):
return 'key_%d' % i
cnt = 10
target = set()
proxy = self.cls(target)
for i in xrange(cnt):
key = get_key(i)
# Check initial state
assert len(proxy) == len(target)
assert list(proxy) == sorted(target)
assert key not in proxy
assert key not in target
# Add and test again
target.add(key)
assert len(proxy) == len(target)
assert list(proxy) == sorted(target)
assert key in proxy
assert key in target
class test_DictProxy(ClassChecker):
"""
Test the `ipalib.plugable.DictProxy` class.
"""
_cls = plugable.DictProxy
def test_class(self):
"""
Test the `ipalib.plugable.DictProxy` class.
"""
assert self.cls.__bases__ == (plugable.SetProxy,)
def test_init(self):
"""
Test the `ipalib.plugable.DictProxy.__init__` method.
"""
self.cls(dict())
raises(TypeError, self.cls, dict)
fail = (set, frozenset, list, tuple)
for t in fail:
raises(TypeError, self.cls, t())
raises(TypeError, self.cls, t)
def test_DictProxy(self):
"""
Test container emulation of `ipalib.plugable.DictProxy` class.
"""
def get_kv(i):
return (
'key_%d' % i,
'val_%d' % i,
)
cnt = 10
target = dict()
proxy = self.cls(target)
for i in xrange(cnt):
(key, val) = get_kv(i)
# Check initial state
assert len(proxy) == len(target)
assert list(proxy) == sorted(target)
assert list(proxy()) == [target[k] for k in sorted(target)]
assert key not in proxy
raises(KeyError, getitem, proxy, key)
# Add and test again
target[key] = val
assert len(proxy) == len(target)
assert list(proxy) == sorted(target)
assert list(proxy()) == [target[k] for k in sorted(target)]
# Verify TypeError is raised trying to set/del via proxy
raises(TypeError, setitem, proxy, key, val)
raises(TypeError, delitem, proxy, key)
class test_MagicDict(ClassChecker):
"""
Test the `ipalib.plugable.MagicDict` class.
"""
_cls = plugable.MagicDict
def test_class(self):
"""
Test the `ipalib.plugable.MagicDict` class.
"""
assert self.cls.__bases__ == (plugable.DictProxy,)
for non_dict in ('hello', 69, object):
raises(TypeError, self.cls, non_dict)
def test_MagicDict(self):
"""
Test container emulation of `ipalib.plugable.MagicDict` class.
"""
cnt = 10
keys = []
d = dict()
dictproxy = self.cls(d)
for i in xrange(cnt):
key = 'key_%d' % i
val = 'val_%d' % i
keys.append(key)
# Test thet key does not yet exist
assert len(dictproxy) == i
assert key not in dictproxy
assert not hasattr(dictproxy, key)
raises(KeyError, getitem, dictproxy, key)
raises(AttributeError, getattr, dictproxy, key)
# Test that items/attributes cannot be set on dictproxy:
raises(TypeError, setitem, dictproxy, key, val)
raises(AttributeError, setattr, dictproxy, key, val)
# Test that additions in d are reflected in dictproxy:
d[key] = val
assert len(dictproxy) == i + 1
assert key in dictproxy
assert hasattr(dictproxy, key)
assert dictproxy[key] is val
assert read_only(dictproxy, key) is val
# Test __iter__
assert list(dictproxy) == keys
for key in keys:
# Test that items cannot be deleted through dictproxy:
raises(TypeError, delitem, dictproxy, key)
raises(AttributeError, delattr, dictproxy, key)
# Test that deletions in d are reflected in dictproxy
del d[key]
assert len(dictproxy) == len(d)
assert key not in dictproxy
raises(KeyError, getitem, dictproxy, key)
raises(AttributeError, getattr, dictproxy, key)
class test_Plugin(ClassChecker):
"""
Test the `ipalib.plugable.Plugin` class.
"""
_cls = plugable.Plugin
def test_class(self):
"""
Test the `ipalib.plugable.Plugin` class.
"""
assert self.cls.__bases__ == (plugable.ReadOnly,)
assert type(self.cls.api) is property
def test_init(self):
"""
Test the `ipalib.plugable.Plugin.__init__` method.
"""
o = self.cls()
assert o.name == 'Plugin'
assert o.module == 'ipalib.plugable'
assert o.fullname == 'ipalib.plugable.Plugin'
assert o.doc == inspect.getdoc(self.cls)
class some_subclass(self.cls):
"""
Do sub-classy things.
Although it doesn't know how to comport itself and is not for mixed
company, this class *is* useful as we all need a little sub-class
now and then.
One more paragraph.
"""
o = some_subclass()
assert o.name == 'some_subclass'
assert o.module == __name__
assert o.fullname == '%s.some_subclass' % __name__
assert o.doc == inspect.getdoc(some_subclass)
assert o.summary == 'Do sub-classy things.'
class another_subclass(self.cls):
pass
o = another_subclass()
assert o.doc is None
assert o.summary == '<%s>' % o.fullname
# Test that Plugin makes sure the subclass hasn't defined attributes
# whose names conflict with the logger methods set in Plugin.__init__():
class check(self.cls):
info = 'whatever'
e = raises(StandardError, check)
assert str(e) == \
"check.info attribute ('whatever') conflicts with Plugin logger"
def test_set_api(self):
"""
Test the `ipalib.plugable.Plugin.set_api` method.
"""
api = 'the api instance'
o = self.cls()
assert o.api is None
e = raises(AssertionError, o.set_api, None)
assert str(e) == 'set_api() argument cannot be None'
o.set_api(api)
assert o.api is api
e = raises(AssertionError, o.set_api, api)
assert str(e) == 'set_api() can only be called once'
def test_finalize(self):
"""
Test the `ipalib.plugable.Plugin.finalize` method.
"""
o = self.cls()
assert not o.__islocked__()
o.finalize()
assert o.__islocked__()
def test_call(self):
"""
Test the `ipalib.plugable.Plugin.call` method.
"""
o = self.cls()
o.call('/bin/true') is None
e = raises(errors.SubprocessError, o.call, '/bin/false')
assert e.returncode == 1
assert e.argv == ('/bin/false',)
def test_Registrar():
"""
Test the `ipalib.plugable.Registrar` class
"""
class Base1(object):
pass
class Base2(object):
pass
class Base3(object):
pass
class plugin1(Base1):
pass
class plugin2(Base2):
pass
class plugin3(Base3):
pass
# Test creation of Registrar:
r = plugable.Registrar(Base1, Base2)
# Test __iter__:
assert list(r) == ['Base1', 'Base2']
# Test __hasitem__, __getitem__:
for base in [Base1, Base2]:
name = base.__name__
assert name in r
assert r[name] is base
magic = getattr(r, name)
assert type(magic) is plugable.MagicDict
assert len(magic) == 0
# Check that TypeError is raised trying to register something that isn't
# a class:
p = plugin1()
e = raises(TypeError, r, p)
assert str(e) == 'plugin must be a class; got %r' % p
# Check that SubclassError is raised trying to register a class that is
# not a subclass of an allowed base:
e = raises(errors.PluginSubclassError, r, plugin3)
assert e.plugin is plugin3
# Check that registration works
r(plugin1)
assert len(r.Base1) == 1
assert r.Base1['plugin1'] is plugin1
assert r.Base1.plugin1 is plugin1
# Check that DuplicateError is raised trying to register exact class
# again:
e = raises(errors.PluginDuplicateError, r, plugin1)
assert e.plugin is plugin1
# Check that OverrideError is raised trying to register class with same
# name and same base:
orig1 = plugin1
class base1_extended(Base1):
pass
class plugin1(base1_extended):
pass
e = raises(errors.PluginOverrideError, r, plugin1)
assert e.base == 'Base1'
assert e.name == 'plugin1'
assert e.plugin is plugin1
# Check that overriding works
r(plugin1, override=True)
assert len(r.Base1) == 1
assert r.Base1.plugin1 is plugin1
assert r.Base1.plugin1 is not orig1
# Check that MissingOverrideError is raised trying to override a name
# not yet registerd:
e = raises(errors.PluginMissingOverrideError, r, plugin2, override=True)
assert e.base == 'Base2'
assert e.name == 'plugin2'
assert e.plugin is plugin2
# Test that another plugin can be registered:
assert len(r.Base2) == 0
r(plugin2)
assert len(r.Base2) == 1
assert r.Base2.plugin2 is plugin2
# Setup to test more registration:
class plugin1a(Base1):
pass
r(plugin1a)
class plugin1b(Base1):
pass
r(plugin1b)
class plugin2a(Base2):
pass
r(plugin2a)
class plugin2b(Base2):
pass
r(plugin2b)
# Again test __hasitem__, __getitem__:
for base in [Base1, Base2]:
name = base.__name__
assert name in r
assert r[name] is base
magic = getattr(r, name)
assert len(magic) == 3
for key in magic:
klass = magic[key]
assert getattr(magic, key) is klass
assert issubclass(klass, base)
class test_API(ClassChecker):
"""
Test the `ipalib.plugable.API` class.
"""
_cls = plugable.API
def test_API(self):
"""
Test the `ipalib.plugable.API` class.
"""
assert issubclass(plugable.API, plugable.ReadOnly)
# Setup the test bases, create the API:
class base0(plugable.Plugin):
def method(self, n):
return n
class base1(plugable.Plugin):
def method(self, n):
return n + 1
api = plugable.API(base0, base1)
api.env.mode = 'unit_test'
api.env.in_tree = True
r = api.register
assert isinstance(r, plugable.Registrar)
assert read_only(api, 'register') is r
class base0_plugin0(base0):
pass
r(base0_plugin0)
class base0_plugin1(base0):
pass
r(base0_plugin1)
class base0_plugin2(base0):
pass
r(base0_plugin2)
class base1_plugin0(base1):
pass
r(base1_plugin0)
class base1_plugin1(base1):
pass
r(base1_plugin1)
class base1_plugin2(base1):
pass
r(base1_plugin2)
# Test API instance:
assert api.isdone('bootstrap') is False
assert api.isdone('finalize') is False
api.finalize()
assert api.isdone('bootstrap') is True
assert api.isdone('finalize') is True
def get_base_name(b):
return 'base%d' % b
def get_plugin_name(b, p):
return 'base%d_plugin%d' % (b, p)
for b in xrange(2):
base_name = get_base_name(b)
base = locals()[base_name]
ns = getattr(api, base_name)
assert isinstance(ns, plugable.NameSpace)
assert read_only(api, base_name) is ns
assert len(ns) == 3
for p in xrange(3):
plugin_name = get_plugin_name(b, p)
plugin = locals()[plugin_name]
inst = ns[plugin_name]
assert isinstance(inst, base)
assert isinstance(inst, plugin)
assert inst.name == plugin_name
assert read_only(ns, plugin_name) is inst
assert inst.method(7) == 7 + b
# Test that calling finilize again raises AssertionError:
e = raises(StandardError, api.finalize)
assert str(e) == 'API.finalize() already called', str(e)
def test_bootstrap(self):
"""
Test the `ipalib.plugable.API.bootstrap` method.
"""
(o, home) = create_test_api()
assert o.env._isdone('_bootstrap') is False
assert o.env._isdone('_finalize_core') is False
assert o.isdone('bootstrap') is False
o.bootstrap(my_test_override='Hello, world!')
assert o.isdone('bootstrap') is True
assert o.env._isdone('_bootstrap') is True
assert o.env._isdone('_finalize_core') is True
assert o.env.my_test_override == 'Hello, world!'
e = raises(StandardError, o.bootstrap)
assert str(e) == 'API.bootstrap() already called'
def test_load_plugins(self):
"""
Test the `ipalib.plugable.API.load_plugins` method.
"""
(o, home) = create_test_api()
assert o.isdone('bootstrap') is False
assert o.isdone('load_plugins') is False
o.load_plugins()
assert o.isdone('bootstrap') is True
assert o.isdone('load_plugins') is True
e = raises(StandardError, o.load_plugins)
assert str(e) == 'API.load_plugins() already called'