mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Fix aci_mod command. It should handle more complex operations now.
The problem was trying to operate directly on the ACI itself. I introduced a new function, _aci_to_kw(), that converts an ACI into a set of keywords. We can take these keywords, like those passed in when an ACI is created, to merge in any changes and then re-create the ACI. I also switched the ACI tests to be declarative and added a lot more cases around the modify operation.
This commit is contained in:
parent
901ccc1393
commit
93e54366f9
@ -145,7 +145,7 @@ def _make_aci(current, aciname, kw):
|
|||||||
if 'attrs' in kw:
|
if 'attrs' in kw:
|
||||||
a.set_target_attr(kw['attrs'])
|
a.set_target_attr(kw['attrs'])
|
||||||
if 'memberof' in kw:
|
if 'memberof' in kw:
|
||||||
(dn, entry_attrs) = api.Command['group_show'](kw['memberof'])
|
entry_attrs = api.Command['group_show'](kw['memberof'])['result']
|
||||||
a.set_target_filter('memberOf=%s' % dn)
|
a.set_target_filter('memberOf=%s' % dn)
|
||||||
if 'filter' in kw:
|
if 'filter' in kw:
|
||||||
a.set_target_filter(kw['filter'])
|
a.set_target_filter(kw['filter'])
|
||||||
@ -154,7 +154,7 @@ def _make_aci(current, aciname, kw):
|
|||||||
a.set_target(target)
|
a.set_target(target)
|
||||||
if 'targetgroup' in kw:
|
if 'targetgroup' in kw:
|
||||||
# Purposely no try here so we'll raise a NotFound
|
# Purposely no try here so we'll raise a NotFound
|
||||||
(dn, entry_attrs) = api.Command['group_show'](kw['targetgroup'])
|
entry_attrs = api.Command['group_show'](kw['targetgroup'])['result']
|
||||||
target = 'ldap:///%s' % dn
|
target = 'ldap:///%s' % dn
|
||||||
a.set_target(target)
|
a.set_target(target)
|
||||||
if 'subtree' in kw:
|
if 'subtree' in kw:
|
||||||
@ -166,6 +166,53 @@ def _make_aci(current, aciname, kw):
|
|||||||
|
|
||||||
return a
|
return a
|
||||||
|
|
||||||
|
def _aci_to_kw(ldap, a):
|
||||||
|
"""Convert an ACI into its equivalent keywords.
|
||||||
|
|
||||||
|
This is used for the modify operation so we can merge the
|
||||||
|
incoming kw and existing ACI and pass the result to
|
||||||
|
_make_aci().
|
||||||
|
"""
|
||||||
|
kw = {}
|
||||||
|
kw['aciname'] = a.name
|
||||||
|
kw['permissions'] = tuple(a.permissions)
|
||||||
|
if 'targetattr' in a.target:
|
||||||
|
kw['attrs'] = tuple(a.target['targetattr']['expression'])
|
||||||
|
if 'targetfilter' in a.target:
|
||||||
|
target = a.target['targetfilter']['expression']
|
||||||
|
if target.startswith('memberOf'):
|
||||||
|
kw['memberof'] = target
|
||||||
|
else:
|
||||||
|
kw['filter'] = target
|
||||||
|
if 'target' in a.target:
|
||||||
|
target = a.target['target']['expression']
|
||||||
|
found = False
|
||||||
|
for k in _type_map.keys():
|
||||||
|
if _type_map[k] == target:
|
||||||
|
kw['type'] = unicode(k)
|
||||||
|
found = True
|
||||||
|
break;
|
||||||
|
if not found:
|
||||||
|
if target.startswith('('):
|
||||||
|
kw['filter'] = target
|
||||||
|
else:
|
||||||
|
# See if the target is a group. If so we set the
|
||||||
|
# targetgroup attr, otherwise we consider it a subtree
|
||||||
|
if api.env.container_group in target:
|
||||||
|
kw['targetgroup'] = target
|
||||||
|
else:
|
||||||
|
kw['subtree'] = target
|
||||||
|
|
||||||
|
groupdn = a.bindrule['expression']
|
||||||
|
groupdn = groupdn.replace('ldap:///','')
|
||||||
|
(dn, entry_attrs) = ldap.get_entry(groupdn, ['cn'])
|
||||||
|
if api.env.container_taskgroup in dn:
|
||||||
|
kw['taskgroup'] = entry_attrs['cn'][0]
|
||||||
|
else:
|
||||||
|
kw['group'] = entry_attrs['cn'][0]
|
||||||
|
|
||||||
|
return kw
|
||||||
|
|
||||||
def _convert_strings_to_acis(acistrs):
|
def _convert_strings_to_acis(acistrs):
|
||||||
acis = []
|
acis = []
|
||||||
for a in acistrs:
|
for a in acistrs:
|
||||||
@ -362,18 +409,23 @@ class aci_mod(crud.Update):
|
|||||||
acis = _convert_strings_to_acis(entry_attrs.get('aci', []))
|
acis = _convert_strings_to_acis(entry_attrs.get('aci', []))
|
||||||
aci = _find_aci_by_name(acis, aciname)
|
aci = _find_aci_by_name(acis, aciname)
|
||||||
|
|
||||||
kw.setdefault('aciname', aci.name)
|
# The strategy here is to convert the ACI we're updating back into
|
||||||
kw.setdefault('taskgroup', aci.bindrule['expression'])
|
# a series of keywords. Then we replace any keywords that have been
|
||||||
kw.setdefault('permissions', aci.permissions)
|
# updated and convert that back into an ACI and write it out.
|
||||||
kw.setdefault('attrs', aci.target['targetattr']['expression'])
|
newkw = _aci_to_kw(ldap, aci)
|
||||||
if 'type' not in kw and 'targetgroup' not in kw and 'subtree' not in kw:
|
for k in kw.keys():
|
||||||
kw['subtree'] = aci.target['target']['expression']
|
newkw[k] = kw[k]
|
||||||
if 'memberof' not in kw and 'filter' not in kw:
|
if 'aciname' in newkw:
|
||||||
kw['filter'] = aci.target['targetfilter']['expression']
|
del newkw['aciname']
|
||||||
|
|
||||||
self.api.Command['aci_del'](aciname)
|
self.api.Command['aci_del'](aciname)
|
||||||
|
|
||||||
return self.api.Command['aci_add'](aciname, **kw)
|
result = self.api.Command['aci_add'](aciname, **newkw)['result']
|
||||||
|
|
||||||
|
return dict(
|
||||||
|
result=result,
|
||||||
|
value=aciname,
|
||||||
|
)
|
||||||
|
|
||||||
def output_for_cli(self, textui, result, aciname, **options):
|
def output_for_cli(self, textui, result, aciname, **options):
|
||||||
"""
|
"""
|
||||||
@ -451,7 +503,7 @@ class aci_find(crud.Search):
|
|||||||
try:
|
try:
|
||||||
self.api.Command['taskgroup_show'](
|
self.api.Command['taskgroup_show'](
|
||||||
kw['taskgroup']
|
kw['taskgroup']
|
||||||
)['result']
|
)
|
||||||
except errors.NotFound:
|
except errors.NotFound:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
# Authors:
|
# Authors:
|
||||||
# Rob Crittenden <rcritten@redhat.com>
|
# Rob Crittenden <rcritten@redhat.com>
|
||||||
#
|
#
|
||||||
# Copyright (C) 2009 Red Hat
|
# Copyright (C) 2010 Red Hat
|
||||||
# see file 'COPYING' for use and warranty information
|
# see file 'COPYING' for use and warranty information
|
||||||
#
|
#
|
||||||
# This program is free software; you can redistribute it and/or
|
# This program is free software; you can redistribute it and/or
|
||||||
@ -16,62 +16,207 @@
|
|||||||
# You should have received a copy of the GNU General Public License
|
# You should have received a copy of the GNU General Public License
|
||||||
# along with this program; if not, write to the Free Software
|
# along with this program; if not, write to the Free Software
|
||||||
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Test the `ipalib/plugins/aci.py` module.
|
Test the `ipalib/plugins/aci.py` module.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import sys
|
from ipalib import api, errors
|
||||||
from xmlrpc_test import XMLRPC_test, assert_attr_equal
|
from tests.test_xmlrpc import objectclasses
|
||||||
from ipalib import api
|
from xmlrpc_test import Declarative, fuzzy_digits, fuzzy_uuid
|
||||||
from ipalib import errors
|
|
||||||
|
|
||||||
|
|
||||||
class test_aci(XMLRPC_test):
|
aci1=u'test1'
|
||||||
"""
|
taskgroup = u'testtaskgroup'
|
||||||
Test the `aci` plugin.
|
|
||||||
"""
|
|
||||||
aciname = u'acitest'
|
|
||||||
taskgroup = u'testtaskgroup'
|
|
||||||
kw = {'permissions': u'add', 'type': u'user', 'taskgroup': taskgroup }
|
|
||||||
aci = u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "acitest";allow (add) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn)
|
|
||||||
|
|
||||||
def test_1_aci_add(self):
|
|
||||||
"""
|
|
||||||
Test adding an aci using the `xmlrpc.aci_add` method.
|
|
||||||
"""
|
|
||||||
result = api.Command['aci_add'](self.aciname, **self.kw)['result']
|
|
||||||
|
|
||||||
assert result == self.aci
|
class test_aci(Declarative):
|
||||||
|
|
||||||
def test_2_aci_show(self):
|
cleanup_commands = [
|
||||||
"""
|
('aci_del', [aci1], {}),
|
||||||
Test showing an aci using the `xmlrpc.aci_show` method.
|
]
|
||||||
"""
|
|
||||||
result = api.Command['aci_show'](self.aciname)['result']
|
|
||||||
|
|
||||||
assert result == self.aci
|
tests = [
|
||||||
|
|
||||||
def test_3_aci_find(self):
|
dict(
|
||||||
"""
|
desc='Try to retrieve non-existent %r' % aci1,
|
||||||
Test showing an aci using the `xmlrpc.aci_show` method.
|
command=('aci_show', [aci1], {}),
|
||||||
"""
|
expected=errors.NotFound(reason='no such entry'),
|
||||||
outcome = api.Command['aci_find'](self.aciname)
|
),
|
||||||
result = outcome['result']
|
|
||||||
count = outcome['count']
|
|
||||||
|
|
||||||
assert count == 1
|
|
||||||
assert result[0] == self.aci
|
|
||||||
|
|
||||||
def test_4_aci_del(self):
|
dict(
|
||||||
"""
|
desc='Try to update non-existent %r' % aci1,
|
||||||
Remove the second test policy with `xmlrpc.aci_del`.
|
command=('aci_mod', [aci1], dict(permissions=u'write')),
|
||||||
"""
|
expected=errors.NotFound(reason='no such entry'),
|
||||||
assert api.Command['aci_del'](self.aciname)['result'] is True
|
),
|
||||||
|
|
||||||
# Verify that it is gone
|
|
||||||
try:
|
dict(
|
||||||
api.Command['aci_show'](self.aciname)
|
desc='Try to delete non-existent %r' % aci1,
|
||||||
except errors.NotFound:
|
command=('aci_del', [aci1], {}),
|
||||||
pass
|
expected=errors.NotFound(reason='no such entry'),
|
||||||
else:
|
),
|
||||||
assert False
|
|
||||||
|
|
||||||
|
dict(
|
||||||
|
desc='Create %r' % aci1,
|
||||||
|
command=(
|
||||||
|
'aci_add', [aci1], dict(permissions=u'add', type=u'user', taskgroup=taskgroup)
|
||||||
|
),
|
||||||
|
expected=dict(
|
||||||
|
value=aci1,
|
||||||
|
summary=u'Created ACI "test1"',
|
||||||
|
result=u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "test1";allow (add) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
|
||||||
|
|
||||||
|
dict(
|
||||||
|
desc='Try to create duplicate %r' % aci1,
|
||||||
|
command=(
|
||||||
|
'aci_add', [aci1], dict(permissions=u'add', type=u'user', taskgroup=taskgroup)
|
||||||
|
),
|
||||||
|
expected=errors.DuplicateEntry(),
|
||||||
|
),
|
||||||
|
|
||||||
|
|
||||||
|
dict(
|
||||||
|
desc='Retrieve %r' % aci1,
|
||||||
|
command=(
|
||||||
|
'aci_show', [aci1], {}
|
||||||
|
),
|
||||||
|
expected=dict(
|
||||||
|
value=aci1,
|
||||||
|
summary=None,
|
||||||
|
result=u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "test1";allow (add) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
|
||||||
|
|
||||||
|
dict(
|
||||||
|
desc='Search for %r with all=True' % aci1,
|
||||||
|
command=(
|
||||||
|
'aci_find', [aci1], {'all': True}
|
||||||
|
),
|
||||||
|
expected=dict(
|
||||||
|
result=[
|
||||||
|
u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "test1";allow (add) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn)
|
||||||
|
],
|
||||||
|
summary=u'1 ACI matched',
|
||||||
|
count=1,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
|
||||||
|
|
||||||
|
dict(
|
||||||
|
desc='Search for %r with minimal attributes' % aci1,
|
||||||
|
command=(
|
||||||
|
'aci_find', [aci1], {}
|
||||||
|
),
|
||||||
|
expected=dict(
|
||||||
|
result=[
|
||||||
|
u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "test1";allow (add) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn)
|
||||||
|
],
|
||||||
|
summary=u'1 ACI matched',
|
||||||
|
count=1,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
|
||||||
|
|
||||||
|
dict(
|
||||||
|
desc='Update permissions in %r' % aci1,
|
||||||
|
command=(
|
||||||
|
'aci_mod', [aci1], dict(permissions=u'add,write')
|
||||||
|
),
|
||||||
|
expected=dict(
|
||||||
|
value=aci1,
|
||||||
|
summary=u'Updated ACI "test1"',
|
||||||
|
result=u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "test1";allow (add,write) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
|
||||||
|
|
||||||
|
dict(
|
||||||
|
desc='Retrieve %r to verify update' % aci1,
|
||||||
|
command=('aci_show', [aci1], {}),
|
||||||
|
expected=dict(
|
||||||
|
value=aci1,
|
||||||
|
summary=None,
|
||||||
|
result=u'(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "test1";allow (add,write) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn),
|
||||||
|
),
|
||||||
|
|
||||||
|
),
|
||||||
|
|
||||||
|
dict(
|
||||||
|
desc='Update attributes in %r' % aci1,
|
||||||
|
command=(
|
||||||
|
'aci_mod', [aci1], dict(attrs=u'cn, sn,givenName')
|
||||||
|
),
|
||||||
|
expected=dict(
|
||||||
|
value=aci1,
|
||||||
|
summary=u'Updated ACI "test1"',
|
||||||
|
result=u'(targetattr = "cn || sn || givenName")(target = "ldap:///uid=*,cn=users,cn=accounts,%s")(version 3.0;acl "test1";allow (add,write) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
|
||||||
|
|
||||||
|
dict(
|
||||||
|
desc='Update type in %r' % aci1,
|
||||||
|
command=(
|
||||||
|
'aci_mod', [aci1], dict(type=u'group')
|
||||||
|
),
|
||||||
|
expected=dict(
|
||||||
|
value=aci1,
|
||||||
|
summary=u'Updated ACI "test1"',
|
||||||
|
result=u'(targetattr = "cn || sn || givenName")(target = "ldap:///cn=*,cn=groups,cn=accounts,%s")(version 3.0;acl "test1";allow (add,write) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
|
||||||
|
|
||||||
|
dict(
|
||||||
|
desc='Update memberOf in %r' % aci1,
|
||||||
|
command=(
|
||||||
|
'aci_mod', [aci1], dict(memberof=u'ipausers')
|
||||||
|
),
|
||||||
|
expected=dict(
|
||||||
|
value=aci1,
|
||||||
|
summary=u'Updated ACI "test1"',
|
||||||
|
result=u'(targetattr = "cn || sn || givenName")(targetfilter = "(memberOf=cn=testtaskgroup,cn=taskgroups,cn=accounts,%s)")(target = "ldap:///cn=*,cn=groups,cn=accounts,%s")(version 3.0;acl "test1";allow (add,write) groupdn = "ldap:///cn=testtaskgroup,cn=taskgroups,cn=accounts,%s";)' % (api.env.basedn, api.env.basedn, api.env.basedn),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
|
||||||
|
|
||||||
|
dict(
|
||||||
|
desc='Delete %r' % aci1,
|
||||||
|
command=('aci_del', [aci1], {}),
|
||||||
|
expected=dict(
|
||||||
|
result=True,
|
||||||
|
summary=u'Deleted ACI "test1"',
|
||||||
|
value=aci1,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
|
||||||
|
|
||||||
|
dict(
|
||||||
|
desc='Try to delete non-existent %r' % aci1,
|
||||||
|
command=('aci_del', [aci1], {}),
|
||||||
|
expected=errors.NotFound(reason='no such entry'),
|
||||||
|
),
|
||||||
|
|
||||||
|
|
||||||
|
dict(
|
||||||
|
desc='Try to retrieve non-existent %r' % aci1,
|
||||||
|
command=('aci_show', [aci1], {}),
|
||||||
|
expected=errors.NotFound(reason='no such entry'),
|
||||||
|
),
|
||||||
|
|
||||||
|
|
||||||
|
dict(
|
||||||
|
desc='Try to update non-existent %r' % aci1,
|
||||||
|
command=('aci_mod', [aci1], dict(givenname=u'Foo')),
|
||||||
|
expected=errors.NotFound(reason='no such entry'),
|
||||||
|
),
|
||||||
|
|
||||||
|
|
||||||
|
]
|
||||||
|
Loading…
Reference in New Issue
Block a user