diff --git a/install/updates/taskgroup.update b/install/updates/taskgroup.update index 858da3cfe..a98960657 100644 --- a/install/updates/taskgroup.update +++ b/install/updates/taskgroup.update @@ -1,5 +1,5 @@ # Add the taskgroup container -dn: cn=taskgroups,$SUFFIX +dn: cn=taskgroups,cn=accounts,$SUFFIX add:objectClass: nsContainer add:cn: taskgroups diff --git a/ipalib/plugins/taskgroup.py b/ipalib/plugins/taskgroup.py index ebd7f11ee..863c0a393 100644 --- a/ipalib/plugins/taskgroup.py +++ b/ipalib/plugins/taskgroup.py @@ -21,208 +21,122 @@ Frontend plugins for taskgroups. """ -from ipalib import api, crud, errors, errors2 -from ipalib import Object, Command # Plugin base classes -from ipalib import Str, Int, Flag # Parameter types +from ipalib import api +from ipalib.plugins.basegroup import * -default_attributes = ['cn','description'] -container_taskgroup = "cn=taskgroups" +display_attributes = ['cn','description', 'member', 'memberof'] +container_taskgroup = "cn=taskgroups,cn=accounts" +container_rolegroup = "cn=rolegroups,cn=accounts" -def make_taskgroup_dn(cn): - """ - Construct group dn from cn. - """ - import ldap - return 'cn=%s,%s,%s' % ( - ldap.dn.escape_dn_chars(cn), - container_taskgroup, - api.env.basedn, - ) - -class taskgroup(Object): +class taskgroup(BaseGroup): """ taskgroup object. """ - takes_params = ( - Str('description', - doc='A description of this group', - attribute=True, - ), - Str('cn', - cli_name='name', - primary_key=True, - normalizer=lambda value: value.lower(), - attribute=True, - ), - ) + container=container_taskgroup + api.register(taskgroup) -class taskgroup_add(crud.Add): - 'Add a new group.' - - def execute(self, cn, **kw): - """ - Execute the taskgroup-add operation. - - The dn should not be passed as a keyword argument as it is constructed - by this method. - - Returns the entry as it will be created in LDAP. - - No need to explicitly set gidNumber. The dna_plugin will do this - for us if the value isn't provided by the caller. - - :param cn: The name of the group being added. - :param kw: Keyword arguments for the other LDAP attributes. - """ - assert 'cn' not in kw - assert 'dn' not in kw - ldap = self.api.Backend.ldap - entry = self.args_options_2_entry(cn, **kw) - entry['dn'] = make_taskgroup_dn(cn) - - # some required objectclasses - entry['objectClass'] = ['top','groupofnames'] - - return ldap.create(**entry) - - def output_for_cli(self, textui, result, *args, **options): - """ - Output result of this command to command line interface. - """ - textui.print_name(self.name) - textui.print_entry(result) - textui.print_dashed('Added group "%s"' % result['cn']) +class taskgroup_add(basegroup_add): + 'Add a new taskgroup.' api.register(taskgroup_add) -class taskgroup_del(crud.Del): - 'Delete an existing group.' - def execute(self, cn, **kw): - """ - Delete a group - - :param cn: The name of the group being removed - :param kw: Unused - """ - ldap = self.api.Backend.ldap - dn = ldap.find_entry_dn("cn", cn, "groupofnames", container_taskgroup) - self.log.info("IPA: taskgroup-del '%s'" % dn) - - return ldap.delete(dn) - - def output_for_cli(self, textui, result, cn): - """ - Output result of this command to command line interface. - """ - textui.print_plain("Deleted group %s" % cn) +class taskgroup_del(basegroup_del): + 'Delete an existing taskgroup.' + container = container_taskgroup api.register(taskgroup_del) -class taskgroup_mod(crud.Mod): - 'Edit an existing group.' - def execute(self, cn, **kw): - """ - Execute the taskgroup-mod operation. - - The dn should not be passed as a keyword argument as it is constructed - by this method. - - Returns the entry - - :param cn: The name of the group to update. - :param kw: Keyword arguments for the other LDAP attributes. - """ - assert 'cn' not in kw - assert 'dn' not in kw - ldap = self.api.Backend.ldap - dn = ldap.find_entry_dn("cn", cn, "groupofnames", container_taskgroup) - - return ldap.update(dn, **kw) - - def output_for_cli(self, textui, result, cn, **options): - """ - Output result of this command to command line interface. - """ - if result: - textui.print_plain("Group updated") +class taskgroup_mod(basegroup_mod): + 'Edit an existing taskgroup.' + container = container_taskgroup api.register(taskgroup_mod) -class taskgroup_find(crud.Find): +class taskgroup_find(basegroup_find): 'Search the groups.' - def execute(self, term, **kw): - ldap = self.api.Backend.ldap - - # Pull the list of searchable attributes out of the configuration. - config = ldap.get_ipa_config() - search_fields_conf_str = config.get('ipagroupsearchfields') - search_fields = search_fields_conf_str.split(",") - - search_kw = {} - for s in search_fields: - search_kw[s] = term - - object_type = ldap.get_object_type("cn") - if object_type and not kw.get('objectclass'): - search_kw['objectclass'] = object_type - search_kw['base'] = container_taskgroup - search_kw['objectclass'] = "groupofnames" - return ldap.search(**search_kw) - - def output_for_cli(self, textui, result, uid, **options): - counter = result[0] - groups = result[1:] - if counter == 0 or len(groups) == 0: - textui.print_plain("No entries found") - return - if len(groups) == 1: - textui.print_entry(groups[0]) - return - textui.print_name(self.name) - - for g in groups: - textui.print_entry(g) - textui.print_plain('') - if counter == -1: - textui.print_plain("These results are truncated.") - textui.print_plain("Please refine your search and try again.") - textui.print_count(groups, '%d groups matched') + container = container_taskgroup api.register(taskgroup_find) -class taskgroup_show(crud.Get): - 'Examine an existing group.' - takes_options = ( - Flag('all', doc='Retrieve all attributes'), - ) - def execute(self, cn, **kw): - """ - Execute the taskgroup-show operation. - - The dn should not be passed as a keyword argument as it is constructed - by this method. - - Returns the entry - - :param cn: The group name to retrieve. - :param kw: Not used. - """ - ldap = self.api.Backend.ldap - dn = ldap.find_entry_dn("cn", cn, "groupofnames", container_taskgroup) - - # FIXME: should kw contain the list of attributes to display? - if kw.get('all', False): - return ldap.retrieve(dn) - else: - return ldap.retrieve(dn, default_attributes) - - def output_for_cli(self, textui, result, *args, **options): - textui.print_entry(result) +class taskgroup_show(basegroup_show): + 'Examine an existing taskgroup.' + default_attributes = display_attributes + container = container_taskgroup api.register(taskgroup_show) + + +class taskgroup_add_member(basegroup_add_member): + 'Add a member to a taskgroup.' + container = container_taskgroup + takes_options = basegroup_add_member.takes_options + (List('rolegroups?', doc='comma-separated list of role groups to add'),) + + def execute(self, cn, **kw): + """ + Execute the group-add-member operation. + + Returns the updated group entry + + :param cn: The group name to add new members to. + :param kw: groups is a comma-separated list of groups to add + :param kw: users is a comma-separated list of users to add + :param kw: rolegroups is a comma-separated list of rolegroups to add + """ + assert self.container + ldap = self.api.Backend.ldap + dn = ldap.find_entry_dn("cn", cn, self.filter_class, self.container) + add_failed = [] + to_add = [] + completed = 0 + + # Do the base class additions first + add_failed = super(taskgroup_add_member, self).execute(cn, **kw) + + members = kw.get('rolegroups', []) + (to_add, add_failed) = self._find_members(ldap, add_failed, members, "cn", self.filter_class, container_rolegroup) + (completed, add_failed) = self._add_members(ldap, completed, to_add, add_failed, dn, "member") + + return add_failed + +api.register(taskgroup_add_member) + + +class taskgroup_remove_member(basegroup_remove_member): + 'Remove a member from a taskgroup.' + container = container_taskgroup + takes_options = basegroup_remove_member.takes_options + (List('rolegroups?', doc='comma-separated list of role groups to remove'),) + + def execute(self, cn, **kw): + """ + Execute the group-remove-member operation. + + Returns the updated group entry + + :param cn: The group name to remove new members from. + :param kw: groups is a comma-separated list of groups to remove + :param kw: users is a comma-separated list of users to remove + :param kw: rolegroups is a comma-separated list of rolegroups to remove + """ + assert self.container + ldap = self.api.Backend.ldap + dn = ldap.find_entry_dn("cn", cn, self.filter_class, self.container) + remove_failed = [] + to_remove = [] + completed = 0 + + # Do the base class removals first + remove_failed = super(taskgroup_remove_member, self).execute(cn, **kw) + + members = kw.get('rolegroups', []) + (to_remove, remove_failed) = self._find_members(ldap, remove_failed, members, "cn", self.filter_class, container_rolegroup) + (completed, remove_failed) = self._remove_members(ldap, completed, to_remove, remove_failed, dn, "member") + + return remove_failed + +api.register(taskgroup_remove_member) diff --git a/tests/test_xmlrpc/test_taskgroup_plugin.py b/tests/test_xmlrpc/test_taskgroup_plugin.py new file mode 100644 index 000000000..ee4a5bbac --- /dev/null +++ b/tests/test_xmlrpc/test_taskgroup_plugin.py @@ -0,0 +1,188 @@ +# Authors: +# Rob Crittenden +# +# Copyright (C) 2009 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Test the `ipalib/plugins/taskgroup` module. +""" + +import sys +from xmlrpc_test import XMLRPC_test +from ipalib import api +from ipalib import errors2 + + +class test_Taskgroup(XMLRPC_test): + """ + Test the `taskgroup` plugin. + """ + cn=u'testgroup' + description=u'Test task group' + kw={'cn': cn, 'description': description} + + taskgroup_cn = u'ipatestgroup' + taskgroup_description = u'Test group for taskgroups' + + rolegroup_cn = u'iparolegroup' + rolegroup_description = u'Test rolegroup for taskgroups' + + def test_add(self): + """ + Test the `xmlrpc.taskgroup_add` method. + """ + res = api.Command['taskgroup_add'](**self.kw) + assert res + assert res.get('description','') == self.description + assert res.get('cn','') == self.cn + + def test_addrolegroup(self): + """ + Add a rolegroup to test add/remove member. + """ + kw={'cn': self.rolegroup_cn, 'description': self.rolegroup_description} + res = api.Command['rolegroup_add'](**kw) + assert res + assert res.get('description','') == self.rolegroup_description + assert res.get('cn','') == self.rolegroup_cn + + def test_addtaskgroup(self): + """ + Add a group to test add/remove member. + """ + kw={'cn': self.taskgroup_cn, 'description': self.taskgroup_description} + res = api.Command['group_add'](**kw) + assert res + assert res.get('description','') == self.taskgroup_description + assert res.get('cn','') == self.taskgroup_cn + + def test_addtaskgroupmember(self): + """ + Test the `xmlrpc.taskgroup_add_member` method. + """ + kw={} + kw['groups'] = self.taskgroup_cn + kw['rolegroups'] = self.rolegroup_cn + res = api.Command['taskgroup_add_member'](self.cn, **kw) + assert res == tuple() + + def test_doshow(self): + """ + Test the `xmlrpc.taskgroup_show` method. + """ + res = api.Command['taskgroup_show'](self.cn) + assert res + assert res.get('description','') == self.description + assert res.get('cn','') == self.cn + foundrole = False + foundtask = False + members = res.get('member',[]) + for m in members: + if m.startswith('cn=%s' % self.taskgroup_cn): foundtask=True + if m.startswith('cn=%s' % self.rolegroup_cn): foundrole=True + + if not foundtask and foundrole: + assert False + + def test_find(self): + """ + Test the `xmlrpc.taskgroup_find` method. + """ + res = api.Command['taskgroup_find'](self.cn) + assert res + assert len(res) == 2, res + assert res[1].get('description','') == self.description + assert res[1].get('cn','') == self.cn + members = res[1].get('member',[]) + foundrole = False + foundtask = False + for m in members: + if m.startswith('cn=%s' % self.taskgroup_cn): foundtask=True + if m.startswith('cn=%s' % self.rolegroup_cn): foundrole=True + + if not foundtask and foundrole: + assert False + + def test_mod(self): + """ + Test the `xmlrpc.taskgroup_mod` method. + """ + newdesc=u'Updated task group' + modkw={'cn': self.cn, 'description': newdesc} + res = api.Command['taskgroup_mod'](**modkw) + assert res + assert res.get('description','') == newdesc + + # Ok, double-check that it was changed + res = api.Command['taskgroup_show'](self.cn) + assert res + assert res.get('description','') == newdesc + assert res.get('cn','') == self.cn + + def test_member_remove(self): + """ + Test the `xmlrpc.taskgroup_remove_member` method. + """ + kw={} + kw['tasks'] = self.taskgroup_cn + res = api.Command['taskgroup_remove_member'](self.cn, **kw) + assert res == tuple() + + def test_remove(self): + """ + Test the `xmlrpc.taskgroup_del` method. + """ + res = api.Command['taskgroup_del'](self.cn) + assert res == True + + # Verify that it is gone + try: + res = api.Command['taskgroup_show'](self.cn) + except errors2.NotFound: + pass + else: + assert False + + def test_removetask(self): + """ + Remove the group we created for member testing + """ + res = api.Command['group_del'](self.taskgroup_cn) + assert res == True + + # Verify that it is gone + try: + res = api.Command['group_show'](self.taskgroup_cn) + except errors2.NotFound: + pass + else: + assert False + + def test_removerolegroup(self): + """ + Remove the rolegroup we created for member testing + """ + res = api.Command['rolegroup_del'](self.rolegroup_cn) + assert res == True + + # Verify that it is gone + try: + res = api.Command['rolegroup_show'](self.rolegroup_cn) + except errors2.NotFound: + pass + else: + assert False