Add support for external group members

When using ipaExternalGroup/ipaExternalMember attributes it is
possible to add group members which don't exist in IPA database.
This is primarily is required for AD trusts support and therefore
validation is accepting only secure identifier (SID) format.

https://fedorahosted.org/freeipa/ticket/2664
This commit is contained in:
Alexander Bokovoy 2012-06-20 16:08:33 +03:00 committed by Martin Kosek
parent 52f69aaa8a
commit a6ff85f425
11 changed files with 358 additions and 29 deletions

12
API.txt
View File

@ -1208,13 +1208,14 @@ output: Output('total', <type 'int'>, None)
output: Output('count', <type 'int'>, None)
output: Output('summary', (<type 'unicode'>, <type 'NoneType'>), None)
command: group_add
args: 1,8,3
args: 1,9,3
arg: Str('cn', attribute=True, cli_name='group_name', maxlength=255, multivalue=False, pattern='^[a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,252}[a-zA-Z0-9_.$-]?$', primary_key=True, required=True)
option: Str('description', attribute=True, cli_name='desc', multivalue=False, required=True)
option: Int('gidnumber', attribute=True, cli_name='gid', minvalue=1, multivalue=False, required=False)
option: Str('setattr*', cli_name='setattr', exclude='webui')
option: Str('addattr*', cli_name='addattr', exclude='webui')
option: Flag('nonposix', autofill=True, cli_name='nonposix', default=False)
option: Flag('external', autofill=True, cli_name='external', default=False)
option: Flag('all', autofill=True, cli_name='all', default=False, exclude='webui')
option: Flag('raw', autofill=True, cli_name='raw', default=False, exclude='webui')
option: Str('version?', exclude='webui')
@ -1222,8 +1223,9 @@ output: Output('summary', (<type 'unicode'>, <type 'NoneType'>), None)
output: Entry('result', <type 'dict'>, Gettext('A dictionary representing an LDAP entry', domain='ipa', localedir=None))
output: Output('value', <type 'unicode'>, None)
command: group_add_member
args: 1,5,3
args: 1,6,3
arg: Str('cn', attribute=True, cli_name='group_name', maxlength=255, multivalue=False, pattern='^[a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,252}[a-zA-Z0-9_.$-]?$', primary_key=True, query=True, required=True)
option: Str('ipaexternalmember*', cli_name='external', csv=True)
option: Flag('all', autofill=True, cli_name='all', default=False, exclude='webui')
option: Flag('raw', autofill=True, cli_name='raw', default=False, exclude='webui')
option: Str('version?', exclude='webui')
@ -1277,7 +1279,7 @@ output: ListOfEntries('result', (<type 'list'>, <type 'tuple'>), Gettext('A list
output: Output('count', <type 'int'>, None)
output: Output('truncated', <type 'bool'>, None)
command: group_mod
args: 1,11,3
args: 1,12,3
arg: Str('cn', attribute=True, cli_name='group_name', maxlength=255, multivalue=False, pattern='^[a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,252}[a-zA-Z0-9_.$-]?$', primary_key=True, query=True, required=True)
option: Str('description', attribute=True, autofill=False, cli_name='desc', multivalue=False, required=False)
option: Int('gidnumber', attribute=True, autofill=False, cli_name='gid', minvalue=1, multivalue=False, required=False)
@ -1286,6 +1288,7 @@ option: Str('addattr*', cli_name='addattr', exclude='webui')
option: Str('delattr*', cli_name='delattr', exclude='webui')
option: Flag('rights', autofill=True, default=False)
option: Flag('posix', autofill=True, cli_name='posix', default=False)
option: Flag('external', autofill=True, cli_name='external', default=False)
option: Flag('all', autofill=True, cli_name='all', default=False, exclude='webui')
option: Flag('raw', autofill=True, cli_name='raw', default=False, exclude='webui')
option: Str('version?', exclude='webui')
@ -1294,8 +1297,9 @@ output: Output('summary', (<type 'unicode'>, <type 'NoneType'>), None)
output: Entry('result', <type 'dict'>, Gettext('A dictionary representing an LDAP entry', domain='ipa', localedir=None))
output: Output('value', <type 'unicode'>, None)
command: group_remove_member
args: 1,5,3
args: 1,6,3
arg: Str('cn', attribute=True, cli_name='group_name', maxlength=255, multivalue=False, pattern='^[a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,252}[a-zA-Z0-9_.$-]?$', primary_key=True, query=True, required=True)
option: Str('ipaexternalmember*', cli_name='external', csv=True)
option: Flag('all', autofill=True, cli_name='all', default=False, exclude='webui')
option: Flag('raw', autofill=True, cli_name='raw', default=False, exclude='webui')
option: Str('version?', exclude='webui')

View File

@ -1277,6 +1277,56 @@ class SingleMatchExpected(ExecutionError):
format = _('The search criteria was not specific enough. Expected 1 and found %(found)d.')
class AlreadyExternalGroup(ExecutionError):
"""
**4028** Raised when a group is already an external member group
For example:
>>> raise AlreadyExternalGroup
Traceback (most recent call last):
...
AlreadyExternalGroup: This group already allows external members
"""
errno = 4028
format = _('This group already allows external members')
class ExternalGroupViolation(ExecutionError):
"""
**4029** Raised when a group is already an external member group
and an attempt is made to use it as posix group
For example:
>>> raise ExternalGroupViolation
Traceback (most recent call last):
...
ExternalGroupViolation: This group cannot be posix because it is external
"""
errno = 4029
format = _('This group cannot be posix because it is external')
class PosixGroupViolation(ExecutionError):
"""
**4030** Raised when a group is already a posix group
and cannot be converted to external
For example:
>>> raise PosixGroupViolation
Traceback (most recent call last):
...
PosixGroupViolation: This is already a posix group and cannot be converted to external one
"""
errno = 4030
format = _('This is already a posix group and cannot be converted to external one')
class BuiltinError(ExecutionError):
"""
**4100** Base class for builtin execution errors (*4100 - 4199*).

View File

@ -362,23 +362,29 @@ def add_external_post_callback(memberattr, membertype, externalattr, ldap, compl
externalattr is one of externaluser,
"""
completed_external = 0
normalize = options.get('external_callback_normalize', True)
# Sift through the failures. We assume that these are all
# entries that aren't stored in IPA, aka external entries.
if memberattr in failed and membertype in failed[memberattr]:
(dn, entry_attrs_) = ldap.get_entry(dn, [externalattr])
members = entry_attrs.get(memberattr, [])
external_entries = entry_attrs_.get(externalattr, [])
lc_external_entries = set(e.lower() for e in external_entries)
failed_entries = []
for entry in failed[memberattr][membertype]:
membername = entry[0].lower()
member_dn = api.Object[membertype].get_dn(membername)
if membername not in external_entries and \
member_dn not in members:
if (membername not in lc_external_entries and
member_dn not in members):
# Not an IPA entry, assume external
external_entries.append(membername)
if normalize:
external_entries.append(membername)
else:
external_entries.append(entry[0])
lc_external_entries.add(membername)
completed_external += 1
elif membername in external_entries and \
member_dn not in members:
elif (membername in lc_external_entries and
member_dn not in members):
# Already an external member, reset the error message
msg = unicode(errors.AlreadyGroupMember().message)
newerror = (entry[0], msg)
@ -409,8 +415,11 @@ def remove_external_post_callback(memberattr, membertype, externalattr, ldap, co
completed_external = 0
for entry in failed[memberattr][membertype]:
membername = entry[0].lower()
if membername in external_entries:
external_entries.remove(membername)
if membername in external_entries or entry[0] in external_entries:
try:
external_entries.remove(membername)
except ValueError:
external_entries.remove(entry[0])
completed_external += 1
else:
failed_entries.append(membername)

View File

@ -22,6 +22,12 @@ from ipalib import api
from ipalib import Int, Str
from ipalib.plugins.baseldap import *
from ipalib import _, ngettext
if api.env.in_server and api.env.context in ['lite', 'server']:
try:
import ipaserver.dcerpc
_dcerpc_bindings_installed = True
except Exception, e:
_dcerpc_bindings_installed = False
__doc__ = _("""
Groups of users
@ -83,11 +89,11 @@ class group(LDAPObject):
object_name_plural = _('groups')
object_class = ['ipausergroup']
object_class_config = 'ipagroupobjectclasses'
possible_objectclasses = ['posixGroup', 'mepManagedEntry']
possible_objectclasses = ['posixGroup', 'mepManagedEntry', 'ipaExternalGroup']
search_attributes_config = 'ipagroupsearchfields'
default_attributes = [
'cn', 'description', 'gidnumber', 'member', 'memberof',
'memberindirect', 'memberofindirect',
'memberindirect', 'memberofindirect', 'ipaexternalmember',
]
uuid_attribute = 'ipauniqueid'
attribute_members = {
@ -139,10 +145,22 @@ class group_add(LDAPCreate):
doc=_('Create as a non-POSIX group'),
default=False,
),
Flag('external',
cli_name='external',
doc=_('Allow adding external non-IPA members from trusted domains'),
default=False,
),
)
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
if not options['nonposix']:
# As both 'external' and 'nonposix' options have default= set for
# them, they will always be present in options dict, thus we can
# safely reference the values
if options['external']:
entry_attrs['objectclass'].append('ipaexternalgroup')
if 'gidnumber' in options:
raise errors.RequirementError(name='gid')
elif not options['nonposix']:
entry_attrs['objectclass'].append('posixgroup')
if not 'gidnumber' in options:
entry_attrs['gidnumber'] = 999
@ -194,11 +212,18 @@ class group_mod(LDAPUpdate):
cli_name='posix',
doc=_('change to a POSIX group'),
),
Flag('external',
cli_name='external',
doc=_('change to support external non-IPA members from trusted domains'),
default=False,
),
)
def pre_callback(self, ldap, dn, entry_attrs, *keys, **options):
if options['posix'] or 'gidnumber' in options:
if ('posix' in options and options['posix']) or 'gidnumber' in options:
(dn, old_entry_attrs) = ldap.get_entry(dn, ['objectclass'])
if 'ipaexternalgroup' in old_entry_attrs['objectclass']:
raise errors.ExternalGroupViolation()
if 'posixgroup' in old_entry_attrs['objectclass']:
if options['posix']:
raise errors.AlreadyPosixGroup()
@ -207,6 +232,15 @@ class group_mod(LDAPUpdate):
entry_attrs['objectclass'] = old_entry_attrs['objectclass']
if not 'gidnumber' in options:
entry_attrs['gidnumber'] = 999
if options['external']:
(dn, old_entry_attrs) = ldap.get_entry(dn, ['objectclass'])
if 'posixgroup' in old_entry_attrs['objectclass']:
raise errors.PosixGroupViolation()
if 'ipaexternalgroup' in old_entry_attrs['objectclass']:
raise errors.AlreadyExternalGroup()
else:
old_entry_attrs['objectclass'].append('ipaexternalgroup')
entry_attrs['objectclass'] = old_entry_attrs['objectclass']
# Can't check for this in a validator because we lack context
if 'gidnumber' in options and options['gidnumber'] is None:
raise errors.RequirementError(name='gid')
@ -274,12 +308,64 @@ api.register(group_show)
class group_add_member(LDAPAddMember):
__doc__ = _('Add members to a group.')
takes_options = (
Str('ipaexternalmember*',
cli_name='external',
label=_('External member'),
doc=_('comma-separated SIDs of members of a trusted domain'),
csv=True,
flags=['no_create', 'no_update', 'no_search'],
),
)
def post_callback(self, ldap, completed, failed, dn, entry_attrs, *keys, **options):
result = (completed, dn)
if 'ipaexternalmember' in options:
if not _dcerpc_bindings_installed:
raise errors.NotFound(name=_('AD Trust'),
reason=_('''Cannot perform external member validation without Samba 4 support installed.
Make sure you have installed server-trust-ad sub-package of IPA on the server'''))
domain_validator = ipaserver.dcerpc.DomainValidator(self.api)
if not domain_validator.is_configured():
raise errors.NotFound(name=_('AD Trust setup'),
reason=_('''Cannot perform join operation without own domain configured.
Make sure you have run ipa-adtrust-install on the IPA server first'''))
sids = []
failed_sids = []
for sid in options['ipaexternalmember']:
if domain_validator.is_trusted_sid_valid(sid):
sids.append(sid)
else:
failed_sids.append((sid, 'Not a trusted domain SID'))
if len(sids) == 0:
raise errors.ValidationError(name=_('external member'),
error=_('values are not recognized as valid SIDs from trusted domain'))
restore = []
if 'member' in failed and 'group' in failed['member']:
restore = failed['member']['group']
failed['member']['group'] = list((id,id) for id in sids)
result = add_external_post_callback('member', 'group', 'ipaexternalmember',
ldap, completed, failed, dn, entry_attrs,
keys, options, external_callback_normalize=False)
failed['member']['group'] = restore + failed_sids
return result
api.register(group_add_member)
class group_remove_member(LDAPRemoveMember):
__doc__ = _('Remove members from a group.')
takes_options = (
Str('ipaexternalmember*',
cli_name='external',
label=_('External member'),
doc=_('comma-separated SIDs of members of a trusted domain'),
csv=True,
flags=['no_create', 'no_update', 'no_search'],
),
)
def pre_callback(self, ldap, dn, found, not_found, *keys, **options):
if keys[0] == protected_group_name:
result = api.Command.group_show(protected_group_name)
@ -290,6 +376,20 @@ class group_remove_member(LDAPRemoveMember):
label=_(u'group'), container=protected_group_name)
return dn
def post_callback(self, ldap, completed, failed, dn, entry_attrs, *keys, **options):
result = (completed, dn)
if 'ipaexternalmember' in options:
sids = options['ipaexternalmember']
restore = list()
if 'member' in failed and 'group' in failed['member']:
restore = failed['member']['group']
failed['member']['group'] = list((id,id) for id in sids)
result = remove_external_post_callback('member', 'group', 'ipaexternalmember',
ldap, completed, failed, dn, entry_attrs,
keys, options)
failed['member']['group'] = restore
return result
api.register(group_remove_member)

View File

@ -171,6 +171,10 @@ class trust_add(LDAPCreate):
realm_server = options['realm_server']
trustinstance = ipaserver.dcerpc.TrustDomainJoins(self.api)
if not trustinstance.configured:
raise errors.NotFound(name=_('AD Trust setup'),
reason=_('''Cannot perform join operation without own domain configured.
Make sure you have run ipa-adtrust-install on the IPA server first'''))
# 1. Full access to the remote domain. Use admin credentials and
# generate random trustdom password to do work on both sides

View File

@ -58,6 +58,79 @@ class ExtendedDNControl(_ldap.controls.RequestControl):
def encodeControlValue(self):
return '0\x03\x02\x01\x01'
class DomainValidator(object):
ATTR_FLATNAME = 'ipantflatname'
ATTR_SID = 'ipantsecurityidentifier'
ATTR_TRUSTED_SID = 'ipanttrusteddomainsid'
def __init__(self, api):
self.api = api
self.ldap = self.api.Backend.ldap2
self.domain = None
self.flatname = None
self.dn = None
self.sid = None
self._domains = None
def is_configured(self):
cn_trust_local = DN(('cn', self.api.env.domain), self.api.env.container_cifsdomains, self.api.env.basedn)
try:
(dn, entry_attrs) = self.ldap.get_entry(unicode(cn_trust_local), [self.ATTR_FLATNAME, self.ATTR_SID])
self.flatname = entry_attrs[self.ATTR_FLATNAME][0]
self.sid = entry_attrs[self.ATTR_SID][0]
self.dn = dn
self.domain = self.api.env.domain
except errors.NotFound, e:
return False
return True
def get_trusted_domains(self):
cn_trust = DN(('cn', 'ad'), self.api.env.container_trusts, self.api.env.basedn)
try:
search_kw = {'objectClass': 'ipaNTTrustedDomain'}
filter = self.ldap.make_filter(search_kw, rules=self.ldap.MATCH_ALL)
(entries, truncated) = self.ldap.find_entries(filter=filter, base_dn=unicode(cn_trust),
attrs_list=[self.ATTR_TRUSTED_SID, 'dn'])
return entries
except errors.NotFound, e:
return []
def is_trusted_sid_valid(self, sid):
if not self.domain:
# our domain is not configured or self.is_configured() never run
# reject SIDs as we can't check correctness of them
return False
# Parse sid string to see if it is really in a SID format
try:
test_sid = security.dom_sid(sid)
except TypeError:
return False
(dom, sid_rid) = test_sid.split()
sid_dom = str(dom)
# Now we have domain prefix of the sid as sid_dom string and can
# analyze it against known prefixes
if sid_dom.find(security.SID_NT_AUTHORITY) != 0:
# Ignore any potential SIDs that are not S-1-5-*
return False
if sid_dom.find(self.sid) == 0:
# A SID from our own domain cannot be treated as trusted domain's SID
return False
# At this point we have SID_NT_AUTHORITY family SID and really need to
# check it against prefixes of domain SIDs we trust to
if not self._domains:
self._domains = self.get_trusted_domains()
if len(self._domains) == 0:
# Our domain is configured but no trusted domains are configured
# This means we can't check the correctness of a trusted domain SIDs
return False
# We have non-zero list of trusted domains and have to go through them
# one by one and check their sids as prefixes
for (dn, domaininfo) in self._domains:
if sid_dom.find(domaininfo[self.ATTR_TRUSTED_SID][0]) == 0:
return True
return False
class TrustDomainInstance(object):
def __init__(self, hostname, creds=None):
@ -247,20 +320,18 @@ class TrustDomainInstance(object):
self._pipe.CreateTrustedDomainEx2(self._policy_handle, info, self.auth_info, security.SEC_STD_DELETE)
class TrustDomainJoins(object):
ATTR_FLATNAME = 'ipantflatname'
def __init__(self, api):
self.api = api
self.local_domain = None
self.remote_domain = None
self.ldap = self.api.Backend.ldap2
cn_trust_local = DN(('cn', self.api.env.domain), self.api.env.container_cifsdomains, self.api.env.basedn)
(dn, entry_attrs) = self.ldap.get_entry(unicode(cn_trust_local), [self.ATTR_FLATNAME])
self.local_flatname = entry_attrs[self.ATTR_FLATNAME][0]
self.local_dn = dn
domain_validator = DomainValidator(api)
self.configured = domain_validator.is_configured()
self.__populate_local_domain()
if self.configured:
self.local_flatname = domain_validator.flatname
self.local_dn = domain_validator.dn
self.__populate_local_domain()
def __populate_local_domain(self):
# Initialize local domain info using kerberos only
@ -308,6 +379,9 @@ class TrustDomainJoins(object):
self.remote_domain = rd
def join_ad_full_credentials(self, realm, realm_server, realm_admin, realm_passwd):
if not self.configured:
return None
self.__populate_remote_domain(realm, realm_server, realm_admin, realm_passwd)
if not self.remote_domain.read_only:
trustdom_pass = samba.generate_random_password(128, 128)
@ -317,6 +391,9 @@ class TrustDomainJoins(object):
return None
def join_ad_ipa_half(self, realm, realm_server, trustdom_passwd):
if not self.configured:
return None
self.__populate_remote_domain(realm, realm_server, realm_passwd=None)
self.local_domain.establish_trust(self.remote_domain, trustdom_passwd)
return dict(local=self.local_domain, remote=self.remote_domain)

View File

@ -66,6 +66,7 @@ class TestCLIParsing(object):
cn=u'tgroup1',
description=u'Test group',
nonposix=False,
external=False,
raw=False,
all=False,
version=API_VERSION)
@ -88,6 +89,7 @@ class TestCLIParsing(object):
cn=u'tgroup1',
description=u'Test group',
nonposix=True,
external=False,
raw=False,
all=False,
version=API_VERSION)
@ -99,6 +101,7 @@ class TestCLIParsing(object):
description=u'Test group',
gidnumber=u'1234',
nonposix=False,
external=False,
raw=False,
all=False,
version=API_VERSION)
@ -109,6 +112,7 @@ class TestCLIParsing(object):
cn=u'tgroup1',
description=u'Test group',
nonposix=False,
external=False,
raw=False,
all=False,
version=API_VERSION)

View File

@ -45,6 +45,8 @@ group = [
u'ipaobject',
]
externalgroup = group + [u'ipaexternalgroup']
host = [
u'ipasshhost',
u'ipaSshGroupOfPubKeys',

View File

@ -28,11 +28,18 @@ from ipalib.dn import *
group1 = u'testgroup1'
group2 = u'testgroup2'
group3 = u'testgroup3'
renamedgroup1 = u'testgroup'
user1 = u'tuser1'
invalidgroup1=u'+tgroup1'
# When adding external SID member to a group we can't test
# it fully due to possibly missing Samba 4 python bindings
# and/or not configured AD trusts. Thus, we'll use incorrect
# SID value to merely test that proper exceptions are raised
external_sid1=u'S-1-1-123456-789-1'
def get_group_dn(cn):
return DN(('cn', cn), api.env.container_group, api.env.basedn)
@ -40,6 +47,7 @@ class test_group(Declarative):
cleanup_commands = [
('group_del', [group1], {}),
('group_del', [group2], {}),
('group_del', [group3], {}),
('user_del', [user1], {}),
]
@ -373,6 +381,63 @@ class test_group(Declarative):
),
),
###############
# test external SID members for group3:
dict(
desc='Create external %r' % group3,
command=(
'group_add', [group3], dict(description=u'Test desc 3',external=True)
),
expected=dict(
value=group3,
summary=u'Added group "testgroup3"',
result=dict(
cn=[group3],
description=[u'Test desc 3'],
objectclass=objectclasses.externalgroup,
ipauniqueid=[fuzzy_uuid],
dn=lambda x: DN(x) == get_group_dn(group3),
),
),
),
dict(
desc='Convert posix group %r to support external membership' % (group2),
command=(
'group_mod', [group2], dict(external=True)
),
expected=errors.PosixGroupViolation(),
),
dict(
desc='Convert external members group %r to posix' % (group3),
command=(
'group_mod', [group3], dict(posix=True)
),
expected=errors.ExternalGroupViolation(),
),
dict(
desc='Add external member %r to %r' % (external_sid1, group3),
command=(
'group_add_member', [group3], dict(ipaexternalmember=external_sid1)
),
expected=lambda x, output: type(x) == errors.ValidationError or type(x) == errors.NotFound,
),
dict(
desc='Remove group %r with external membership' % (group3),
command=('group_del', [group3], {}),
expected=dict(
result=dict(failed=u''),
value=group3,
summary=u'Deleted group "testgroup3"',
),
),
###############

View File

@ -1059,7 +1059,7 @@ class test_user(Declarative):
command=(
'config_mod', [], dict(ipahomesrootdir=u'/other-home'),
),
expected=lambda x: True,
expected=lambda x, output: x is None,
),
dict(
@ -1107,7 +1107,7 @@ class test_user(Declarative):
command=(
'config_mod', [], dict(ipahomesrootdir=u'/home'),
),
expected=lambda x: True,
expected=lambda x, output: x is None,
),
dict(
@ -1125,7 +1125,7 @@ class test_user(Declarative):
command=(
'config_mod', [], dict(ipadefaultloginshell=u'/usr/bin/ipython'),
),
expected=lambda x: True,
expected=lambda x, output: x is None,
),
dict(
@ -1172,7 +1172,7 @@ class test_user(Declarative):
command=(
'config_mod', [], dict(ipadefaultloginshell=u'/bin/sh'),
),
expected=lambda x: True,
expected=lambda x, output: x is None,
),
dict(
@ -1245,7 +1245,7 @@ class test_user(Declarative):
command=(
'config_mod', [], dict(ipadefaultprimarygroup=group1),
),
expected=lambda x: True,
expected=lambda x, output: x is None,
),
dict(
@ -1328,7 +1328,7 @@ class test_user(Declarative):
command=(
'config_mod', [], dict(ipadefaultprimarygroup=u'ipausers'),
),
expected=lambda x: True,
expected=lambda x, output: x is None,
),
dict(

View File

@ -260,6 +260,8 @@ class Declarative(XMLRPC_test):
raise nose.SkipTest('%r not in api.Command' % cmd)
if isinstance(expected, errors.PublicError):
self.check_exception(nice, cmd, args, options, expected)
elif hasattr(expected, '__call__'):
self.check_callable(nice, cmd, args, options, expected)
else:
self.check_output(nice, cmd, args, options, expected, extra_check)
@ -285,6 +287,18 @@ class Declarative(XMLRPC_test):
# For now just compare the strings
assert_deepequal(expected.strerror, e.strerror)
def check_callable(self, nice, cmd, args, options, expected):
output = dict()
e = None
try:
output = api.Command[cmd](*args, **options)
except StandardError, e:
pass
if not expected(e, output):
raise AssertionError(
UNEXPECTED % (cmd, args, options, e.__class__.__name__, e)
)
def check_output(self, nice, cmd, args, options, expected, extra_check):
got = api.Command[cmd](*args, **options)
assert_deepequal(expected, got, nice)