SUDO plugin support for external hosts and users https://fedorahosted.org/freeipa/ticket/570

This commit is contained in:
Jr Aquino
2010-12-17 08:29:33 -08:00
committed by Rob Crittenden
parent dd9615d189
commit fc8f7f9da8
2 changed files with 269 additions and 6 deletions

View File

@@ -25,6 +25,7 @@ from ipalib import Str, StrEnum
from ipalib.plugins.baseldap import *
from ipalib import _, ngettext
class sudorule(LDAPObject):
"""
Sudo Rule.
@@ -96,7 +97,7 @@ class sudorule(LDAPObject):
doc=_('Run As Group category the rule applies to'),
values=(u'all', ),
),
Str('memberuser_user?',
Str('memberuser_user?',
label=_('Users'),
flags=['no_create', 'no_update', 'no_search'],
),
@@ -124,15 +125,19 @@ class sudorule(LDAPObject):
label=_('Sudo Command Groups'),
flags=['no_create', 'no_update', 'no_search'],
),
Str('ipasudorunas_user?',
Str('ipasudorunas_user?',
label=_('Run As User'),
flags=['no_create', 'no_update', 'no_search'],
),
Str('ipasudorunasgroup_group?',
Str('ipasudorunasgroup_group?',
label=_('Run As Group'),
flags=['no_create', 'no_update', 'no_search'],
),
Str('externaluser?',
cli_name='externaluser',
label=_('External User'),
doc=_('External User the rule applies to'),
),
)
api.register(sudorule)
@@ -283,6 +288,32 @@ class sudorule_add_user(LDAPAddMember):
member_attributes = ['memberuser']
member_count_out = ('%i object added.', '%i objects added.')
def post_callback(self, ldap, completed, failed, dn, entry_attrs, *keys, **options):
completed_external = 0
# Sift through the user failures. We assume that these are all
# users that aren't stored in IPA, aka external users.
if 'memberuser' in failed and 'user' in failed['memberuser']:
(dn, entry_attrs_) = ldap.get_entry(dn, ['externaluser'])
members = entry_attrs.get('memberuser', [])
external_users = entry_attrs_.get('externaluser', [])
failed_users = []
for user in failed['memberuser']['user']:
username = user[0].lower()
user_dn = self.api.Object['user'].get_dn(username)
if username not in external_users and user_dn not in members:
external_users.append(username)
completed_external += 1
else:
failed_users.append(username)
if completed_external:
try:
ldap.update_entry(dn, {'externaluser': external_users})
except errors.EmptyModlist:
pass
failed['memberuser']['user'] = failed_users
entry_attrs['externaluser'] = external_users
return (completed + completed_external, dn)
api.register(sudorule_add_user)
@@ -293,6 +324,30 @@ class sudorule_remove_user(LDAPRemoveMember):
member_attributes = ['memberuser']
member_count_out = ('%i object removed.', '%i objects removed.')
def post_callback(self, ldap, completed, failed, dn, entry_attrs, *keys, **options):
# Run through the user failures and gracefully remove any defined as
# as an externaluser.
if 'memberuser' in failed and 'user' in failed['memberuser']:
(dn, entry_attrs) = ldap.get_entry(dn, ['externaluser'])
external_users = entry_attrs.get('externaluser', [])
failed_users = []
completed_external = 0
for user in failed['memberuser']['user']:
username = user[0].lower()
if username in external_users:
external_users.remove(username)
completed_external += 1
else:
failed_users.append(username)
if completed_external:
try:
ldap.update_entry(dn, {'externaluser': external_users})
except errors.EmptyModlist:
pass
failed['memberuser']['user'] = failed_users
entry_attrs['externaluser'] = external_users
return (completed + completed_external, dn)
api.register(sudorule_remove_user)
@@ -303,6 +358,32 @@ class sudorule_add_host(LDAPAddMember):
member_attributes = ['memberhost']
member_count_out = ('%i object added.', '%i objects added.')
def post_callback(self, ldap, completed, failed, dn, entry_attrs, *keys, **options):
completed_external = 0
# Sift through the host failures. We assume that these are all
# hosts that aren't stored in IPA, aka external hosts.
if 'memberhost' in failed and 'host' in failed['memberhost']:
(dn, entry_attrs_) = ldap.get_entry(dn, ['externalhost'])
members = entry_attrs.get('memberhost', [])
external_hosts = entry_attrs_.get('externalhost', [])
failed_hosts = []
for host in failed['memberhost']['host']:
hostname = host[0].lower()
host_dn = self.api.Object['host'].get_dn(hostname)
if hostname not in external_hosts and host_dn not in members:
external_hosts.append(hostname)
completed_external += 1
else:
failed_hosts.append(hostname)
if completed_external:
try:
ldap.update_entry(dn, {'externalhost': external_hosts})
except errors.EmptyModlist:
pass
failed['memberhost']['host'] = failed_hosts
entry_attrs['externalhost'] = external_hosts
return (completed + completed_external, dn)
api.register(sudorule_add_host)
@@ -313,6 +394,30 @@ class sudorule_remove_host(LDAPRemoveMember):
member_attributes = ['memberhost']
member_count_out = ('%i object removed.', '%i objects removed.')
def post_callback(self, ldap, completed, failed, dn, entry_attrs, *keys, **options):
# Run through the host failures and gracefully remove any defined as
# as an externalhost.
if 'memberhost' in failed and 'host' in failed['memberhost']:
(dn, entry_attrs) = ldap.get_entry(dn, ['externalhost'])
external_hosts = entry_attrs.get('externalhost', [])
failed_hosts = []
completed_external = 0
for host in failed['memberhost']['host']:
hostname = host[0].lower()
if hostname in external_hosts:
external_hosts.remove(hostname)
completed_external += 1
else:
failed_hosts.append(hostname)
if completed_external:
try:
ldap.update_entry(dn, {'externalhost': external_hosts})
except errors.EmptyModlist:
pass
failed['memberhost']['host'] = failed_hosts
entry_attrs['externalhost'] = external_hosts
return (completed + completed_external, dn)
api.register(sudorule_remove_host)
@@ -354,3 +459,85 @@ class sudorule_remove_runasgroup(LDAPRemoveMember):
member_count_out = ('%i object removed.', '%i objects removed.')
api.register(sudorule_remove_runasgroup)
class sudorule_add_option(LDAPQuery):
"""
Add an option to the Sudo rule.
"""
takes_options = (
Str('ipasudoopt',
cli_name='sudooption',
label=_('Sudo Option'),
),
)
def execute(self, cn, **options):
ldap = self.obj.backend
dn = self.obj.get_dn(cn)
(dn, entry_attrs) = ldap.get_entry(dn, ['ipasudoopt'])
entry_attrs.setdefault('ipasudoopt', []).append(
options['ipasudoopt']
)
try:
ldap.update_entry(dn, entry_attrs)
except errors.EmptyModlist:
pass
except errors.NotFound:
self.obj.handle_not_found(cn)
return dict(result=entry_attrs)
def output_for_cli(self, textui, result, cn, **options):
textui.print_name(self.name)
textui.print_dashed(
'Added option "%s" to Sudo rule "%s"' % (
options['ipasudoopt'], cn
)
)
api.register(sudorule_add_option)
class sudorule_remove_option(LDAPQuery):
"""
Remove an option from Sudo rule.
"""
takes_options = (
Str('ipasudoopt?',
cli_name='sudooption',
label=_('Sudo Option'),
),
)
def execute(self, cn, **options):
ldap = self.obj.backend
dn = self.obj.get_dn(cn)
(dn, entry_attrs) = ldap.get_entry(dn, ['ipasudoopt'])
try:
entry_attrs.setdefault('ipasudoopt', []).remove(
options['ipasudoopt']
)
ldap.update_entry(dn, entry_attrs)
except (ValueError, errors.EmptyModlist):
pass
except errors.NotFound:
self.obj.handle_not_found(cn)
return dict(result=True)
def output_for_cli(self, textui, result, cn, **options):
textui.print_name(self.name)
textui.print_dashed(
'Removed option "%s" from Sudo rule "%s"' % (
options['ipasudoopt'], cn
)
)
api.register(sudorule_remove_option)

View File

@@ -36,8 +36,10 @@ class test_sudorule(XMLRPC_test):
rule_desc_mod = u'description modified'
test_user = u'sudorule_test_user'
test_external_user = u'external_test_user'
test_group = u'sudorule_test_group'
test_host = u'sudorule._test_host'
test_external_host = u'external._test_host'
test_hostgroup = u'sudorule_test_hostgroup'
test_sudoallowcmdgroup = u'sudorule_test_allowcmdgroup'
test_sudodenycmdgroup = u'sudorule_test_denycmdgroup'
@@ -46,6 +48,7 @@ class test_sudorule(XMLRPC_test):
test_runasuser = u'manager'
test_runasgroup = u'manager'
test_catagory = u'all'
test_option = u'authenticate'
def test_0_sudorule_add(self):
"""
@@ -210,14 +213,14 @@ class test_sudorule(XMLRPC_test):
ret = api.Command['sudorule_add_runasgroup'](
self.rule_name, group=self.test_runasgroup
)
print ret
assert ret['completed'] == 1
failed = ret['failed']
assert 'ipasudorunasgroup' in failed
assert 'group' in failed['ipasudorunasgroup']
assert not failed['ipasudorunasgroup']['group']
entry = ret['result']
assert_attr_equal(entry, 'ipasudorunasgroup_group', self.test_runasgroup)
assert_attr_equal(entry, 'ipasudorunasgroup_group',
self.test_runasgroup)
def test_b_sudorule_remove_runasgroup(self):
"""
@@ -235,6 +238,53 @@ class test_sudorule(XMLRPC_test):
entry = ret['result']
assert 'ipasudorunasgroup_group' not in entry
def test_a_sudorule_add_externaluser(self):
"""
Test adding an external user to Sudo rule using
`xmlrpc.sudorule_add_user`.
"""
ret = api.Command['sudorule_add_user'](
self.rule_name, user=self.test_external_user
)
assert ret['completed'] == 1
failed = ret['failed']
entry = ret['result']
assert_attr_equal(entry, 'externaluser', self.test_external_user)
def test_b_sudorule_remove_externaluser(self):
"""
Test removing an external user from Sudo rule using
`xmlrpc.sudorule_remove_user'.
"""
ret = api.Command['sudorule_remove_user'](
self.rule_name, user=self.test_external_user
)
assert ret['completed'] == 1
failed = ret['failed']
entry = ret['result']
assert 'externaluser' not in entry
def test_a_sudorule_add_option(self):
"""
Test adding an option to Sudo rule using
`xmlrpc.sudorule_add_option`.
"""
ret = api.Command['sudorule_add_option'](
self.rule_name, ipasudoopt=self.test_option
)
entry = ret['result']
assert_attr_equal(entry, 'ipasudoopt', self.test_option)
def test_b_sudorule_remove_option(self):
"""
Test removing an option from Sudo rule using
`xmlrpc.sudorule_remove_option'.
"""
ret = api.Command['sudorule_remove_option'](
self.rule_name, ipasudoopt=self.test_option
)
assert ret['result'] is True
def test_a_sudorule_add_host(self):
"""
Test adding host and hostgroup to Sudo rule using
@@ -273,6 +323,32 @@ class test_sudorule(XMLRPC_test):
assert 'memberhost_host' not in entry
assert 'memberhost_hostgroup' not in entry
def test_a_sudorule_add_externalhost(self):
"""
Test adding an external host to Sudo rule using
`xmlrpc.sudorule_add_host`.
"""
ret = api.Command['sudorule_add_host'](
self.rule_name, host=self.test_external_host
)
assert ret['completed'] == 1
failed = ret['failed']
entry = ret['result']
assert_attr_equal(entry, 'externalhost', self.test_external_host)
def test_b_sudorule_remove_externalhost(self):
"""
Test removing an external host from Sudo rule using
`xmlrpc.sudorule_remove_host`.
"""
ret = api.Command['sudorule_remove_host'](
self.rule_name, host=self.test_external_host
)
assert ret['completed'] == 1
failed = ret['failed']
entry = ret['result']
assert 'externalhost' not in entry
def test_a_sudorule_add_allow_command(self):
"""
Test adding allow command and cmdgroup to Sudo rule using