mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-01-11 00:31:56 -06:00
feab723c59
As of Python3 `currentThread`, `thread.getName` are aliases for `threading.current_thread()` and `threading.Thread.name` respectively. In Python3.10: > bpo-43723: The following threading methods are now deprecated and should be replaced: currentThread => threading.current_thread() activeCount => threading.active_count() Condition.notifyAll => threading.Condition.notify_all() Event.isSet => threading.Event.is_set() Thread.setName => threading.Thread.name thread.getName => threading.Thread.name Thread.isDaemon => threading.Thread.daemon Thread.setDaemon => threading.Thread.daemon Fixes: https://pagure.io/freeipa/issue/9117 Signed-off-by: Stanislav Levin <slev@altlinux.org> Reviewed-By: Rob Crittenden <rcritten@redhat.com>
287 lines
9.6 KiB
Python
287 lines
9.6 KiB
Python
# Authors:
|
|
# Jason Gerard DeRose <jderose@redhat.com>
|
|
#
|
|
# Copyright (C) 2008 Red Hat
|
|
# see file 'COPYING' for use and warranty information
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
"""
|
|
Test the `ipalib.backend` module.
|
|
"""
|
|
from __future__ import print_function
|
|
|
|
import threading
|
|
from ipatests.util import ClassChecker, raises, create_test_api
|
|
from ipatests.data import unicode_str
|
|
from ipalib.request import context, Connection
|
|
from ipalib.frontend import Command
|
|
from ipalib import backend, plugable, errors
|
|
from ipapython.version import API_VERSION
|
|
|
|
import pytest
|
|
|
|
pytestmark = pytest.mark.tier0
|
|
|
|
class test_Backend(ClassChecker):
|
|
"""
|
|
Test the `ipalib.backend.Backend` class.
|
|
"""
|
|
|
|
_cls = backend.Backend
|
|
|
|
def test_class(self):
|
|
assert self.cls.__bases__ == (plugable.Plugin,)
|
|
|
|
|
|
class Disconnect:
|
|
called = False
|
|
|
|
def __init__(self, id=None):
|
|
self.id = id
|
|
|
|
def __call__(self):
|
|
assert self.called is False
|
|
self.called = True
|
|
if self.id is not None:
|
|
delattr(context, self.id)
|
|
|
|
|
|
class test_Connectible(ClassChecker):
|
|
"""
|
|
Test the `ipalib.backend.Connectible` class.
|
|
"""
|
|
|
|
_cls = backend.Connectible
|
|
|
|
def test_connect(self):
|
|
"""
|
|
Test the `ipalib.backend.Connectible.connect` method.
|
|
"""
|
|
# Test that connection is created:
|
|
api = 'the api instance'
|
|
class example(self.cls):
|
|
def create_connection(self, *args, **kw):
|
|
object.__setattr__(self, 'args', args)
|
|
object.__setattr__(self, 'kw', kw)
|
|
return 'The connection.'
|
|
o = example(api, shared_instance=True)
|
|
args = ('Arg1', 'Arg2', 'Arg3')
|
|
kw = dict(key1='Val1', key2='Val2', key3='Val3')
|
|
assert not hasattr(context, 'example')
|
|
assert o.connect(*args, **kw) is None
|
|
conn = context.example
|
|
assert type(conn) is Connection
|
|
# pylint: disable=no-member
|
|
assert o.args == args
|
|
assert o.kw == kw
|
|
# pylint: enable=no-member
|
|
assert conn.conn == 'The connection.'
|
|
assert conn.disconnect == o.disconnect
|
|
|
|
# Test that Exception is raised if already connected:
|
|
m = "{0} is already connected ({1} in {2})"
|
|
e = raises(Exception, o.connect, *args, **kw)
|
|
assert str(e) == m.format(
|
|
"example", o.id, threading.current_thread().name
|
|
)
|
|
|
|
# Double check that it works after deleting context.example:
|
|
del context.example
|
|
assert o.connect(*args, **kw) is None
|
|
|
|
def test_create_connection(self):
|
|
"""
|
|
Test the `ipalib.backend.Connectible.create_connection` method.
|
|
"""
|
|
api = 'the api instance'
|
|
class example(self.cls):
|
|
pass
|
|
for klass in (self.cls, example):
|
|
o = klass(api, shared_instance=True)
|
|
e = raises(NotImplementedError, o.create_connection)
|
|
assert str(e) == '%s.create_connection()' % klass.__name__
|
|
|
|
def test_disconnect(self):
|
|
"""
|
|
Test the `ipalib.backend.Connectible.disconnect` method.
|
|
"""
|
|
api = 'the api instance'
|
|
class example(self.cls):
|
|
destroy_connection = Disconnect()
|
|
o = example(api, shared_instance=True)
|
|
|
|
m = "{0} is not connected ({1} in {2})"
|
|
e = raises(Exception, o.disconnect)
|
|
assert str(e) == m.format(
|
|
"example", o.id, threading.current_thread().name
|
|
)
|
|
|
|
context.example = 'The connection.'
|
|
assert o.disconnect() is None
|
|
assert example.destroy_connection.called is True
|
|
|
|
def test_destroy_connection(self):
|
|
"""
|
|
Test the `ipalib.backend.Connectible.destroy_connection` method.
|
|
"""
|
|
api = 'the api instance'
|
|
class example(self.cls):
|
|
pass
|
|
for klass in (self.cls, example):
|
|
o = klass(api, shared_instance=True)
|
|
e = raises(NotImplementedError, o.destroy_connection)
|
|
assert str(e) == '%s.destroy_connection()' % klass.__name__
|
|
|
|
def test_isconnected(self):
|
|
"""
|
|
Test the `ipalib.backend.Connectible.isconnected` method.
|
|
"""
|
|
api = 'the api instance'
|
|
class example(self.cls):
|
|
pass
|
|
for klass in (self.cls, example):
|
|
o = klass(api, shared_instance=True)
|
|
assert o.isconnected() is False
|
|
conn = 'whatever'
|
|
setattr(context, klass.__name__, conn)
|
|
assert o.isconnected() is True
|
|
delattr(context, klass.__name__)
|
|
|
|
def test_conn(self):
|
|
"""
|
|
Test the `ipalib.backend.Connectible.conn` property.
|
|
"""
|
|
api = 'the api instance'
|
|
msg = '{0} is not connected ({1} in {2})'
|
|
class example(self.cls):
|
|
pass
|
|
for klass in (self.cls, example):
|
|
o = klass(api, shared_instance=True)
|
|
e = raises(AttributeError, getattr, o, 'conn')
|
|
assert str(e) == msg.format(
|
|
klass.__name__, o.id, threading.current_thread().name
|
|
)
|
|
conn = Connection('The connection.', Disconnect())
|
|
setattr(context, klass.__name__, conn)
|
|
assert o.conn is conn.conn
|
|
delattr(context, klass.__name__)
|
|
|
|
|
|
class test_Executioner(ClassChecker):
|
|
"""
|
|
Test the `ipalib.backend.Executioner` class.
|
|
"""
|
|
_cls = backend.Executioner
|
|
|
|
def test_execute(self):
|
|
"""
|
|
Test the `ipalib.backend.Executioner.execute` method.
|
|
"""
|
|
api, _home = create_test_api(in_server=True)
|
|
|
|
class echo(Command):
|
|
takes_args = ('arg1', 'arg2+')
|
|
takes_options = ('option1?', 'option2?')
|
|
def execute(self, *args, **options):
|
|
assert type(args[1]) is tuple
|
|
return dict(result=args + (options,))
|
|
api.add_plugin(echo)
|
|
|
|
class good(Command):
|
|
def execute(self, **options):
|
|
raise errors.ValidationError(
|
|
name='nurse',
|
|
error=u'Not naughty!',
|
|
)
|
|
api.add_plugin(good)
|
|
|
|
class bad(Command):
|
|
def execute(self, **options):
|
|
raise ValueError('This is private.')
|
|
api.add_plugin(bad)
|
|
|
|
class with_name(Command):
|
|
"""
|
|
Test that a kwarg named 'name' can be used.
|
|
"""
|
|
takes_options = 'name'
|
|
def execute(self, **options):
|
|
return dict(result=options['name'].upper())
|
|
api.add_plugin(with_name)
|
|
|
|
api.finalize()
|
|
o = self.cls(api)
|
|
o.finalize()
|
|
|
|
# Test that CommandError is raised:
|
|
conn = Connection('The connection.', Disconnect('someconn'))
|
|
context.someconn = conn
|
|
print(str(list(context.__dict__)))
|
|
e = raises(errors.CommandError, o.execute, 'nope')
|
|
assert e.name == 'nope'
|
|
assert conn.disconnect.called is True # Make sure destroy_context() was called
|
|
print(str(list(context.__dict__)))
|
|
assert list(context.__dict__) == []
|
|
|
|
# Test with echo command:
|
|
arg1 = unicode_str
|
|
arg2 = (u'Hello', unicode_str, u'world!')
|
|
args = (arg1,) + arg2
|
|
options = dict(option1=u'How are you?', option2=unicode_str,
|
|
version=API_VERSION)
|
|
|
|
conn = Connection('The connection.', Disconnect('someconn'))
|
|
context.someconn = conn
|
|
print(o.execute('echo', arg1, arg2, **options))
|
|
print(dict(
|
|
result=(arg1, arg2, options)
|
|
))
|
|
assert o.execute('echo', arg1, arg2, **options) == dict(
|
|
result=(arg1, arg2, options)
|
|
)
|
|
assert conn.disconnect.called is True # Make sure destroy_context() was called
|
|
assert list(context.__dict__) == []
|
|
|
|
conn = Connection('The connection.', Disconnect('someconn'))
|
|
context.someconn = conn
|
|
assert o.execute('echo', *args, **options) == dict(
|
|
result=(arg1, arg2, options)
|
|
)
|
|
assert conn.disconnect.called is True # Make sure destroy_context() was called
|
|
assert list(context.__dict__) == []
|
|
|
|
# Test with good command:
|
|
conn = Connection('The connection.', Disconnect('someconn'))
|
|
context.someconn = conn
|
|
e = raises(errors.ValidationError, o.execute, 'good')
|
|
assert e.name == 'nurse'
|
|
assert e.error == u'Not naughty!'
|
|
assert conn.disconnect.called is True # Make sure destroy_context() was called
|
|
assert list(context.__dict__) == []
|
|
|
|
# Test with bad command:
|
|
conn = Connection('The connection.', Disconnect('someconn'))
|
|
context.someconn = conn
|
|
e = raises(errors.InternalError, o.execute, 'bad')
|
|
assert conn.disconnect.called is True # Make sure destroy_context() was called
|
|
assert list(context.__dict__) == []
|
|
|
|
# Test with option 'name':
|
|
conn = Connection('The connection.', Disconnect('someconn'))
|
|
context.someconn = conn
|
|
expected = dict(result=u'TEST')
|
|
assert expected == o.execute('with_name', name=u'test',
|
|
version=API_VERSION)
|