Fix raw format for ACI commands

ACI plugins (permission, selfservice and delegation) were not
prepared to serve ACIs in a raw format, i.e. raw "aci" attribute
taken from LDAP. This patch fixes all these plugins and their
commands to provide provide this format. Few ACI raw format unit
tests were added for all these plugins.

https://fedorahosted.org/freeipa/ticket/2010
https://fedorahosted.org/freeipa/ticket/2223
https://fedorahosted.org/freeipa/ticket/2228
https://fedorahosted.org/freeipa/ticket/2232
This commit is contained in:
Martin Kosek 2012-02-02 21:28:15 +01:00
parent 2e860f6d07
commit cf12f3106a
6 changed files with 166 additions and 66 deletions

View File

@ -55,6 +55,12 @@ EXAMPLES:
ACI_PREFIX=u"delegation" ACI_PREFIX=u"delegation"
output_params = (
Str('aci',
label=_('ACI'),
),
)
class delegation(Object): class delegation(Object):
""" """
Delegation object. Delegation object.
@ -112,6 +118,13 @@ class delegation(Object):
json_dict['methods'] = [m for m in self.methods] json_dict['methods'] = [m for m in self.methods]
return json_dict return json_dict
def postprocess_result(self, result):
try:
# do not include prefix in result
del result['aciprefix']
except KeyError:
pass
api.register(delegation) api.register(delegation)
@ -119,19 +132,14 @@ class delegation_add(crud.Create):
__doc__ = _('Add a new delegation.') __doc__ = _('Add a new delegation.')
msg_summary = _('Added delegation "%(value)s"') msg_summary = _('Added delegation "%(value)s"')
has_output_params = output_params
def execute(self, aciname, **kw): def execute(self, aciname, **kw):
ldap = self.api.Backend.ldap2
if not 'permissions' in kw: if not 'permissions' in kw:
kw['permissions'] = (u'write',) kw['permissions'] = (u'write',)
kw['aciprefix'] = ACI_PREFIX kw['aciprefix'] = ACI_PREFIX
result = api.Command['aci_add'](aciname, **kw)['result'] result = api.Command['aci_add'](aciname, **kw)['result']
self.obj.postprocess_result(result)
# do not include prefix in result
try:
del result['aciprefix']
except KeyError:
pass
return dict( return dict(
result=result, result=result,
@ -150,6 +158,7 @@ class delegation_del(crud.Delete):
def execute(self, aciname, **kw): def execute(self, aciname, **kw):
kw['aciprefix'] = ACI_PREFIX kw['aciprefix'] = ACI_PREFIX
result = api.Command['aci_del'](aciname, **kw) result = api.Command['aci_del'](aciname, **kw)
self.obj.postprocess_result(result)
return dict( return dict(
result=True, result=True,
value=aciname, value=aciname,
@ -162,16 +171,12 @@ class delegation_mod(crud.Update):
__doc__ = _('Modify a delegation.') __doc__ = _('Modify a delegation.')
msg_summary = _('Modified delegation "%(value)s"') msg_summary = _('Modified delegation "%(value)s"')
has_output_params = output_params
def execute(self, aciname, **kw): def execute(self, aciname, **kw):
kw['aciprefix'] = ACI_PREFIX kw['aciprefix'] = ACI_PREFIX
result = api.Command['aci_mod'](aciname, **kw)['result'] result = api.Command['aci_mod'](aciname, **kw)['result']
self.obj.postprocess_result(result)
# do not include prefix in result
try:
del result['aciprefix']
except KeyError:
pass
return dict( return dict(
result=result, result=result,
@ -189,18 +194,14 @@ class delegation_find(crud.Search):
) )
takes_options = (gen_pkey_only_option("name"),) takes_options = (gen_pkey_only_option("name"),)
has_output_params = output_params
def execute(self, term, **kw): def execute(self, term, **kw):
ldap = self.api.Backend.ldap2
kw['aciprefix'] = ACI_PREFIX kw['aciprefix'] = ACI_PREFIX
results = api.Command['aci_find'](term, **kw)['result'] results = api.Command['aci_find'](term, **kw)['result']
for aci in results: for aci in results:
# do not include prefix in result self.obj.postprocess_result(aci)
try:
del aci['aciprefix']
except KeyError:
pass
return dict( return dict(
result=results, result=results,
@ -214,19 +215,11 @@ api.register(delegation_find)
class delegation_show(crud.Retrieve): class delegation_show(crud.Retrieve):
__doc__ = _('Display information about a delegation.') __doc__ = _('Display information about a delegation.')
has_output_params = ( has_output_params = output_params
Str('aci',
label=_('ACI'),
),
)
def execute(self, aciname, **kw): def execute(self, aciname, **kw):
result = api.Command['aci_show'](aciname, aciprefix=ACI_PREFIX)['result'] result = api.Command['aci_show'](aciname, aciprefix=ACI_PREFIX, **kw)['result']
# do not include prefix in result self.obj.postprocess_result(result)
try:
del result['aciprefix']
except KeyError:
pass
return dict( return dict(
result=result, result=result,
value=aciname, value=aciname,

View File

@ -84,6 +84,9 @@ output_params = (
Str('ipapermissiontype', Str('ipapermissiontype',
label=_('Permission Type'), label=_('Permission Type'),
), ),
Str('aci',
label=_('ACI'),
),
) )
class permission(LDAPObject): class permission(LDAPObject):
@ -97,7 +100,7 @@ class permission(LDAPObject):
default_attributes = ['cn', 'member', 'memberof', default_attributes = ['cn', 'member', 'memberof',
'memberindirect', 'ipapermissiontype', 'memberindirect', 'ipapermissiontype',
] ]
aci_attributes = ['group', 'permissions', 'attrs', 'type', aci_attributes = ['aci', 'group', 'permissions', 'attrs', 'type',
'filter', 'subtree', 'targetgroup', 'memberof', 'filter', 'subtree', 'targetgroup', 'memberof',
] ]
attribute_members = { attribute_members = {
@ -180,6 +183,7 @@ class permission_add(LDAPCreate):
__doc__ = _('Add a new permission.') __doc__ = _('Add a new permission.')
msg_summary = _('Added permission "%(value)s"') msg_summary = _('Added permission "%(value)s"')
has_output_params = LDAPCreate.has_output_params + output_params
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options): def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
# Test the ACI before going any further # Test the ACI before going any further
@ -335,11 +339,15 @@ class permission_mod(LDAPUpdate):
newname=options['rename'], newprefix=ACI_PREFIX) newname=options['rename'], newprefix=ACI_PREFIX)
cn = options['rename'] # rename finished cn = options['rename'] # rename finished
print "permission_rename1", entry_attrs
print "permission_rename1 result options", options
result = self.api.Command.permission_show(cn, **options)['result'] result = self.api.Command.permission_show(cn, **options)['result']
print "permission_rename1 result", result
for r in result: for r in result:
if not r.startswith('member_'): if not r.startswith('member_'):
entry_attrs[r] = result[r] entry_attrs[r] = result[r]
print "permission_rename2", entry_attrs
return dn return dn
api.register(permission_mod) api.register(permission_mod)
@ -359,7 +367,7 @@ class permission_find(LDAPSearch):
for entry in entries: for entry in entries:
(dn, attrs) = entry (dn, attrs) = entry
try: try:
aci = self.api.Command.aci_show(attrs['cn'][0], aciprefix=ACI_PREFIX)['result'] aci = self.api.Command.aci_show(attrs['cn'][0], aciprefix=ACI_PREFIX, **options)['result']
# copy information from respective ACI to permission entry # copy information from respective ACI to permission entry
for attr in self.obj.aci_attributes: for attr in self.obj.aci_attributes:
@ -372,7 +380,13 @@ class permission_find(LDAPSearch):
# aren't already in the list along with their permission info. # aren't already in the list along with their permission info.
options['aciprefix'] = ACI_PREFIX options['aciprefix'] = ACI_PREFIX
aciresults = self.api.Command.aci_find(*args, **options) opts = copy.copy(options)
try:
# permission ACI attribute is needed
del opts['raw']
except:
pass
aciresults = self.api.Command.aci_find(*args, **opts)
truncated = truncated or aciresults['truncated'] truncated = truncated or aciresults['truncated']
results = aciresults['result'] results = aciresults['result']
@ -385,15 +399,11 @@ class permission_find(LDAPSearch):
found = True found = True
break break
if not found: if not found:
permission = self.api.Command.permission_show(aci['permission']) permission = self.api.Command.permission_show(aci['permission'], **options)['result']
attrs = permission['result'] dn = permission['dn']
for attr in self.obj.aci_attributes: del permission['dn']
if attr in aci: if (dn, permission) not in entries:
attrs[attr] = aci[attr] entries.append((dn, permission))
dn = attrs['dn']
del attrs['dn']
if (dn, attrs) not in entries:
entries.append((dn, attrs))
api.register(permission_find) api.register(permission_find)
@ -404,7 +414,7 @@ class permission_show(LDAPRetrieve):
has_output_params = LDAPRetrieve.has_output_params + output_params has_output_params = LDAPRetrieve.has_output_params + output_params
def post_callback(self, ldap, dn, entry_attrs, *keys, **options): def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
try: try:
aci = self.api.Command.aci_show(keys[-1], aciprefix=ACI_PREFIX)['result'] aci = self.api.Command.aci_show(keys[-1], aciprefix=ACI_PREFIX, **options)['result']
for attr in self.obj.aci_attributes: for attr in self.obj.aci_attributes:
if attr in aci: if attr in aci:
entry_attrs[attr] = aci[attr] entry_attrs[attr] = aci[attr]

View File

@ -54,17 +54,11 @@ EXAMPLES:
ACI_PREFIX=u"selfservice" ACI_PREFIX=u"selfservice"
def is_selfservice(aciname): output_params = (
""" Str('aci',
Determine if the ACI is a Self-service ACI and raise an exception if it label=_('ACI'),
isn't. ),
)
Return the result if it is a self-service ACI.
"""
result = api.Command['aci_show'](aciname, aciprefix=ACI_PREFIX)['result']
if 'selfaci' not in result or result['selfaci'] == False:
raise errors.NotFound(reason=_('Self-service permission \'%(permission)s\' not found') % dict(permission=aciname))
return result
class selfservice(Object): class selfservice(Object):
""" """
@ -112,6 +106,13 @@ class selfservice(Object):
json_dict['methods'] = [m for m in self.methods] json_dict['methods'] = [m for m in self.methods]
return json_dict return json_dict
def postprocess_result(self, result):
try:
# do not include prefix in result
del result['aciprefix']
except KeyError:
pass
api.register(selfservice) api.register(selfservice)
@ -119,6 +120,7 @@ class selfservice_add(crud.Create):
__doc__ = _('Add a new self-service permission.') __doc__ = _('Add a new self-service permission.')
msg_summary = _('Added selfservice "%(value)s"') msg_summary = _('Added selfservice "%(value)s"')
has_output_params = output_params
def execute(self, aciname, **kw): def execute(self, aciname, **kw):
if not 'permissions' in kw: if not 'permissions' in kw:
@ -126,7 +128,7 @@ class selfservice_add(crud.Create):
kw['selfaci'] = True kw['selfaci'] = True
kw['aciprefix'] = ACI_PREFIX kw['aciprefix'] = ACI_PREFIX
result = api.Command['aci_add'](aciname, **kw)['result'] result = api.Command['aci_add'](aciname, **kw)['result']
del result['aciprefix'] # do not include prefix in result self.obj.postprocess_result(result)
return dict( return dict(
result=result, result=result,
@ -143,9 +145,9 @@ class selfservice_del(crud.Delete):
msg_summary = _('Deleted selfservice "%(value)s"') msg_summary = _('Deleted selfservice "%(value)s"')
def execute(self, aciname, **kw): def execute(self, aciname, **kw):
is_selfservice(aciname)
kw['aciprefix'] = ACI_PREFIX kw['aciprefix'] = ACI_PREFIX
result = api.Command['aci_del'](aciname, **kw) result = api.Command['aci_del'](aciname, **kw)
self.obj.postprocess_result(result)
return dict( return dict(
result=True, result=True,
@ -159,15 +161,16 @@ class selfservice_mod(crud.Update):
__doc__ = _('Modify a self-service permission.') __doc__ = _('Modify a self-service permission.')
msg_summary = _('Modified selfservice "%(value)s"') msg_summary = _('Modified selfservice "%(value)s"')
has_output_params = output_params
def execute(self, aciname, **kw): def execute(self, aciname, **kw):
is_selfservice(aciname)
if 'attrs' in kw and kw['attrs'] is None: if 'attrs' in kw and kw['attrs'] is None:
raise errors.RequirementError(name='attrs') raise errors.RequirementError(name='attrs')
kw['aciprefix'] = ACI_PREFIX kw['aciprefix'] = ACI_PREFIX
result = api.Command['aci_mod'](aciname, **kw)['result'] result = api.Command['aci_mod'](aciname, **kw)['result']
del result['aciprefix'] # do not include prefix in result self.obj.postprocess_result(result)
return dict( return dict(
result=result, result=result,
value=aciname, value=aciname,
@ -184,6 +187,7 @@ class selfservice_find(crud.Search):
) )
takes_options = (gen_pkey_only_option("name"),) takes_options = (gen_pkey_only_option("name"),)
has_output_params = output_params
def execute(self, term, **kw): def execute(self, term, **kw):
kw['selfaci'] = True kw['selfaci'] = True
@ -191,7 +195,7 @@ class selfservice_find(crud.Search):
result = api.Command['aci_find'](term, **kw)['result'] result = api.Command['aci_find'](term, **kw)['result']
for aci in result: for aci in result:
del aci['aciprefix'] # do not include prefix in result self.obj.postprocess_result(aci)
return dict( return dict(
result=result, result=result,
@ -205,15 +209,11 @@ api.register(selfservice_find)
class selfservice_show(crud.Retrieve): class selfservice_show(crud.Retrieve):
__doc__ = _('Display information about a self-service permission.') __doc__ = _('Display information about a self-service permission.')
has_output_params = ( has_output_params = output_params
Str('aci',
label=_('ACI'),
),
)
def execute(self, aciname, **kw): def execute(self, aciname, **kw):
result = is_selfservice(aciname) result = api.Command['aci_show'](aciname, aciprefix=ACI_PREFIX, **kw)['result']
del result['aciprefix'] # do not include prefix in result self.obj.postprocess_result(result)
return dict( return dict(
result=result, result=result,
value=aciname, value=aciname,

View File

@ -126,6 +126,20 @@ class test_delegation(Declarative):
), ),
dict(
desc='Retrieve %r with --raw' % delegation1,
command=('delegation_show', [delegation1], {'raw' : True}),
expected=dict(
value=delegation1,
summary=None,
result={
'aci': u'(targetattr = "street || c || l || st || postalcode")(targetfilter = "(memberOf=cn=admins,cn=groups,cn=accounts,%s)")(version 3.0;acl "delegation:testdelegation";allow (write) groupdn = "ldap:///cn=editors,cn=groups,cn=accounts,%s";)' \
% (api.env.basedn, api.env.basedn)
},
),
),
dict( dict(
desc='Search for %r' % delegation1, desc='Search for %r' % delegation1,
command=('delegation_find', [delegation1], {}), command=('delegation_find', [delegation1], {}),
@ -162,6 +176,23 @@ class test_delegation(Declarative):
), ),
dict(
desc='Search for %r with --raw' % delegation1,
command=('delegation_find', [delegation1], {'raw' : True}),
expected=dict(
count=1,
truncated=False,
summary=u'1 delegation matched',
result=[
{
'aci': u'(targetattr = "street || c || l || st || postalcode")(targetfilter = "(memberOf=cn=admins,cn=groups,cn=accounts,%s)")(version 3.0;acl "delegation:testdelegation";allow (write) groupdn = "ldap:///cn=editors,cn=groups,cn=accounts,%s";)' \
% (api.env.basedn, api.env.basedn),
},
],
),
),
dict( dict(
desc='Update %r' % delegation1, desc='Update %r' % delegation1,
command=( command=(

View File

@ -180,6 +180,23 @@ class test_permission(Declarative):
), ),
dict(
desc='Retrieve %r with --raw' % permission1,
command=('permission_show', [permission1], {'raw' : True}),
expected=dict(
value=permission1,
summary=None,
result={
'dn': unicode(permission1_dn),
'cn': [permission1],
'member': [unicode(privilege1_dn)],
'aci': u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "permission:testperm";allow (write) groupdn = "ldap:///cn=testperm,cn=permissions,cn=pbac,%s";)' \
% (api.env.basedn, api.env.basedn),
},
),
),
dict( dict(
desc='Search for %r' % permission1, desc='Search for %r' % permission1,
command=('permission_find', [permission1], {}), command=('permission_find', [permission1], {}),
@ -220,6 +237,26 @@ class test_permission(Declarative):
), ),
dict(
desc='Search for %r with --raw' % permission1,
command=('permission_find', [permission1], {'raw' : True}),
expected=dict(
count=1,
truncated=False,
summary=u'1 permission matched',
result=[
{
'dn': unicode(permission1_dn),
'cn': [permission1],
'member': [unicode(privilege1_dn)],
'aci': u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "permission:testperm";allow (write) groupdn = "ldap:///cn=testperm,cn=permissions,cn=pbac,%s";)' \
% (api.env.basedn, api.env.basedn),
},
],
),
),
dict( dict(
desc='Create %r' % permission2, desc='Create %r' % permission2,
command=( command=(

View File

@ -119,6 +119,19 @@ class test_selfservice(Declarative):
), ),
dict(
desc='Retrieve %r with --raw' % selfservice1,
command=('selfservice_show', [selfservice1], {'raw':True}),
expected=dict(
value=selfservice1,
summary=None,
result={
'aci': u'(targetattr = "street || c || l || st || postalcode")(version 3.0;acl "selfservice:testself";allow (write) userdn = "ldap:///self";)',
},
),
),
dict( dict(
desc='Search for %r' % selfservice1, desc='Search for %r' % selfservice1,
command=('selfservice_find', [selfservice1], {}), command=('selfservice_find', [selfservice1], {}),
@ -172,6 +185,22 @@ class test_selfservice(Declarative):
), ),
dict(
desc='Search for %r with --raw' % selfservice1,
command=('selfservice_find', [selfservice1], {'raw':True}),
expected=dict(
count=1,
truncated=False,
summary=u'1 selfservice matched',
result=[
{
'aci': u'(targetattr = "street || c || l || st || postalcode")(version 3.0;acl "selfservice:testself";allow (write) userdn = "ldap:///self";)'
},
],
),
),
dict( dict(
desc='Update %r' % selfservice1, desc='Update %r' % selfservice1,
command=( command=(