mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Add external source hosts to HBAC.
When adding/removing source hosts if the host isn't found in IPA it is considered external. The attribute externalhost is used to store external hosts. ticket https://fedorahosted.org/freeipa/ticket/1574
This commit is contained in:
@@ -150,6 +150,9 @@ global_output_params = (
|
|||||||
Str('externalhost?',
|
Str('externalhost?',
|
||||||
label=_('External host'),
|
label=_('External host'),
|
||||||
),
|
),
|
||||||
|
Str('sourcehost',
|
||||||
|
label=_('Failed source hosts/hostgroups'),
|
||||||
|
),
|
||||||
Str('memberhost',
|
Str('memberhost',
|
||||||
label=_('Failed hosts/hostgroups'),
|
label=_('Failed hosts/hostgroups'),
|
||||||
),
|
),
|
||||||
|
|||||||
@@ -117,7 +117,7 @@ class hbacrule(LDAPObject):
|
|||||||
'description', 'usercategory', 'hostcategory',
|
'description', 'usercategory', 'hostcategory',
|
||||||
'sourcehostcategory', 'servicecategory', 'ipaenabledflag',
|
'sourcehostcategory', 'servicecategory', 'ipaenabledflag',
|
||||||
'memberuser', 'sourcehost', 'memberhost', 'memberservice',
|
'memberuser', 'sourcehost', 'memberhost', 'memberservice',
|
||||||
'memberhostgroup',
|
'memberhostgroup', 'externalhost',
|
||||||
]
|
]
|
||||||
uuid_attribute = 'ipauniqueid'
|
uuid_attribute = 'ipauniqueid'
|
||||||
rdn_attribute = 'ipauniqueid'
|
rdn_attribute = 'ipauniqueid'
|
||||||
@@ -480,6 +480,34 @@ class hbacrule_add_sourcehost(LDAPAddMember):
|
|||||||
raise errors.MutuallyExclusiveError(reason="source hosts cannot be added when sourcehost category='all'")
|
raise errors.MutuallyExclusiveError(reason="source hosts cannot be added when sourcehost category='all'")
|
||||||
return dn
|
return dn
|
||||||
|
|
||||||
|
def post_callback(self, ldap, completed, failed, dn, entry_attrs, *keys, **options):
|
||||||
|
completed_external = 0
|
||||||
|
# Sift through the host failures. We assume that these are all
|
||||||
|
# hosts that aren't stored in IPA, aka external hosts.
|
||||||
|
if 'sourcehost' in failed and 'host' in failed['sourcehost']:
|
||||||
|
(dn, entry_attrs_) = ldap.get_entry(dn, ['externalhost'])
|
||||||
|
members = entry_attrs.get('sourcehost', [])
|
||||||
|
external_hosts = entry_attrs_.get('externalhost', [])
|
||||||
|
failed_hosts = []
|
||||||
|
for host in failed['sourcehost']['host']:
|
||||||
|
hostname = host[0].lower()
|
||||||
|
host_dn = self.api.Object['host'].get_dn(hostname)
|
||||||
|
if hostname in external_hosts:
|
||||||
|
failed_hosts.append((hostname, unicode(errors.AlreadyGroupMember())))
|
||||||
|
elif hostname not in external_hosts and host_dn not in members:
|
||||||
|
external_hosts.append(hostname)
|
||||||
|
completed_external += 1
|
||||||
|
else:
|
||||||
|
failed_hosts.append((hostname, unicode(errors.NotFound())))
|
||||||
|
if completed_external:
|
||||||
|
try:
|
||||||
|
ldap.update_entry(dn, {'externalhost': external_hosts})
|
||||||
|
except errors.EmptyModlist:
|
||||||
|
pass
|
||||||
|
entry_attrs['externalhost'] = external_hosts
|
||||||
|
failed['sourcehost']['host'] = failed_hosts
|
||||||
|
return (completed + completed_external, dn)
|
||||||
|
|
||||||
api.register(hbacrule_add_sourcehost)
|
api.register(hbacrule_add_sourcehost)
|
||||||
|
|
||||||
|
|
||||||
@@ -489,6 +517,31 @@ class hbacrule_remove_sourcehost(LDAPRemoveMember):
|
|||||||
member_attributes = ['sourcehost']
|
member_attributes = ['sourcehost']
|
||||||
member_count_out = ('%i object removed.', '%i objects removed.')
|
member_count_out = ('%i object removed.', '%i objects removed.')
|
||||||
|
|
||||||
|
def post_callback(self, ldap, completed, failed, dn, entry_attrs, *keys, **options):
|
||||||
|
# Run through the host failures and gracefully remove any defined as
|
||||||
|
# as an externalhost.
|
||||||
|
if 'sourcehost' in failed and 'host' in failed['sourcehost']:
|
||||||
|
(dn, entry_attrs_) = ldap.get_entry(dn, ['externalhost'])
|
||||||
|
external_hosts = entry_attrs_.get('externalhost', [])
|
||||||
|
failed_hosts = []
|
||||||
|
completed_external = 0
|
||||||
|
for host in failed['sourcehost']['host']:
|
||||||
|
hostname = host[0].lower()
|
||||||
|
if hostname in external_hosts:
|
||||||
|
external_hosts.remove(hostname)
|
||||||
|
completed_external += 1
|
||||||
|
else:
|
||||||
|
failed_hosts.append(hostname)
|
||||||
|
if completed_external:
|
||||||
|
try:
|
||||||
|
ldap.update_entry(dn, {'externalhost': external_hosts})
|
||||||
|
except errors.EmptyModlist:
|
||||||
|
pass
|
||||||
|
failed['sourcehost']['host'] = failed_hosts
|
||||||
|
entry_attrs['externalhost'] = external_hosts
|
||||||
|
return (completed + completed_external, dn)
|
||||||
|
|
||||||
|
|
||||||
api.register(hbacrule_remove_sourcehost)
|
api.register(hbacrule_remove_sourcehost)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -46,6 +46,7 @@ class test_hbac(XMLRPC_test):
|
|||||||
test_sourcehost = u'hbacrule._test_src_host'
|
test_sourcehost = u'hbacrule._test_src_host'
|
||||||
test_sourcehostgroup = u'hbacrule_test_src_hostgroup'
|
test_sourcehostgroup = u'hbacrule_test_src_hostgroup'
|
||||||
test_service = u'sshd'
|
test_service = u'sshd'
|
||||||
|
test_host_external = u'notfound.example.com'
|
||||||
|
|
||||||
def test_0_hbacrule_add(self):
|
def test_0_hbacrule_add(self):
|
||||||
"""
|
"""
|
||||||
@@ -333,7 +334,73 @@ class test_hbac(XMLRPC_test):
|
|||||||
assert 'sourcehost host' not in entry
|
assert 'sourcehost host' not in entry
|
||||||
assert 'sourcehost hostgroup' not in entry
|
assert 'sourcehost hostgroup' not in entry
|
||||||
|
|
||||||
def test_c_hbacrule_clear_testing_data(self):
|
def test_c_hbacrule_add_external_host(self):
|
||||||
|
"""
|
||||||
|
Test adding an external host using `xmlrpc.hbacrule_add_host`.
|
||||||
|
"""
|
||||||
|
ret = api.Command['hbacrule_add_sourcehost'](
|
||||||
|
self.rule_name, host=self.test_host_external
|
||||||
|
)
|
||||||
|
assert ret['completed'] == 1
|
||||||
|
failed = ret['failed']
|
||||||
|
assert 'sourcehost' in failed
|
||||||
|
assert 'host' in failed['sourcehost']
|
||||||
|
assert not failed['sourcehost']['host']
|
||||||
|
assert 'hostgroup' in failed['sourcehost']
|
||||||
|
assert not failed['sourcehost']['hostgroup']
|
||||||
|
entry = ret['result']
|
||||||
|
assert_attr_equal(entry, 'externalhost', self.test_host_external)
|
||||||
|
|
||||||
|
def test_c_hbacrule_add_same_external(self):
|
||||||
|
"""
|
||||||
|
Test adding the same external host using `xmlrpc.hbacrule_add_host`.
|
||||||
|
"""
|
||||||
|
ret = api.Command['hbacrule_add_sourcehost'](
|
||||||
|
self.rule_name, host=self.test_host_external
|
||||||
|
)
|
||||||
|
assert ret['completed'] == 0
|
||||||
|
failed = ret['failed']
|
||||||
|
assert 'sourcehost' in failed
|
||||||
|
assert 'host' in failed['sourcehost']
|
||||||
|
assert (self.test_host_external, unicode(errors.AlreadyGroupMember())) in failed['sourcehost']['host']
|
||||||
|
entry = ret['result']
|
||||||
|
assert_attr_equal(entry, 'externalhost', self.test_host_external)
|
||||||
|
|
||||||
|
def test_c_hbacrule_remove_external_host(self):
|
||||||
|
"""
|
||||||
|
Test removing external source host using `xmlrpc.hbacrule_remove_host`.
|
||||||
|
"""
|
||||||
|
ret = api.Command['hbacrule_remove_sourcehost'](
|
||||||
|
self.rule_name, host=self.test_host_external
|
||||||
|
)
|
||||||
|
assert ret['completed'] == 1
|
||||||
|
failed = ret['failed']
|
||||||
|
assert 'sourcehost' in failed
|
||||||
|
assert 'host' in failed['sourcehost']
|
||||||
|
assert not failed['sourcehost']['host']
|
||||||
|
assert 'hostgroup' in failed['sourcehost']
|
||||||
|
assert not failed['sourcehost']['hostgroup']
|
||||||
|
entry = ret['result']
|
||||||
|
assert 'sourcehost host' not in entry
|
||||||
|
assert 'sourcehost hostgroup' not in entry
|
||||||
|
|
||||||
|
def test_c_hbacrule_remove_nonexist_external(self):
|
||||||
|
"""
|
||||||
|
Test removing non-existent external source host using `xmlrpc.hbacrule_remove_host`.
|
||||||
|
"""
|
||||||
|
ret = api.Command['hbacrule_remove_sourcehost'](
|
||||||
|
self.rule_name, host=self.test_host_external
|
||||||
|
)
|
||||||
|
assert ret['completed'] == 0
|
||||||
|
failed = ret['failed']
|
||||||
|
assert 'sourcehost' in failed
|
||||||
|
assert 'host' in failed['sourcehost']
|
||||||
|
assert (self.test_host_external, unicode(errors.NotGroupMember())) in failed['sourcehost']['host']
|
||||||
|
assert 'hostgroup' in failed['sourcehost']
|
||||||
|
assert not failed['sourcehost']['hostgroup']
|
||||||
|
entry = ret['result']
|
||||||
|
|
||||||
|
def test_c_hbacrule_zap_testing_data(self):
|
||||||
"""
|
"""
|
||||||
Clear data for HBAC plugin testing.
|
Clear data for HBAC plugin testing.
|
||||||
"""
|
"""
|
||||||
|
|||||||
Reference in New Issue
Block a user