plugable: Pass API to plugins on initialization rather than using set_api

https://fedorahosted.org/freeipa/ticket/3090

Reviewed-By: Martin Babinsky <mbabinsk@redhat.com>
This commit is contained in:
Jan Cholasta 2015-06-22 10:58:43 +00:00
parent 2d1515323a
commit e39fe4ed31
38 changed files with 209 additions and 278 deletions

View File

@ -58,7 +58,7 @@ OPERATION_NOT_SUPPORTED_BY_HELPER = 6
def ldap_connect():
conn = None
try:
conn = ldap2(shared_instance=False, ldap_uri=api.env.ldap_uri)
conn = ldap2(api)
conn.connect(ccache=os.environ['KRB5CCNAME'])
yield conn
finally:

View File

@ -140,7 +140,7 @@ def _main():
conn = None
try:
conn = ldap2(shared_instance=False, ldap_uri=api.env.ldap_uri)
conn = ldap2(api)
conn.connect(ccache=ccache_filename)
except Exception, e:
syslog.syslog(

View File

@ -107,7 +107,7 @@ def main():
conn = None
try:
try:
conn = ldap2(shared_instance=False, base_dn='')
conn = ldap2(api)
conn.connect(
bind_dn=DN(('cn', 'directory manager')), bind_pw=dirman_password
)

View File

@ -120,7 +120,7 @@ def main():
conn = None
try:
try:
conn = ldap2(shared_instance=False, base_dn='')
conn = ldap2(api)
conn.connect(
bind_dn=DN(('cn', 'directory manager')), bind_pw=dirman_password
)

View File

@ -292,7 +292,7 @@ does that, and the backend plugins are only installed on the server. The
``user_add.execute()`` method, which is only called when in a server context,
is implemented as a series of calls to methods on the ``ldap`` backend plugin.
When `plugable.Plugin.set_api()` is called, each plugin stores a reference to
When `plugable.Plugin.__init__()` is called, each plugin stores a reference to
the `plugable.API` instance it has been loaded into. So your plugin can
access the ``my_backend`` plugin as ``self.api.Backend.my_backend``.
@ -668,7 +668,7 @@ See the `ipalib.cli.textui` plugin for a description of its methods.
Logging from your plugin
------------------------
After `plugable.Plugin.set_api()` is called, your plugin will have a
After `plugable.Plugin.__init__()` is called, your plugin will have a
``self.log`` attribute. Plugins should only log through this attribute.
For example:

View File

@ -43,8 +43,8 @@ class Connectible(Backend):
`request.destroy_context()` can properly close all open connections.
"""
def __init__(self, shared_instance=False):
Backend.__init__(self)
def __init__(self, api, shared_instance=False):
Backend.__init__(self, api)
if shared_instance:
self.id = self.name
else:

View File

@ -715,7 +715,7 @@ class help(frontend.Local):
self._builtins = []
# build help topics
for c in self.Command():
for c in self.api.Command():
if c.NO_CLI:
continue

View File

@ -1250,12 +1250,12 @@ class Attribute(Plugin):
# Create stubs for attributes that are set in _on_finalize()
__obj = Plugin.finalize_attr('_Attribute__obj')
def __init__(self):
def __init__(self, api):
m = self.NAME_REGEX.match(type(self).__name__)
assert m
self.__obj_name = m.group('obj')
self.__attr_name = m.group('attr')
super(Attribute, self).__init__()
super(Attribute, self).__init__(api)
def __get_obj_name(self):
return self.__obj_name
@ -1347,9 +1347,6 @@ class Method(Attribute, Command):
extra_options_first = False
extra_args_first = False
def __init__(self):
super(Method, self).__init__()
def get_output_params(self):
for param in self.obj.params():
if 'no_output' in param.flags:

View File

@ -224,8 +224,9 @@ class Plugin(ReadOnly):
label = None
def __init__(self):
self.__api = None
def __init__(self, api):
assert api is not None
self.__api = api
self.__finalize_called = False
self.__finalized = False
self.__finalize_lock = threading.RLock()
@ -254,14 +255,27 @@ class Plugin(ReadOnly):
)
)
def __get_api(self):
@property
def api(self):
"""
Return `API` instance passed to `set_api()`.
If `set_api()` has not yet been called, None is returned.
Return `API` instance passed to `__init__()`.
"""
return self.__api
api = property(__get_api)
# FIXME: for backward compatibility only
@property
def env(self):
return self.__api.env
# FIXME: for backward compatibility only
@property
def Backend(self):
return self.__api.Backend
# FIXME: for backward compatibility only
@property
def Command(self):
return self.__api.Command
def finalize(self):
"""
@ -336,23 +350,6 @@ class Plugin(ReadOnly):
"attribute '%s' of plugin '%s' was not set in finalize()" % (self.name, obj.name)
)
def set_api(self, api):
"""
Set reference to `API` instance.
"""
assert self.__api is None, 'set_api() can only be called once'
assert api is not None, 'set_api() argument cannot be None'
self.__api = api
if not isinstance(api, API):
return
for name in api:
assert not hasattr(self, name)
setattr(self, name, api[name])
for name in ('env', 'context'):
if hasattr(api, name):
assert not hasattr(self, name)
setattr(self, name, getattr(api, name))
def call(self, executable, *args):
"""
Call ``executable`` with ``args`` using subprocess.call().
@ -746,7 +743,7 @@ class API(DictProxy):
try:
instance = plugins[klass]
except KeyError:
instance = plugins[klass] = klass()
instance = plugins[klass] = klass(self)
members.append(instance)
plugin_info.setdefault(
'%s.%s' % (klass.__module__, klass.__name__),
@ -760,9 +757,6 @@ class API(DictProxy):
self.__d[name] = namespace
object.__setattr__(self, name, namespace)
for instance in plugins.itervalues():
instance.set_api(self)
for klass, instance in plugins.iteritems():
if not production_mode:
assert instance.api is self

View File

@ -879,7 +879,7 @@ can use their Kerberos accounts.''')
return dict(result={}, failed={}, enabled=False, compat=True)
# connect to DS
ds_ldap = ldap2(shared_instance=False, ldap_uri=ldapuri, base_dn='')
ds_ldap = ldap2(self.api, ldap_uri=ldapuri)
cacert = None
if options.get('cacertfile') is not None:

View File

@ -133,11 +133,6 @@ class plugins(LocalOrRemote):
)
def execute(self, **options):
plugins = sorted(self.api.plugins, key=lambda o: o.plugin)
return dict(
result=dict(
(p.plugin, p.bases) for p in plugins
),
count=len(plugins),
result=dict(self.api.plugins),
)

View File

@ -945,9 +945,7 @@ class user_status(LDAPQuery):
if host == api.env.host:
other_ldap = self.obj.backend
else:
other_ldap = ldap2(shared_instance=False,
ldap_uri='ldap://%s' % host,
base_dn=self.api.env.basedn)
other_ldap = ldap2(self.api, ldap_uri='ldap://%s' % host)
try:
other_ldap.connect(ccache=os.environ['KRB5CCNAME'])
except Exception, e:

View File

@ -104,8 +104,8 @@ class Advice(Plugin):
require_root = False
description = ''
def __init__(self):
super(Advice, self).__init__()
def __init__(self, api):
super(Advice, self).__init__(api)
self.log = _AdviceOutput()
def set_options(self, options):

View File

@ -1179,7 +1179,7 @@ class BindInstance(service.Service):
print "Global DNS configuration in LDAP server is not empty"
print "The following configuration options override local settings in named.conf:"
print ""
textui = ipalib.cli.textui()
textui = ipalib.cli.textui(api)
api.Command.dnsconfig_show.output_for_cli(textui, result, None, reverse=False)
def uninstall(self):

View File

@ -1588,7 +1588,7 @@ def update_people_entry(dercert):
while attempts < 10:
conn = None
try:
conn = ldap2.ldap2(shared_instance=False, ldap_uri=dogtag_uri)
conn = ldap2.ldap2(api, ldap_uri=dogtag_uri)
conn.connect(autobind=True)
db_filter = conn.make_filter(
@ -1643,7 +1643,7 @@ def ensure_ldap_profiles_container():
server_id = installutils.realm_to_serverid(api.env.realm)
dogtag_uri = 'ldapi://%%2fvar%%2frun%%2fslapd-%s.socket' % server_id
conn = ldap2.ldap2(shared_instance=False, ldap_uri=dogtag_uri)
conn = ldap2.ldap2(api, ldap_uri=dogtag_uri)
if not conn.isconnected():
conn.connect(autobind=True)
@ -1675,7 +1675,7 @@ def configure_profiles_acl():
)
modlist = [(ldap.MOD_ADD, 'resourceACLS', [rule])]
conn = ldap2.ldap2(shared_instance=False, ldap_uri=dogtag_uri)
conn = ldap2.ldap2(api, ldap_uri=dogtag_uri)
if not conn.isconnected():
conn.connect(autobind=True)
rules = conn.get_entry(dn).get('resourceACLS', [])
@ -1696,7 +1696,7 @@ def import_included_profiles():
server_id = installutils.realm_to_serverid(api.env.realm)
dogtag_uri = 'ldapi://%%2fvar%%2frun%%2fslapd-%s.socket' % server_id
conn = ldap2.ldap2(shared_instance=False, ldap_uri=dogtag_uri)
conn = ldap2.ldap2(api, ldap_uri=dogtag_uri)
if not conn.isconnected():
conn.connect(autobind=True)

View File

@ -121,7 +121,7 @@ class CACertManage(admintool.AdminTool):
return rc
def ldap_connect(self):
conn = ldap2()
conn = ldap2(api)
password = self.options.password
if not password:

View File

@ -507,7 +507,7 @@ class OTPTokenImport(admintool.AdminTool):
api.bootstrap(in_server=True)
api.finalize()
conn = ldap2()
conn = ldap2(api)
try:
ccache = krbV.default_context().default_ccache()
conn.connect(ccache=ccache)

View File

@ -637,7 +637,7 @@ class ReplicaPrepare(admintool.AdminTool):
os.remove(agent_name)
def update_pki_admin_password(self):
ldap = ldap2(shared_instance=False)
ldap = ldap2(api)
ldap.connect(
bind_dn=DN(('cn', 'directory manager')),
bind_pw=self.dirman_password

View File

@ -240,8 +240,7 @@ def set_subject_in_config(realm_name, dm_password, suffix, subject_base):
installutils.realm_to_serverid(realm_name)
)
try:
conn = ldap2(shared_instance=False, ldap_uri=ldapuri,
base_dn=suffix)
conn = ldap2(api, ldap_uri=ldapuri)
conn.connect(bind_dn=DN(('cn', 'directory manager')),
bind_pw=dm_password)
except errors.ExecutionError, e:

View File

@ -1286,7 +1286,7 @@ class ra(rabase.rabase):
"""
DEFAULT_PROFILE = dogtag.DEFAULT_PROFILE
def __init__(self):
def __init__(self, api):
if api.env.in_tree:
self.sec_dir = api.env.dot_ipa + os.sep + 'alias'
self.pwd_file = self.sec_dir + os.sep + '.pwd'
@ -1303,7 +1303,7 @@ class ra(rabase.rabase):
f.close()
except IOError:
self.password = ''
super(ra, self).__init__()
super(ra, self).__init__(api)
def raise_certificate_operation_error(self, func_name, err_msg=None, detail=None):
"""
@ -1896,11 +1896,11 @@ class kra(Backend):
KRA backend plugin (for Vault)
"""
def __init__(self, kra_port=443):
def __init__(self, api, kra_port=443):
self.kra_port = kra_port
super(kra, self).__init__()
super(kra, self).__init__(api)
def get_client(self):
"""
@ -1958,7 +1958,7 @@ class RestClient(Backend):
except:
return None
def __init__(self):
def __init__(self, api):
if api.env.in_tree:
self.sec_dir = api.env.dot_ipa + os.sep + 'alias'
self.pwd_file = self.sec_dir + os.sep + '.pwd'
@ -1970,7 +1970,7 @@ class RestClient(Backend):
self.ipa_certificate_nickname = "ipaCert"
self.ca_certificate_nickname = "caCert"
self._read_password()
super(RestClient, self).__init__()
super(RestClient, self).__init__(api)
# session cookie
self.override_port = None

View File

@ -56,47 +56,20 @@ from ipalib.crud import CrudBackend
from ipalib.request import context
class ldap2(LDAPClient, CrudBackend):
class ldap2(CrudBackend, LDAPClient):
"""
LDAP Backend Take 2.
"""
def __init__(self, shared_instance=False, ldap_uri=None, base_dn=None,
schema=None):
self.__ldap_uri = None
def __init__(self, api, ldap_uri=None):
if ldap_uri is None:
ldap_uri = api.env.ldap_uri
CrudBackend.__init__(self, shared_instance=shared_instance)
LDAPClient.__init__(self, ldap_uri)
force_schema_updates = api.env.context in ('installer', 'updates')
self.__base_dn = base_dn
@property
def api(self):
self_api = super(ldap2, self).api
if self_api is None:
self_api = api
return self_api
@property
def ldap_uri(self):
try:
return self.__ldap_uri or self.api.env.ldap_uri
except AttributeError:
return 'ldap://example.com'
@ldap_uri.setter
def ldap_uri(self, value):
self.__ldap_uri = value
@property
def base_dn(self):
try:
if self.__base_dn is not None:
return DN(self.__base_dn)
else:
return DN(self.api.env.basedn)
except AttributeError:
return DN()
CrudBackend.__init__(self, api)
LDAPClient.__init__(self, ldap_uri,
force_schema_updates=force_schema_updates)
def _connect(self):
# Connectible.conn is a proxy to thread-local storage;
@ -145,8 +118,6 @@ class ldap2(LDAPClient, CrudBackend):
if debug_level:
_ldap.set_option(_ldap.OPT_DEBUG_LEVEL, debug_level)
object.__setattr__(self, '_force_schema_updates',
self.api.env.context in ('installer', 'updates'))
LDAPClient._connect(self)
conn = self._conn

View File

@ -41,14 +41,14 @@ class rabase(Backend):
"""
Request Authority backend plugin.
"""
def __init__(self):
def __init__(self, api):
if api.env.in_tree:
self.sec_dir = api.env.dot_ipa + os.sep + 'alias'
self.pwd_file = self.sec_dir + os.sep + '.pwd'
else:
self.sec_dir = paths.HTTPD_ALIAS_DIR
self.pwd_file = paths.ALIAS_PWDFILE_TXT
super(rabase, self).__init__()
super(rabase, self).__init__(api)
def check_request_status(self, request_id):

View File

@ -238,8 +238,8 @@ class wsgi_dispatch(Executioner, HTTP_Status):
handler which is specific to the authentication and RPC mechanism.
"""
def __init__(self):
super(wsgi_dispatch, self).__init__()
def __init__(self, api):
super(wsgi_dispatch, self).__init__(api)
self.__apps = {}
def __iter__(self):
@ -301,14 +301,11 @@ class WSGIExecutioner(Executioner):
_system_commands = {}
def set_api(self, api):
super(WSGIExecutioner, self).set_api(api)
if 'wsgi_dispatch' in self.api.Backend:
self.api.Backend.wsgi_dispatch.mount(self, self.key)
def _on_finalize(self):
self.url = self.env.mount_ipa + self.key
super(WSGIExecutioner, self)._on_finalize()
if 'wsgi_dispatch' in self.api.Backend:
self.api.Backend.wsgi_dispatch.mount(self, self.key)
def wsgi_execute(self, environ):
result = None
@ -746,8 +743,8 @@ class jsonserver_session(jsonserver, KerberosSession):
key = '/session/json'
def __init__(self):
super(jsonserver_session, self).__init__()
def __init__(self, api):
super(jsonserver_session, self).__init__(api)
name = '{0}_{1}'.format(self.__class__.__name__, id(self))
auth_mgr = AuthManagerKerb(name)
session_mgr.auth_mgr.register(auth_mgr.name, auth_mgr)
@ -849,9 +846,6 @@ class jsonserver_kerb(jsonserver, KerberosWSGIExecutioner):
class login_kerberos(Backend, KerberosSession, HTTP_Status):
key = '/session/login_kerberos'
def __init__(self):
super(login_kerberos, self).__init__()
def _on_finalize(self):
super(login_kerberos, self)._on_finalize()
self.api.Backend.wsgi_dispatch.mount(self, self.key)
@ -873,9 +867,6 @@ class login_password(Backend, KerberosSession, HTTP_Status):
content_type = 'text/plain'
key = '/session/login_password'
def __init__(self):
super(login_password, self).__init__()
def _on_finalize(self):
super(login_password, self)._on_finalize()
self.api.Backend.wsgi_dispatch.mount(self, self.key)
@ -998,9 +989,6 @@ class change_password(Backend, HTTP_Status):
content_type = 'text/plain'
key = '/session/change_password'
def __init__(self):
super(change_password, self).__init__()
def _on_finalize(self):
super(change_password, self)._on_finalize()
self.api.Backend.wsgi_dispatch.mount(self, self.key)
@ -1051,8 +1039,7 @@ class change_password(Backend, HTTP_Status):
pw = data['old_password']
if data.get('otp'):
pw = data['old_password'] + data['otp']
conn = ldap2(shared_instance=False,
ldap_uri=self.api.env.ldap_uri)
conn = ldap2(self.api)
conn.connect(bind_dn=bind_dn, bind_pw=pw)
except (NotFound, ACIError):
result = 'invalid-password'
@ -1104,9 +1091,6 @@ class sync_token(Backend, HTTP_Status):
namedtype.OptionalNamedType('tokenDN', univ.OctetString())
)
def __init__(self):
super(sync_token, self).__init__()
def _on_finalize(self):
super(sync_token, self)._on_finalize()
self.api.Backend.wsgi_dispatch.mount(self, self.key)
@ -1165,7 +1149,7 @@ class sync_token(Backend, HTTP_Status):
title = 'Token sync rejected'
# Perform the synchronization.
conn = ldap2(shared_instance=False, ldap_uri=self.api.env.ldap_uri)
conn = ldap2(self.api)
try:
conn.connect(bind_dn=bind_dn,
bind_pw=data['password'],
@ -1199,8 +1183,8 @@ class xmlserver_session(xmlserver, KerberosSession):
key = '/session/xml'
def __init__(self):
super(xmlserver_session, self).__init__()
def __init__(self, api):
super(xmlserver_session, self).__init__(api)
name = '{0}_{1}'.format(self.__class__.__name__, id(self))
auth_mgr = AuthManagerKerb(name)
session_mgr.auth_mgr.register(auth_mgr.name, auth_mgr)

View File

@ -36,7 +36,7 @@ from ipaserver.plugins.ldap2 import ldap2
ccache = krbV.default_context().default_ccache()
try:
conn = ldap2(shared_instance=False, ldap_uri=api.env.ldap_uri, base_dn=api.env.basedn)
conn = ldap2(api)
conn.connect(ccache=ccache)
conn.disconnect()
server_available = True

View File

@ -45,7 +45,7 @@ def use_keytab(principal, keytab):
ccache = krbV.CCache(name=ccache_file, context=krbcontext, primary_principal=principal)
ccache.init(principal)
ccache.init_creds_keytab(keytab=keytab, principal=principal)
conn = ldap2(shared_instance=False, ldap_uri=api.env.ldap_uri, base_dn=api.env.basedn)
conn = ldap2(api)
conn.connect(ccache=ccache)
conn.disconnect()
except krbV.Krb5Error, e:

View File

@ -71,12 +71,13 @@ class test_Connectible(ClassChecker):
Test the `ipalib.backend.Connectible.connect` method.
"""
# Test that connection is created:
api = 'the api instance'
class example(self.cls):
def create_connection(self, *args, **kw):
object.__setattr__(self, 'args', args)
object.__setattr__(self, 'kw', kw)
return 'The connection.'
o = example(shared_instance=True)
o = example(api, shared_instance=True)
args = ('Arg1', 'Arg2', 'Arg3')
kw = dict(key1='Val1', key2='Val2', key3='Val3')
assert not hasattr(context, 'example')
@ -101,10 +102,11 @@ class test_Connectible(ClassChecker):
"""
Test the `ipalib.backend.Connectible.create_connection` method.
"""
api = 'the api instance'
class example(self.cls):
pass
for klass in (self.cls, example):
o = klass(shared_instance=True)
o = klass(api, shared_instance=True)
e = raises(NotImplementedError, o.create_connection)
assert str(e) == '%s.create_connection()' % klass.__name__
@ -112,9 +114,10 @@ class test_Connectible(ClassChecker):
"""
Test the `ipalib.backend.Connectible.disconnect` method.
"""
api = 'the api instance'
class example(self.cls):
destroy_connection = Disconnect()
o = example(shared_instance=True)
o = example(api, shared_instance=True)
m = "disconnect: 'context.%s' does not exist in thread %r"
e = raises(StandardError, o.disconnect)
@ -128,10 +131,11 @@ class test_Connectible(ClassChecker):
"""
Test the `ipalib.backend.Connectible.destroy_connection` method.
"""
api = 'the api instance'
class example(self.cls):
pass
for klass in (self.cls, example):
o = klass(shared_instance=True)
o = klass(api, shared_instance=True)
e = raises(NotImplementedError, o.destroy_connection)
assert str(e) == '%s.destroy_connection()' % klass.__name__
@ -139,10 +143,11 @@ class test_Connectible(ClassChecker):
"""
Test the `ipalib.backend.Connectible.isconnected` method.
"""
api = 'the api instance'
class example(self.cls):
pass
for klass in (self.cls, example):
o = klass(shared_instance=True)
o = klass(api, shared_instance=True)
assert o.isconnected() is False
conn = 'whatever'
setattr(context, klass.__name__, conn)
@ -153,11 +158,12 @@ class test_Connectible(ClassChecker):
"""
Test the `ipalib.backend.Connectible.conn` property.
"""
api = 'the api instance'
msg = 'no context.%s in thread %r'
class example(self.cls):
pass
for klass in (self.cls, example):
o = klass(shared_instance=True)
o = klass(api, shared_instance=True)
e = raises(AttributeError, getattr, o, 'conn')
assert str(e) == msg % (
klass.__name__, threading.currentThread().getName()
@ -211,8 +217,7 @@ class test_Executioner(ClassChecker):
api.add_plugin(with_name)
api.finalize()
o = self.cls()
o.set_api(api)
o = self.cls(api)
o.finalize()
# Test that CommandError is raised:

View File

@ -32,7 +32,8 @@ class test_textui(ClassChecker):
"""
Test the `ipalib.cli.textui.max_col_width` method.
"""
o = self.cls()
api = 'the api instance'
o = self.cls(api)
e = raises(TypeError, o.max_col_width, 'hello')
assert str(e) == 'rows: need %r or %r; got %r' % (list, tuple, 'hello')
rows = [

View File

@ -202,10 +202,11 @@ class test_CrudBackend(ClassChecker):
return ldap
def check_method(self, name, *args):
o = self.cls()
api = 'the api instance'
o = self.cls(api)
e = raises(NotImplementedError, getattr(o, name), *args)
assert str(e) == 'CrudBackend.%s()' % name
sub = self.subcls()
sub = self.subcls(api)
e = raises(NotImplementedError, getattr(sub, name), *args)
assert str(e) == 'ldap.%s()' % name

View File

@ -89,31 +89,32 @@ class test_HasParam(ClassChecker):
"""
Test the `ipalib.frontend.HasParam._get_param_iterable` method.
"""
api = 'the api instance'
class WithTuple(self.cls):
takes_stuff = ('one', 'two')
o = WithTuple()
o = WithTuple(api)
assert o._get_param_iterable('stuff') is WithTuple.takes_stuff
junk = ('three', 'four')
class WithCallable(self.cls):
def takes_stuff(self):
return junk
o = WithCallable()
o = WithCallable(api)
assert o._get_param_iterable('stuff') is junk
class WithParam(self.cls):
takes_stuff = parameters.Str('five')
o = WithParam()
o = WithParam(api)
assert o._get_param_iterable('stuff') == (WithParam.takes_stuff,)
class WithStr(self.cls):
takes_stuff = 'six'
o = WithStr()
o = WithStr(api)
assert o._get_param_iterable('stuff') == ('six',)
class Wrong(self.cls):
takes_stuff = ['seven', 'eight']
o = Wrong()
o = Wrong(api)
e = raises(TypeError, o._get_param_iterable, 'stuff')
assert str(e) == '%s.%s must be a tuple, callable, or spec; got %r' % (
'Wrong', 'takes_stuff', Wrong.takes_stuff
@ -123,6 +124,7 @@ class test_HasParam(ClassChecker):
"""
Test the `ipalib.frontend.HasParam._filter_param_by_context` method.
"""
api = 'the api instance'
class Example(self.cls):
def get_stuff(self):
return (
@ -132,7 +134,7 @@ class test_HasParam(ClassChecker):
parameters.Str('four', exclude='server'),
parameters.Str('five', exclude=['whatever', 'cli']),
)
o = Example()
o = Example(api)
# Test when env is None:
params = list(o._filter_param_by_context('stuff'))
@ -161,7 +163,7 @@ class test_HasParam(ClassChecker):
# Test with no get_stuff:
class Missing(self.cls):
pass
o = Missing()
o = Missing(api)
gen = o._filter_param_by_context('stuff')
e = raises(NotImplementedError, list, gen)
assert str(e) == 'Missing.get_stuff()'
@ -169,7 +171,7 @@ class test_HasParam(ClassChecker):
# Test when get_stuff is not callable:
class NotCallable(self.cls):
get_stuff = ('one', 'two')
o = NotCallable()
o = NotCallable(api)
gen = o._filter_param_by_context('stuff')
e = raises(TypeError, list, gen)
assert str(e) == '%s.%s must be a callable; got %r' % (
@ -219,10 +221,11 @@ class test_Command(ClassChecker):
"""
Helper method used to test args and options.
"""
api = 'the api instance'
class example(self.cls):
takes_args = args
takes_options = options
o = example()
o = example(api)
o.finalize()
return o
@ -237,7 +240,8 @@ class test_Command(ClassChecker):
"""
Test the `ipalib.frontend.Command.get_args` method.
"""
assert list(self.cls().get_args()) == []
api = 'the api instance'
assert list(self.cls(api).get_args()) == []
args = ('login', 'stuff')
o = self.get_instance(args=args)
assert tuple(o.get_args()) == args
@ -246,7 +250,8 @@ class test_Command(ClassChecker):
"""
Test the `ipalib.frontend.Command.get_options` method.
"""
options = list(self.cls().get_options())
api = 'the api instance'
options = list(self.cls(api).get_options())
assert len(options) == 1
assert options[0].name == 'version'
options = ('verbose', 'debug')
@ -259,8 +264,8 @@ class test_Command(ClassChecker):
"""
Test the ``ipalib.frontend.Command.args`` instance attribute.
"""
assert self.cls().args is None
o = self.cls()
api = 'the api instance'
o = self.cls(api)
o.finalize()
assert type(o.args) is plugable.NameSpace
assert len(o.args) == 0
@ -308,8 +313,8 @@ class test_Command(ClassChecker):
"""
Test the ``ipalib.frontend.Command.options`` instance attribute.
"""
assert self.cls().options is None
o = self.cls()
api = 'the api instance'
o = self.cls(api)
o.finalize()
assert type(o.options) is plugable.NameSpace
assert len(o.options) == 1
@ -329,8 +334,8 @@ class test_Command(ClassChecker):
"""
Test the ``ipalib.frontend.Command.output`` instance attribute.
"""
inst = self.cls()
assert inst.output is None
api = 'the api instance'
inst = self.cls(api)
inst.finalize()
assert type(inst.output) is plugable.NameSpace
assert list(inst.output) == ['result']
@ -340,9 +345,10 @@ class test_Command(ClassChecker):
"""
Test the ``ipalib.frontend.Command._iter_output`` instance attribute.
"""
api = 'the api instance'
class Example(self.cls):
pass
inst = Example()
inst = Example(api)
inst.has_output = tuple()
assert list(inst._iter_output()) == []
@ -373,6 +379,8 @@ class test_Command(ClassChecker):
"""
Test the `ipalib.frontend.Command.soft_validate` method.
"""
class api(object):
env = config.Env(context='cli')
class user_add(frontend.Command):
takes_args = parameters.Str('uid',
normalizer=lambda value: value.lower(),
@ -381,8 +389,7 @@ class test_Command(ClassChecker):
takes_options = ('givenname', 'sn')
cmd = user_add()
cmd.env = config.Env(context='cli')
cmd = user_add(api)
cmd.finalize()
assert list(cmd.params) == ['givenname', 'sn', 'uid', 'version']
ret = cmd.soft_validate({})
@ -398,11 +405,12 @@ class test_Command(ClassChecker):
"""
Test the `ipalib.frontend.Command.convert` method.
"""
api = 'the api instance'
kw = dict(
option0=u'1.5',
option1=u'7',
)
o = self.subcls()
o = self.subcls(api)
o.finalize()
for (key, value) in o.convert(**kw).iteritems():
assert_equal(unicode(kw[key]), value)
@ -411,12 +419,13 @@ class test_Command(ClassChecker):
"""
Test the `ipalib.frontend.Command.normalize` method.
"""
api = 'the api instance'
kw = dict(
option0=u'OPTION0',
option1=u'OPTION1',
)
norm = dict((k, v.lower()) for (k, v) in kw.items())
sub = self.subcls()
sub = self.subcls(api)
sub.finalize()
assert sub.normalize(**kw) == norm
@ -444,8 +453,7 @@ class test_Command(ClassChecker):
(api, home) = create_test_api()
api.finalize()
o = my_cmd()
o.set_api(api)
o = my_cmd(api)
o.finalize()
e = o(**kw) # pylint: disable=not-callable
assert type(e) is dict
@ -457,9 +465,10 @@ class test_Command(ClassChecker):
"""
Test the `ipalib.frontend.Command.validate` method.
"""
class api(object):
env = config.Env(context='cli')
sub = self.subcls()
sub.env = config.Env(context='cli')
sub = self.subcls(api)
sub.finalize()
# Check with valid values
@ -491,7 +500,8 @@ class test_Command(ClassChecker):
"""
Test the `ipalib.frontend.Command.execute` method.
"""
o = self.cls()
api = 'the api instance'
o = self.cls(api)
e = raises(NotImplementedError, o.execute)
assert str(e) == 'Command.execute()'
@ -568,8 +578,7 @@ class test_Command(ClassChecker):
(api, home) = create_test_api()
api.finalize()
o = my_cmd()
o.set_api(api)
o = my_cmd(api)
o.finalize()
e = o.run(*args, **kw)
assert type(e) is dict
@ -608,8 +617,7 @@ class test_Command(ClassChecker):
# Test in server context:
(api, home) = create_test_api(in_server=True)
api.finalize()
o = my_cmd()
o.set_api(api)
o = my_cmd(api)
assert o.run.im_func is self.cls.run.im_func
out = o.run(*args, **kw)
assert ('execute', args, kw) == out
@ -617,8 +625,7 @@ class test_Command(ClassChecker):
# Test in non-server context
(api, home) = create_test_api(in_server=False)
api.finalize()
o = my_cmd()
o.set_api(api)
o = my_cmd(api)
assert o.run.im_func is self.cls.run.im_func
assert ('forward', args, kw) == o.run(*args, **kw)
@ -650,16 +657,14 @@ class test_Command(ClassChecker):
# Test in server context:
(api, home) = create_test_api(in_server=True)
api.finalize()
o = my_cmd()
o.set_api(api)
o = my_cmd(api)
assert o.run.im_func is self.cls.run.im_func
assert {'name': 'execute', 'messages': expected} == o.run(*args, **kw)
# Test in non-server context
(api, home) = create_test_api(in_server=False)
api.finalize()
o = my_cmd()
o.set_api(api)
o = my_cmd(api)
assert o.run.im_func is self.cls.run.im_func
assert {'name': 'forward', 'messages': expected} == o.run(*args, **kw)
@ -667,10 +672,11 @@ class test_Command(ClassChecker):
"""
Test the `ipalib.frontend.Command.validate_output` method.
"""
api = 'the api instance'
class Example(self.cls):
has_output = ('foo', 'bar', 'baz')
inst = Example()
inst = Example(api)
inst.finalize()
# Test with wrong type:
@ -705,13 +711,14 @@ class test_Command(ClassChecker):
"""
Test `ipalib.frontend.Command.validate_output` per-type validation.
"""
api = 'the api instance'
class Complex(self.cls):
has_output = (
output.Output('foo', int),
output.Output('bar', list),
)
inst = Complex()
inst = Complex(api)
inst.finalize()
wrong = dict(foo=17.9, bar=[18])
@ -730,6 +737,7 @@ class test_Command(ClassChecker):
"""
Test `ipalib.frontend.Command.validate_output` nested validation.
"""
api = 'the api instance'
class Subclass(output.ListOfEntries):
pass
@ -740,7 +748,7 @@ class test_Command(ClassChecker):
output.Output('hello', int),
Subclass('world'),
)
inst = nested()
inst = nested(api)
inst.finalize()
okay = dict(foo='bar')
nope = ('aye', 'bee')
@ -761,6 +769,7 @@ class test_Command(ClassChecker):
"""
Test the `ipalib.frontend.Command.get_output_params` method.
"""
api = 'the api instance'
class example(self.cls):
has_output_params = (
'one',
@ -775,8 +784,7 @@ class test_Command(ClassChecker):
'baz',
)
inst = example()
assert list(inst.get_output_params()) == ['one', 'two', 'three']
inst = example(api)
inst.finalize()
assert list(inst.get_output_params()) == [
'one', 'two', 'three', inst.params.foo, inst.params.baz
@ -794,7 +802,8 @@ class test_LocalOrRemote(ClassChecker):
"""
Test the `ipalib.frontend.LocalOrRemote.__init__` method.
"""
o = self.cls()
api = 'the api instance'
o = self.cls(api)
o.finalize()
assert list(o.args) == []
assert list(o.options) == ['server', 'version']
@ -872,16 +881,6 @@ class test_Object(ClassChecker):
"""
Test the `ipalib.frontend.Object.__init__` method.
"""
o = self.cls()
assert o.backend is None
assert o.methods is None
assert o.params is None
assert o.params_minus_pk is None
def test_set_api(self):
"""
Test the `ipalib.frontend.Object.set_api` method.
"""
# Setup for test:
class DummyAttribute(object):
def __init__(self, obj_name, attr_name, name=None):
@ -908,20 +907,22 @@ class test_Object(ClassChecker):
cnt = 10
methods_format = 'method_%d'
_d = dict(
Method=plugable.NameSpace(
class FakeAPI(object):
Method = plugable.NameSpace(
get_attributes(cnt, methods_format)
),
)
api = plugable.MagicDict(_d)
)
def __contains__(self, key):
return hasattr(self, key)
def __getitem__(self, key):
return getattr(self, key)
api = FakeAPI()
assert len(api.Method) == cnt * 3
class user(self.cls):
pass
# Actually perform test:
o = user()
o.set_api(api)
o = user(api)
assert read_only(o, 'api') is api
namespace = o.methods
@ -938,15 +939,13 @@ class test_Object(ClassChecker):
assert attr.name == '%s_%s' % ('user', attr_name)
# Test params instance attribute
o = self.cls()
o.set_api(api)
o = self.cls(api)
ns = o.params
assert type(ns) is plugable.NameSpace
assert len(ns) == 0
class example(self.cls):
takes_params = ('banana', 'apple')
o = example()
o.set_api(api)
o = example(api)
ns = o.params
assert type(ns) is plugable.NameSpace
assert len(ns) == 2, repr(ns)
@ -969,8 +968,7 @@ class test_Object(ClassChecker):
'one',
'two',
)
o = example1()
o.set_api(api)
o = example1(api)
assert o.primary_key is None
# Test with 1 primary key:
@ -981,8 +979,7 @@ class test_Object(ClassChecker):
parameters.Str('three', primary_key=True),
'four',
)
o = example2()
o.set_api(api)
o = example2(api)
pk = o.primary_key
assert type(pk) is parameters.Str
assert pk.name == 'three'
@ -999,8 +996,7 @@ class test_Object(ClassChecker):
'three',
parameters.Str('four', primary_key=True),
)
o = example3()
o.set_api(api)
o = example3(api)
e = raises(ValueError, o.finalize)
assert str(e) == \
'example3 (Object) has multiple primary keys: one, two, four'
@ -1025,12 +1021,13 @@ class test_Object(ClassChecker):
"""
Test the `ipalib.frontend.Object.get_dn` method.
"""
o = self.cls()
api = 'the api instance'
o = self.cls(api)
e = raises(NotImplementedError, o.get_dn, 'primary key')
assert str(e) == 'Object.get_dn()'
class user(self.cls):
pass
o = user()
o = user(api)
e = raises(NotImplementedError, o.get_dn, 'primary key')
assert str(e) == 'user.get_dn()'
@ -1040,9 +1037,9 @@ class test_Object(ClassChecker):
"""
class example(self.cls):
takes_params = ('one', 'two', 'three', 'four')
o = example()
(api, home) = create_test_api()
o.set_api(api)
api.finalize()
o = example(api)
p = o.params
assert tuple(o.params_minus()) == tuple(p())
assert tuple(o.params_minus([])) == tuple(p())
@ -1073,28 +1070,16 @@ class test_Attribute(ClassChecker):
"""
Test the `ipalib.frontend.Attribute.__init__` method.
"""
class user_add(self.cls):
pass
o = user_add()
assert read_only(o, 'obj') is None
assert read_only(o, 'obj_name') == 'user'
assert read_only(o, 'attr_name') == 'add'
def test_set_api(self):
"""
Test the `ipalib.frontend.Attribute.set_api` method.
"""
user_obj = 'The user frontend.Object instance'
class api(object):
Object = dict(user=user_obj)
class user_add(self.cls):
pass
o = user_add()
assert read_only(o, 'api') is None
assert read_only(o, 'obj') is None
o.set_api(api)
o = user_add(api)
assert read_only(o, 'api') is api
assert read_only(o, 'obj') is user_obj
assert read_only(o, 'obj_name') == 'user'
assert read_only(o, 'attr_name') == 'add'
class test_Method(ClassChecker):
@ -1133,9 +1118,10 @@ class test_Method(ClassChecker):
"""
Test the `ipalib.frontend.Method.__init__` method.
"""
api = 'the api instance'
class user_add(self.cls):
pass
o = user_add()
o = user_add(api)
assert o.name == 'user_add'
assert o.obj_name == 'user'
assert o.attr_name == 'add'

View File

@ -71,9 +71,10 @@ class test_ListOfEntries(ClassChecker):
"""
Test the `ipalib.output.ListOfEntries.validate` method.
"""
api = 'the api instance'
class example(Command):
pass
cmd = example()
cmd = example(api)
inst = self.cls('stuff')
okay = dict(foo='bar')

View File

@ -219,7 +219,8 @@ class test_Plugin(ClassChecker):
"""
Test the `ipalib.plugable.Plugin.__init__` method.
"""
o = self.cls()
api = 'the api instance'
o = self.cls(api)
assert o.name == 'Plugin'
assert o.module == 'ipalib.plugable'
assert o.fullname == 'ipalib.plugable.Plugin'
@ -234,7 +235,7 @@ class test_Plugin(ClassChecker):
One more paragraph.
"""
o = some_subclass()
o = some_subclass(api)
assert o.name == 'some_subclass'
assert o.module == __name__
assert o.fullname == '%s.some_subclass' % __name__
@ -242,36 +243,23 @@ class test_Plugin(ClassChecker):
assert isinstance(o.doc, text.Gettext)
class another_subclass(self.cls):
pass
o = another_subclass()
o = another_subclass(api)
assert o.summary == '<%s>' % o.fullname
# Test that Plugin makes sure the subclass hasn't defined attributes
# whose names conflict with the logger methods set in Plugin.__init__():
class check(self.cls):
info = 'whatever'
e = raises(StandardError, check)
e = raises(StandardError, check, api)
assert str(e) == \
"info is already bound to ipatests.test_ipalib.test_plugable.check()"
def test_set_api(self):
"""
Test the `ipalib.plugable.Plugin.set_api` method.
"""
api = 'the api instance'
o = self.cls()
assert o.api is None
e = raises(AssertionError, o.set_api, None)
assert str(e) == 'set_api() argument cannot be None'
o.set_api(api)
assert o.api is api
e = raises(AssertionError, o.set_api, api)
assert str(e) == 'set_api() can only be called once'
def test_finalize(self):
"""
Test the `ipalib.plugable.Plugin.finalize` method.
"""
o = self.cls()
api = 'the api instance'
o = self.cls(api)
assert not o.__islocked__()
o.finalize()
assert o.__islocked__()

View File

@ -60,7 +60,7 @@ class test_ldap(object):
"""
Test an anonymous LDAP bind using ldap2
"""
self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri)
self.conn = ldap2(api, ldap_uri=self.ldapuri)
self.conn.connect()
dn = api.env.basedn
entry_attrs = self.conn.get_entry(dn, ['associateddomain'])
@ -73,7 +73,7 @@ class test_ldap(object):
"""
if not ipautil.file_exists(self.ccache):
raise nose.SkipTest('Missing ccache %s' % self.ccache)
self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri)
self.conn = ldap2(api, ldap_uri=self.ldapuri)
self.conn.connect(ccache='FILE:%s' % self.ccache)
entry_attrs = self.conn.get_entry(self.dn, ['usercertificate'])
cert = entry_attrs.get('usercertificate')
@ -92,7 +92,7 @@ class test_ldap(object):
fp.close()
else:
raise nose.SkipTest("No directory manager password in %s" % pwfile)
self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri)
self.conn = ldap2(api, ldap_uri=self.ldapuri)
self.conn.connect(bind_dn=DN(('cn', 'directory manager')), bind_pw=dm_password)
entry_attrs = self.conn.get_entry(self.dn, ['usercertificate'])
cert = entry_attrs.get('usercertificate')
@ -137,7 +137,7 @@ class test_ldap(object):
Test an autobind LDAP bind using ldap2
"""
ldapuri = 'ldapi://%%2fvar%%2frun%%2fslapd-%s.socket' % api.env.realm.replace('.','-')
self.conn = ldap2(shared_instance=False, ldap_uri=ldapuri)
self.conn = ldap2(api, ldap_uri=ldapuri)
try:
self.conn.connect(autobind=True)
except errors.ACIError:
@ -160,7 +160,7 @@ class test_LDAPEntry(object):
def setup(self):
self.ldapuri = 'ldap://%s' % ipautil.format_netloc(api.env.host)
self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri)
self.conn = ldap2(api, ldap_uri=self.ldapuri)
self.conn.connect()
self.entry = self.conn.make_entry(self.dn1, cn=self.cn1)

View File

@ -47,7 +47,8 @@ class StartResponse(object):
def test_not_found():
f = rpcserver.HTTP_Status()
api = 'the api instance'
f = rpcserver.HTTP_Status(api)
t = rpcserver._not_found_template
s = StartResponse()
@ -72,7 +73,8 @@ def test_not_found():
def test_bad_request():
f = rpcserver.HTTP_Status()
api = 'the api instance'
f = rpcserver.HTTP_Status(api)
t = rpcserver._bad_request_template
s = StartResponse()
@ -85,7 +87,8 @@ def test_bad_request():
def test_internal_error():
f = rpcserver.HTTP_Status()
api = 'the api instance'
f = rpcserver.HTTP_Status(api)
t = rpcserver._internal_error_template
s = StartResponse()
@ -98,7 +101,8 @@ def test_internal_error():
def test_unauthorized_error():
f = rpcserver.HTTP_Status()
api = 'the api instance'
f = rpcserver.HTTP_Status(api)
t = rpcserver._unauthorized_template
s = StartResponse()
@ -139,7 +143,8 @@ class test_session(object):
[environ[k] for k in ('SCRIPT_NAME', 'PATH_INFO')]
)
inst = self.klass()
api = 'the api instance'
inst = self.klass(api)
inst.mount(app1, '/foo/stuff')
inst.mount(app2, '/bar')
@ -157,7 +162,8 @@ class test_session(object):
pass
# Test that mount works:
inst = self.klass()
api = 'the api instance'
inst = self.klass(api)
inst.mount(app1, 'foo')
assert inst['foo'] is app1
assert list(inst) == ['foo']

View File

@ -44,7 +44,8 @@ def test_exc_wrapper():
assert kwargs == dict(a=1, b=2)
raise errors.ExecutionError('failure')
instance = test_callback()
api = 'the api instance'
instance = test_callback(api)
# Test with one callback first
@ -96,8 +97,10 @@ def test_callback_registration():
callbacktest_subclass.register_callback('test', subclass_callback)
api = 'the api instance'
messages = []
instance = callbacktest_base()
instance = callbacktest_base(api)
for callback in instance.get_callbacks('test'):
callback(instance, 42)
assert messages == [
@ -106,7 +109,7 @@ def test_callback_registration():
('Registered callback from another class', 42)]
messages = []
instance = callbacktest_subclass()
instance = callbacktest_subclass(api)
for callback in instance.get_callbacks('test'):
callback(instance, 42)
assert messages == [
@ -134,7 +137,9 @@ def test_exc_callback_registration():
"""Raise an error"""
raise errors.ExecutionError('failure')
base_instance = callbacktest_base()
api = 'the api instance'
base_instance = callbacktest_base(api)
class callbacktest_subclass(callbacktest_base):
pass
@ -145,7 +150,7 @@ def test_exc_callback_registration():
messages.append('Subclass registered callback')
raise exc
subclass_instance = callbacktest_subclass()
subclass_instance = callbacktest_subclass(api)
# Make sure exception in base class is only handled by the base class
base_instance.test_fail()

View File

@ -400,7 +400,7 @@ def _get_nameservers_ldap(conn):
def get_nameservers():
ldap = ldap2(shared_instance=False)
ldap = ldap2(api)
ldap.connect(ccache=krbV.default_context().default_ccache())
nameservers = [normalize_zone(x) for x in _get_nameservers_ldap(ldap)]
return nameservers

View File

@ -1297,7 +1297,7 @@ class test_netgroup(Declarative):
# """
# # Do an LDAP query to the compat area and verify that the entry
# # is correct
# conn = ldap2(shared_instance=False, ldap_uri=api.env.ldap_uri, base_dn=api.env.basedn)
# conn = ldap2(api)
# conn.connect(ccache=ccache)
# try:
# entries = conn.find_entries('cn=%s' % self.ng_cn,

View File

@ -3174,7 +3174,7 @@ class test_managed_permissions(Declarative):
def add_managed_permission(self):
"""Add a managed permission and the corresponding ACI"""
ldap = ldap2(shared_instance=False)
ldap = ldap2(api)
ldap.connect(ccache=krbV.default_context().default_ccache())
result = api.Command.permission_add(permission1, type=u'user',

View File

@ -74,7 +74,7 @@ def makecert(reqdir, subject, principal):
Generate a certificate that can be used during unit testing.
"""
ra = rabase.rabase()
ra = rabase.rabase(api)
if (not os.path.exists(ra.sec_dir) and
api.env.xmlrpc_uri == 'http://localhost:8888/ipa/xml'):
raise AssertionError('The self-signed CA is not configured, '