# Authors: # Rob Crittenden # # Copyright (C) 2008 Red Hat # see file 'COPYING' for use and warranty information # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ Base class for all XML-RPC tests """ import datetime import nose from ipatests.util import assert_deepequal, Fuzzy from ipalib import api, request, errors from ipalib.x509 import valid_issuer from ipapython.version import API_VERSION # Matches a gidnumber like '1391016742' # FIXME: Does it make more sense to return gidnumber, uidnumber, etc. as `int` # or `long`? If not, we still need to return them as `unicode` instead of `str`. fuzzy_digits = Fuzzy('^\d+$', type=basestring) uuid_re = '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}' # Matches an ipauniqueid like u'784d85fd-eae7-11de-9d01-54520012478b' fuzzy_uuid = Fuzzy('^%s$' % uuid_re) # Matches an automember task DN fuzzy_automember_dn = Fuzzy( '^cn=%s,cn=automember rebuild membership,cn=tasks,cn=config$' % uuid_re ) # Matches an automember task finish message fuzzy_automember_message = Fuzzy( '^Automember rebuild task finished\. Processed \(\d+\) entries\.$' ) # Matches trusted domain GUID, like u'463bf2be-3456-4a57-979e-120304f2a0eb' fuzzy_guid = fuzzy_uuid # Matches SID of a trusted domain # SID syntax: http://msdn.microsoft.com/en-us/library/ff632068.aspx _sid_identifier_authority = '(0x[0-9a-f]{1,12}|[0-9]{1,10})' fuzzy_domain_sid = Fuzzy( '^S-1-5-21-%(idauth)s-%(idauth)s-%(idauth)s$' % dict(idauth=_sid_identifier_authority) ) fuzzy_user_or_group_sid = Fuzzy( '^S-1-5-21-%(idauth)s-%(idauth)s-%(idauth)s-%(idauth)s$' % dict(idauth=_sid_identifier_authority) ) # Matches netgroup dn. Note (?i) at the beginning of the regexp is the ingnore case flag fuzzy_netgroupdn = Fuzzy( '(?i)ipauniqueid=%s,cn=ng,cn=alt,%s' % (uuid_re, api.env.basedn) ) # Matches sudocmd dn fuzzy_sudocmddn = Fuzzy( '(?i)ipauniqueid=%s,cn=sudocmds,cn=sudo,%s' % (uuid_re, api.env.basedn) ) # Matches a hash signature, not enforcing length fuzzy_hash = Fuzzy('^([a-f0-9][a-f0-9]:)+[a-f0-9][a-f0-9]$', type=basestring) # Matches a date, like Tue Apr 26 17:45:35 2016 UTC fuzzy_date = Fuzzy('^[a-zA-Z]{3} [a-zA-Z]{3} \d{2} \d{2}:\d{2}:\d{2} \d{4} UTC$') fuzzy_issuer = Fuzzy(type=basestring, test=lambda issuer: valid_issuer(issuer)) fuzzy_hex = Fuzzy('^0x[0-9a-fA-F]+$', type=basestring) # Matches password - password consists of all printable characters without whitespaces # The only exception is space, but space cannot be at the beggingin or end of the pwd fuzzy_password = Fuzzy('^\S([\S ]*\S)*$') # Matches generalized time value. Time format is: %Y%m%d%H%M%SZ fuzzy_dergeneralizedtime = Fuzzy(type=datetime.datetime) # match any string fuzzy_string = Fuzzy(type=basestring) # case insensitive match of sets def fuzzy_set_ci(s): return Fuzzy(test=lambda other: set(x.lower() for x in other) == set(y.lower() for y in s)) try: if not api.Backend.rpcclient.isconnected(): api.Backend.rpcclient.connect(fallback=False) res = api.Command['user_show'](u'notfound') except errors.NetworkError: server_available = False except IOError: server_available = False except errors.NotFound: server_available = True adtrust_is_enabled = api.Command['adtrust_is_enabled']()['result'] sidgen_was_run = api.Command['sidgen_was_run']()['result'] def add_sid(d, check_sidgen=False): if adtrust_is_enabled and (not check_sidgen or sidgen_was_run): d['ipantsecurityidentifier'] = (fuzzy_user_or_group_sid,) return d def add_oc(l, oc, check_sidgen=False): if adtrust_is_enabled and (not check_sidgen or sidgen_was_run): return l + [oc] return l def assert_attr_equal(entry, key, value): if type(entry) is not dict: raise AssertionError( 'assert_attr_equal: entry must be a %r; got a %r: %r' % ( dict, type(entry), entry) ) if key not in entry: raise AssertionError( 'assert_attr_equal: entry has no key %r: %r' % (key, entry) ) if value not in entry[key]: raise AssertionError( 'assert_attr_equal: %r: %r not in %r' % (key, value, entry[key]) ) def assert_is_member(entry, value, key='member'): if type(entry) is not dict: raise AssertionError( 'assert_is_member: entry must be a %r; got a %r: %r' % ( dict, type(entry), entry) ) if key not in entry: raise AssertionError( 'assert_is_member: entry has no key %r: %r' % (key, entry) ) for member in entry[key]: if member.startswith(value): return raise AssertionError( 'assert_is_member: %r: %r not in %r' % (key, value, entry[key]) ) # Initialize the API. We do this here so that one can run the tests # individually instead of at the top-level. If API.bootstrap() # has already been called we continue gracefully. Other errors will be # raised. class XMLRPC_test(object): """ Base class for all XML-RPC plugin tests """ @classmethod def setup_class(cls): if not server_available: raise nose.SkipTest('%r: Server not available: %r' % (cls.__module__, api.env.xmlrpc_uri)) def setup(self): if not api.Backend.rpcclient.isconnected(): api.Backend.rpcclient.connect(fallback=False) def teardown(self): """ nose tear-down fixture. """ request.destroy_context() def failsafe_add(self, obj, pk, **options): """ Delete possible leftover entry first, then add. This helps speed us up when a partial test failure has left LDAP in a dirty state. :param obj: An Object like api.Object.user :param pk: The primary key of the entry to be created :param options: Kwargs to be passed to obj.add() """ try: obj.methods['del'](pk) except errors.NotFound: pass return obj.methods['add'](pk, **options) IGNORE = """Command %r is missing attribute %r in output entry. args = %r options = %r entry = %r""" EXPECTED = """Expected %r to raise %s. args = %r options = %r output = %r""" UNEXPECTED = """Expected %r to raise %s, but caught different. args = %r options = %r %s: %s""" KWARGS = """Command %r raised %s with wrong kwargs. args = %r options = %r kw_expected = %r kw_got = %r""" class Declarative(XMLRPC_test): """A declarative-style test suite A Declarative test suite is controlled by the ``tests`` and ``cleanup_commands`` class variables. The ``tests`` is a list of dictionaries with the following keys: ``desc`` A name/description of the test ``command`` A (command, args, kwargs) triple specifying the command to run ``expected`` Can be either an ``errors.PublicError`` instance, in which case the command must fail with the given error; or the expected result. The result is checked with ``tests.util.assert_deepequal``. ``extra_check`` (optional) A checking function that is called with the response. It must return true for the test to pass. The ``cleanup_commands`` is a list of (command, args, kwargs) triples. These are commands get run both before and after tests, and must not fail. """ default_version = API_VERSION cleanup_commands = tuple() tests = tuple() def cleanup_generate(self, stage): for (i, command) in enumerate(self.cleanup_commands): func = lambda: self.cleanup(command) func.description = '%s %s-cleanup[%d]: %r' % ( self.__class__.__name__, stage, i, command ) yield (func,) def cleanup(self, command): (cmd, args, options) = command if cmd not in api.Command: raise nose.SkipTest( 'cleanup command %r not in api.Command' % cmd ) try: api.Command[cmd](*args, **options) except (errors.NotFound, errors.EmptyModlist): pass def test_generator(self): """ Iterate through tests. nose reports each one as a seperate test. """ # Iterate through pre-cleanup: for tup in self.cleanup_generate('pre'): yield tup # Iterate through the tests: name = self.__class__.__name__ for (i, test) in enumerate(self.tests): if callable(test): func = lambda: test(self) nice = '%s[%d]: call %s: %s' % ( name, i, test.__name__, test.__doc__ ) func.description = nice else: nice = '%s[%d]: %s: %s' % ( name, i, test['command'][0], test.get('desc', '') ) func = lambda: self.check(nice, **test) func.description = nice yield (func,) # Iterate through post-cleanup: for tup in self.cleanup_generate('post'): yield tup def check(self, nice, desc, command, expected, extra_check=None): (cmd, args, options) = command options.setdefault('version', self.default_version) if cmd not in api.Command: raise nose.SkipTest('%r not in api.Command' % cmd) if isinstance(expected, errors.PublicError): self.check_exception(nice, cmd, args, options, expected) elif hasattr(expected, '__call__'): self.check_callable(nice, cmd, args, options, expected) else: self.check_output(nice, cmd, args, options, expected, extra_check) def check_exception(self, nice, cmd, args, options, expected): klass = expected.__class__ name = klass.__name__ try: output = api.Command[cmd](*args, **options) except StandardError, e: pass else: raise AssertionError( EXPECTED % (cmd, name, args, options, output) ) if not isinstance(e, klass): raise AssertionError( UNEXPECTED % (cmd, name, args, options, e.__class__.__name__, e) ) # FIXME: the XML-RPC transport doesn't allow us to return structured # information through the exception, so we can't test the kw on the # client side. However, if we switch to using JSON-RPC for the default # transport, the exception is a free-form data structure (dict). # For now just compare the strings assert_deepequal(expected.strerror, e.strerror) def check_callable(self, nice, cmd, args, options, expected): name = expected.__class__.__name__ output = dict() e = None try: output = api.Command[cmd](*args, **options) except StandardError, e: pass if not expected(e, output): raise AssertionError( UNEXPECTED % (cmd, name, args, options, e.__class__.__name__, e) ) def check_output(self, nice, cmd, args, options, expected, extra_check): got = api.Command[cmd](*args, **options) assert_deepequal(expected, got, nice) if extra_check and not extra_check(got): raise AssertionError('Extra check %s failed' % extra_check)