# Authors: # Rob Crittenden # Pavel Zuna # Petr Viktorin # # Copyright (C) 2008, 2009 Red Hat # see file 'COPYING' for use and warranty information # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ Test the `ipalib.plugins.host` module. """ from __future__ import print_function import os import tempfile import base64 import pytest from ipapython import ipautil from ipalib import api, errors, x509 from ipapython.dn import DN from ipapython.dnsutil import DNSName from ipatests.test_xmlrpc.xmlrpc_test import (XMLRPC_test, fuzzy_uuid, fuzzy_digits, fuzzy_hash, fuzzy_date, fuzzy_issuer, fuzzy_hex, raises_exact) from ipatests.test_xmlrpc.test_user_plugin import get_group_dn from ipatests.test_xmlrpc import objectclasses from ipatests.test_xmlrpc.tracker.host_plugin import HostTracker from ipatests.test_xmlrpc.testcert import get_testcert from ipatests.util import assert_deepequal # Constants DNS integration tests # TODO: Use tracker fixtures for zones/records/users/groups dnszone = u'test-zone.test' dnszone_absolute = dnszone + '.' dnszone_dn = DN(('idnsname', dnszone_absolute), api.env.container_dns, api.env.basedn) dnszone_rname = u'root.%s' % dnszone_absolute dnszone_rname_dnsname = DNSName(dnszone_rname) revzone = u'29.16.172.in-addr.arpa.' revzone_dn = DN(('idnsname', revzone), api.env.container_dns, api.env.basedn) revipv6zone = u'0.0.0.0.1.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.' revipv6zone_dn = DN(('idnsname', revipv6zone), api.env.container_dns, api.env.basedn) arec = u'172.16.29.22' aaaarec = u'2001:db8:1::beef' arec2 = u'172.16.29.33' aaaarec2 = u'2001:db8:1::dead' ipv4_fromip = u'testipv4fromip' ipv4_fromip_ip = u'172.16.29.40' ipv4_fromip_arec = ipv4_fromip_ip ipv4_fromip_dnsname = DNSName(ipv4_fromip) ipv4_fromip_dn = DN(('idnsname', ipv4_fromip), dnszone_dn) ipv4_fromip_host_fqdn = u'%s.%s' % (ipv4_fromip, dnszone) ipv4_fromip_ptr = u'40' ipv4_fromip_ptr_dnsname = DNSName(ipv4_fromip_ptr) ipv4_fromip_ptr_dn = DN(('idnsname', ipv4_fromip_ptr), revzone_dn) ipv6_fromip = u'testipv6fromip' ipv6_fromip_ipv6 = u'2001:db8:1::9' ipv6_fromip_aaaarec = ipv6_fromip_ipv6 ipv6_fromip_dnsname = DNSName(ipv6_fromip) ipv6_fromip_dn = DN(('idnsname', ipv6_fromip), dnszone_dn) ipv6_fromip_ptr = u'9.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0' ipv6_fromip_ptr_dnsname = DNSName(ipv6_fromip_ptr) ipv6_fromip_ptr_dn = DN(('idnsname', ipv6_fromip_ptr), revipv6zone_dn) sshpubkey = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6XHBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGIwA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNmcSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM019Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF0L public key test' sshpubkeyfp = u'13:67:6B:BF:4E:A2:05:8E:AE:25:8B:A1:31:DE:6F:1B public key test (ssh-rsa)' user1 = u'tuser1' user2 = u'tuser2' group1 = u'group1' group1_dn = get_group_dn(group1) group2 = u'group2' group2_dn = get_group_dn(group2) hostgroup1 = u'testhostgroup1' hostgroup1_dn = DN(('cn',hostgroup1),('cn','hostgroups'),('cn','accounts'), api.env.basedn) host_cert = get_testcert(DN(('CN', api.env.host), x509.subject_base()), 'host/%s@%s' % (api.env.host, api.env.realm)) @pytest.fixture(scope='class') def host(request): tracker = HostTracker(name=u'testhost1') return tracker.make_fixture(request) @pytest.fixture(scope='class') def host2(request): tracker = HostTracker(name=u'testhost2') return tracker.make_fixture(request) @pytest.fixture(scope='class') def host3(request): tracker = HostTracker(name=u'testhost3') return tracker.make_fixture(request) @pytest.fixture(scope='class') def lab_host(request): name = u'testhost1' tracker = HostTracker(name=name, fqdn=u'%s.lab.%s' % (name, api.env.domain)) return tracker.make_fixture(request) @pytest.fixture(scope='class') def this_host(request): """Fixture for the current master""" tracker = HostTracker(name=api.env.host.partition('.')[0], fqdn=api.env.host) tracker.exists = True # Finalizer ensures that any certificates added to this_host are removed tracker.add_finalizer_certcleanup(request) # This host is not created/deleted, so don't call make_fixture return tracker @pytest.fixture(scope='class') def invalid_host(request): tracker = HostTracker(name='foo_bar',) return tracker.make_fixture(request) @pytest.fixture(scope='class') def ipv6only_host(request): name = u'testipv6onlyhost' tracker = HostTracker(name=name, fqdn=u'%s.%s' % (name, dnszone)) return tracker.make_fixture(request) @pytest.fixture(scope='class') def ipv4only_host(request): name = u'testipv4onlyhost' tracker = HostTracker(name=name, fqdn=u'%s.%s' % (name, dnszone)) return tracker.make_fixture(request) @pytest.fixture(scope='class') def ipv46both_host(request): name = u'testipv4and6host' tracker = HostTracker(name=name, fqdn=u'%s.%s' % (name, dnszone)) return tracker.make_fixture(request) @pytest.fixture(scope='class') def ipv4_fromip_host(request): name = u'testipv4fromip' tracker = HostTracker(name=name, fqdn=u'%s.%s' % (name, dnszone)) return tracker.make_fixture(request) @pytest.fixture(scope='class') def ipv6_fromip_host(request): name = u'testipv6fromip' tracker = HostTracker(name=name, fqdn=u'%s.%s' % (name, dnszone)) return tracker.make_fixture(request) @pytest.mark.tier1 class TestNonexistentHost(XMLRPC_test): def test_retrieve_nonexistent(self, host): host.ensure_missing() command = host.make_retrieve_command() with raises_exact(errors.NotFound( reason=u'%s: host not found' % host.fqdn)): command() def test_update_nonexistent(self, host): host.ensure_missing() command = host.make_update_command(updates=dict(description=u'Nope')) with raises_exact(errors.NotFound( reason=u'%s: host not found' % host.fqdn)): command() def test_delete_nonexistent(self, host): host.ensure_missing() command = host.make_delete_command() with raises_exact(errors.NotFound( reason=u'%s: host not found' % host.fqdn)): command() @pytest.mark.tier1 class TestCRUD(XMLRPC_test): def test_create_duplicate(self, host): host.ensure_exists() command = host.make_create_command(force=True) with raises_exact(errors.DuplicateEntry( message=u'host with name "%s" already exists' % host.fqdn)): command() def test_retrieve_simple(self, host): host.retrieve() def test_retrieve_all(self, host): host.retrieve(all=True) def test_search_simple(self, host): host.find() def test_search_all(self, host): host.find(all=True) def test_update_simple(self, host): host.update(dict( description=u'Updated host 1', usercertificate=host_cert), expected_updates=dict( description=[u'Updated host 1'], usercertificate=[base64.b64decode(host_cert)], issuer=fuzzy_issuer, md5_fingerprint=fuzzy_hash, serial_number=fuzzy_digits, serial_number_hex=fuzzy_hex, sha1_fingerprint=fuzzy_hash, subject=DN(('CN', api.env.host), x509.subject_base()), valid_not_before=fuzzy_date, valid_not_after=fuzzy_date, )) host.retrieve() def test_try_rename(self, host): host.ensure_exists() command = host.make_update_command( updates=dict(setattr=u'fqdn=changed.example.com')) with raises_exact(errors.NotAllowedOnRDN()): command() def test_add_mac_address(self, host): host.update(dict(macaddress=u'00:50:56:30:F6:5F'), expected_updates=dict(macaddress=[u'00:50:56:30:F6:5F'])) host.retrieve() def test_add_mac_addresses(self, host): host.update(dict(macaddress=[u'00:50:56:30:F6:5F', u'00:50:56:2C:8D:82'])) host.retrieve() def test_try_illegal_mac(self, host): command = host.make_update_command( updates=dict(macaddress=[u'xx'])) with raises_exact(errors.ValidationError( name='macaddress', error=u'Must be of the form HH:HH:HH:HH:HH:HH, where ' + u'each H is a hexadecimal character.')): command() def test_add_ssh_pubkey(self, host): host.update(dict(ipasshpubkey=[sshpubkey]), expected_updates=dict( ipasshpubkey=[sshpubkey], sshpubkeyfp=[sshpubkeyfp], )) host.retrieve() def test_try_illegal_ssh_pubkey(self, host): host.ensure_exists() command = host.make_update_command( updates=dict(ipasshpubkey=[u'no-pty %s' % sshpubkey])) with raises_exact(errors.ValidationError( name='sshpubkey', error=u'options are not allowed')): command() def test_delete_host(self, host): host.delete() def test_retrieve_nonexistent(self, host): host.ensure_missing() command = host.make_retrieve_command() with raises_exact(errors.NotFound( reason=u'%s: host not found' % host.fqdn)): command() def test_update_nonexistent(self, host): host.ensure_missing() command = host.make_update_command( updates=dict(description=u'Nope')) with raises_exact(errors.NotFound( reason=u'%s: host not found' % host.fqdn)): command() def test_delete_nonexistent(self, host): host.ensure_missing() command = host.make_delete_command() with raises_exact(errors.NotFound( reason=u'%s: host not found' % host.fqdn)): command() def test_try_add_not_in_dns(self, host): host.ensure_missing() command = host.make_create_command(force=False) with raises_exact(errors.DNSNotARecordError( reason=u'Host does not have corresponding DNS A/AAAA record')): command() def test_add_host_with_null_password(self, host): host.ensure_missing() command = host.make_create_command() result = command(userpassword=None) host.track_create() host.check_create(result) @pytest.mark.tier1 class TestMultipleMatches(XMLRPC_test): def test_try_show_multiple_matches_with_shortname(self, host, lab_host): host.ensure_exists() lab_host.ensure_exists() assert host.shortname == lab_host.shortname command = host.make_command('host_show', host.shortname) with pytest.raises(errors.SingleMatchExpected): command() @pytest.mark.tier1 class TestHostWithService(XMLRPC_test): """Test deletion using a non-fully-qualified hostname. Services associated with this host should also be removed. """ # TODO: Use a service tracker, when available def test_host_with_service(self, host): host.ensure_exists() service1 = u'dns/%s@%s' % (host.fqdn, host.api.env.realm) service1dn = DN(('krbprincipalname', service1.lower()), ('cn','services'), ('cn','accounts'), host.api.env.basedn) try: result = host.run_command('service_add', service1, force=True) assert_deepequal(dict( value=service1, summary=u'Added service "%s"' % service1, result=dict( dn=service1dn, krbprincipalname=[service1], objectclass=objectclasses.service, managedby_host=[host.fqdn], ipauniqueid=[fuzzy_uuid], ), ), result) host.delete() result = host.run_command('service_find', host.fqdn) assert_deepequal(dict( count=0, truncated=False, summary=u'0 services matched', result=[], ), result) finally: try: host.run_command('service_del', service1) except errors.NotFound: pass @pytest.mark.tier1 class TestManagedHosts(XMLRPC_test): def test_managed_hosts(self, host, host2, host3): host.ensure_exists() host2.ensure_exists() host3.ensure_exists() self.add_managed_host(host, host2) host2.retrieve() self.search_man_noman_hosts(host2, host) self.search_man_hosts(host2, host3) self.remove_man_hosts(host, host2) host.retrieve() host2.retrieve() def add_managed_host(self, manager, underling): command = manager.make_command('host_add_managedby', underling.fqdn, host=manager.fqdn) result = command() underling.attrs['managedby_host'] = [manager.fqdn, underling.fqdn] assert_deepequal(dict( completed=1, failed={'managedby': {'host': ()}}, result=underling.filter_attrs(underling.managedby_keys), ), result) def search_man_noman_hosts(self, host, noman_host): command = host.make_find_command(host.fqdn, man_host=host.fqdn, not_man_host=noman_host.fqdn) result = command() assert_deepequal(dict( count=1, truncated=False, summary=u'1 host matched', result=[host.filter_attrs(host.find_keys)], ), result) def search_man_hosts(self, host1, host2): command = host1.make_find_command(man_host=[host1.fqdn, host2.fqdn]) result = command() assert_deepequal(dict( count=0, truncated=False, summary=u'0 hosts matched', result=[], ), result) def remove_man_hosts(self, manager, underling): command = manager.make_command('host_remove_managedby', underling.fqdn, host=manager.fqdn) result = command() underling.attrs['managedby_host'] = [underling.fqdn] assert_deepequal(dict( completed=1, failed={'managedby': {'host': ()}}, result=underling.filter_attrs(underling.managedby_keys), ), result) @pytest.mark.tier1 class TestProtectedMaster(XMLRPC_test): def test_try_delete_master(self, this_host): command = this_host.make_delete_command() with raises_exact(errors.ValidationError( name='hostname', error=u'An IPA master host cannot be deleted or disabled')): command() def test_try_disable_master(self, this_host): command = this_host.make_command('host_disable', this_host.fqdn) with raises_exact(errors.ValidationError( name='hostname', error=u'An IPA master host cannot be deleted or disabled')): command() @pytest.mark.tier1 class TestValidation(XMLRPC_test): def test_try_validate_create(self, invalid_host): command = invalid_host.make_create_command() with raises_exact(errors.ValidationError( name='hostname', error=u"invalid domain-name: only letters, numbers, '-' are " + u"allowed. DNS label may not start or end with '-'")): command() # The assumption on these next 4 tests is that if we don't get a # validation error then the request was processed normally. def test_try_validate_update(self, invalid_host): command = invalid_host.make_update_command({}) with raises_exact(errors.NotFound( reason=u'%s: host not found' % invalid_host.fqdn)): command() def test_try_validate_delete(self, invalid_host): command = invalid_host.make_delete_command() with raises_exact(errors.NotFound( reason=u'%s: host not found' % invalid_host.fqdn)): command() def test_try_validate_retrieve(self, invalid_host): command = invalid_host.make_retrieve_command() with raises_exact(errors.NotFound( reason=u'%s: host not found' % invalid_host.fqdn)): command() def test_try_validate_find(self, invalid_host): command = invalid_host.make_find_command(invalid_host.fqdn) result = command() assert_deepequal(dict( count=0, truncated=False, summary=u'0 hosts matched', result=[], ), result) @pytest.yield_fixture def keytabname(request): keytabfd, keytabname = tempfile.mkstemp() try: os.close(keytabfd) yield keytabname finally: os.unlink(keytabname) @pytest.mark.tier1 class TestHostFalsePwdChange(XMLRPC_test): def test_join_host(self, host, keytabname): """ Create a test host and join it into IPA. """ join_command = 'ipa-client/ipa-join' if not os.path.isfile(join_command): pytest.skip("Command '%s' not found" % join_command) # create a test host with bulk enrollment password host.track_create() del host.attrs['krbprincipalname'] host.attrs['has_password'] = True objclass = list(set( host.attrs['objectclass']) - {u'krbprincipal', u'krbprincipalaux'}) host.attrs['objectclass'] = objclass command = host.make_create_command(force=True) result = command(random=True) random_pass = result['result']['randompassword'] host.attrs['randompassword'] = random_pass host.check_create(result) del host.attrs['randompassword'] # joint the host with the bulk password new_args = [ join_command, "-s", host.api.env.host, "-h", host.fqdn, "-k", keytabname, "-w", random_pass, "-q", ] try: ipautil.run(new_args) except ipautil.CalledProcessError as e: # join operation may fail on 'adding key into keytab', but # the keytab is not necessary for further tests print(e) host.attrs['has_keytab'] = True host.attrs['has_password'] = False host.attrs['krbprincipalname'] = [u'host/%s@%s' % (host.fqdn, host.api.env.realm)] host.retrieve() # Try to change the password of enrolled host with specified password command = host.make_update_command( updates=dict(userpassword=u'pass_123')) with pytest.raises(errors.ValidationError): command() # Try to change the password of enrolled host with random password command = host.make_update_command(updates=dict(random=True)) with pytest.raises(errors.ValidationError): command() @pytest.yield_fixture(scope='class') def dns_setup(host): try: host.run_command('dnszone_del', dnszone, revzone, revipv6zone, **{'continue': True}) except (errors.NotFound, errors.EmptyModlist): pass try: host.run_command('dnszone_add', dnszone, idnssoarname=dnszone_rname) host.run_command('dnszone_add', revzone, idnssoarname=dnszone_rname) host.run_command('dnszone_add', revipv6zone, idnssoarname=dnszone_rname) yield finally: try: host.run_command('dnszone_del', dnszone, revzone, revipv6zone, **{'continue': True}) except (errors.NotFound, errors.EmptyModlist): pass @pytest.mark.tier1 class TestHostDNS(XMLRPC_test): def test_add_ipv6only_host(self, dns_setup, ipv6only_host): ipv6only_host.run_command('dnsrecord_add', dnszone, ipv6only_host.shortname, aaaarecord=aaaarec) try: ipv6only_host.create(force=False) finally: command = ipv6only_host.run_command('dnsrecord_del', dnszone, ipv6only_host.shortname, aaaarecord=aaaarec) def test_add_ipv4only_host(self, dns_setup, ipv4only_host): ipv4only_host.run_command('dnsrecord_add', dnszone, ipv4only_host.shortname, arecord=arec) try: ipv4only_host.create(force=False) finally: command = ipv4only_host.run_command('dnsrecord_del', dnszone, ipv4only_host.shortname, arecord=arec) def test_add_ipv46both_host(self, dns_setup, ipv46both_host): ipv46both_host.run_command('dnsrecord_add', dnszone, ipv46both_host.shortname, arecord=arec2, aaaarecord=aaaarec2) try: ipv46both_host.create(force=False) finally: command = ipv46both_host.run_command('dnsrecord_del', dnszone, ipv46both_host.shortname, arecord=arec2, aaaarecord=aaaarec2) def test_add_ipv4_host_from_ip(self, dns_setup, ipv4_fromip_host): ipv4_fromip_host.ensure_missing() ipv4_fromip_host.track_create() command = ipv4_fromip_host.make_create_command(force=False) result = command(ip_address=ipv4_fromip_ip) ipv4_fromip_host.check_create(result) result = ipv4_fromip_host.run_command('dnsrecord_show', dnszone, ipv4_fromip_host.shortname) assert_deepequal(dict( value=ipv4_fromip_dnsname, summary=None, result=dict( dn=ipv4_fromip_dn, idnsname=[ipv4_fromip_dnsname], arecord=[ipv4_fromip_arec], ), ), result) result = ipv4_fromip_host.run_command('dnsrecord_show', revzone, ipv4_fromip_ptr) assert_deepequal(dict( value=ipv4_fromip_ptr_dnsname, summary=None, result=dict( dn=ipv4_fromip_ptr_dn, idnsname=[ipv4_fromip_ptr_dnsname], ptrrecord=[ipv4_fromip_host.fqdn + '.'], ), ), result) def test_add_ipv6_host_from_ip(self, dns_setup, ipv6_fromip_host): ipv6_fromip_host.ensure_missing() ipv6_fromip_host.track_create() command = ipv6_fromip_host.make_create_command(force=False) result = command(ip_address=ipv6_fromip_ipv6) ipv6_fromip_host.check_create(result) result = ipv6_fromip_host.run_command('dnsrecord_show', dnszone, ipv6_fromip_host.shortname) assert_deepequal(dict( value=ipv6_fromip_dnsname, summary=None, result=dict( dn=ipv6_fromip_dn, idnsname=[ipv6_fromip_dnsname], aaaarecord=[ipv6_fromip_aaaarec], ), ), result) result = ipv6_fromip_host.run_command('dnsrecord_show', revipv6zone, ipv6_fromip_ptr) assert_deepequal(dict( value=ipv6_fromip_ptr_dnsname, summary=None, result=dict( dn=ipv6_fromip_ptr_dn, idnsname=[ipv6_fromip_ptr_dnsname], ptrrecord=[ipv6_fromip_host.fqdn + '.'], ), ), result) @pytest.fixture(scope='class') def allowedto_context(request, host3): def cleanup(): try: host3.run_command('user_del', user1, user2, **{'continue': True}) except errors.NotFound: pass try: host3.run_command('group_del', group1, group2, **{'continue': True}) except errors.NotFound: pass try: host3.run_command('hostgroup_del', hostgroup1) except errors.NotFound: pass cleanup() request.addfinalizer(cleanup) host3.ensure_exists() host3.run_command('user_add', givenname=u'Test', sn=u'User1') host3.run_command('user_add', givenname=u'Test', sn=u'User2') host3.run_command('group_add', group1) host3.run_command('group_add', group2) host3.run_command('hostgroup_add', hostgroup1, description=u'Test hostgroup 1') @pytest.mark.tier1 class TestHostAllowedTo(XMLRPC_test): def test_user_allow_retrieve_keytab(self, allowedto_context, host): host.ensure_exists() result = host.run_command('host_allow_retrieve_keytab', host.fqdn, user=user1) host.attrs['ipaallowedtoperform_read_keys_user'] = [user1] assert_deepequal(dict( failed=dict( ipaallowedtoperform_read_keys=dict( group=[], host=[], hostgroup=[], user=[]), ), completed=1, result=host.filter_attrs(host.allowedto_keys), ), result) # Duplicates should not be accepted result = host.run_command('host_allow_retrieve_keytab', host.fqdn, user=user1) assert_deepequal(dict( failed=dict( ipaallowedtoperform_read_keys=dict( group=[], host=[], hostgroup=[], user=[[user1, u'This entry is already a member']], ), ), completed=0, result=host.filter_attrs(host.allowedto_keys), ), result) def test_group_allow_retrieve_keytab(self, allowedto_context, host, host3): host.ensure_exists() host3.ensure_exists() result = host.run_command('host_allow_retrieve_keytab', host.fqdn, group=[group1, group2], host=[host3.fqdn], hostgroup=[hostgroup1]) host.attrs['ipaallowedtoperform_read_keys_group'] = [group1, group2] host.attrs['ipaallowedtoperform_read_keys_host'] = [host3.fqdn] host.attrs['ipaallowedtoperform_read_keys_hostgroup'] = [hostgroup1] assert_deepequal(dict( failed=dict( ipaallowedtoperform_read_keys=dict( group=[], host=[], hostgroup=[], user=[]), ), completed=4, result=host.filter_attrs(host.allowedto_keys), ), result) # Non-members cannot be removed result = host.run_command('host_disallow_retrieve_keytab', host.fqdn, user=[user2]) assert_deepequal(dict( failed=dict( ipaallowedtoperform_read_keys=dict( group=[], host=[], hostgroup=[], user=[[user2, u'This entry is not a member']], ), ), completed=0, result=host.filter_attrs(host.allowedto_keys), ), result) # Disallow one of the existing allowed groups result = host.run_command('host_disallow_retrieve_keytab', host.fqdn, group=[group2]) host.attrs['ipaallowedtoperform_read_keys_group'] = [group1] assert_deepequal(dict( failed=dict( ipaallowedtoperform_read_keys=dict( group=[], host=[], hostgroup=[], user=[]), ), completed=1, result=host.filter_attrs(host.allowedto_keys), ), result) host.retrieve() def test_allow_create(self, allowedto_context, host, host3): host.ensure_exists() host3.ensure_exists() result = host.run_command('host_allow_create_keytab', host.fqdn, group=[group1, group2], user=[user1], host=[host3.fqdn], hostgroup=[hostgroup1]) host.attrs['ipaallowedtoperform_write_keys_user'] = [user1] host.attrs['ipaallowedtoperform_write_keys_group'] = [group1, group2] host.attrs['ipaallowedtoperform_write_keys_host'] = [host3.fqdn] host.attrs['ipaallowedtoperform_write_keys_hostgroup'] = [hostgroup1] assert_deepequal(dict( failed=dict( ipaallowedtoperform_write_keys=dict( group=[], host=[], hostgroup=[], user=[]), ), completed=5, result=host.filter_attrs(host.allowedto_keys), ), result) # Duplicates should not be accepted result = host.run_command('host_allow_create_keytab', host.fqdn, group=[group1], user=[user1], host=[host3.fqdn], hostgroup=[hostgroup1]) assert_deepequal(dict( failed=dict( ipaallowedtoperform_write_keys=dict( group=[[group1, u'This entry is already a member']], host=[[host3.fqdn, u'This entry is already a member']], user=[[user1, u'This entry is already a member']], hostgroup=[[hostgroup1, u'This entry is already a member']], ), ), completed=0, result=host.filter_attrs(host.allowedto_keys), ), result) # Non-mambers cannot be removed result = host.run_command('host_disallow_create_keytab', host.fqdn, user=[user2]) assert_deepequal(dict( failed=dict( ipaallowedtoperform_write_keys=dict( group=[], host=[], hostgroup=[], user=[[user2, u'This entry is not a member']], ), ), completed=0, result=host.filter_attrs(host.allowedto_keys), ), result) # Disallow one of the existing allowed groups result = host.run_command('host_disallow_create_keytab', host.fqdn, group=[group2]) host.attrs['ipaallowedtoperform_write_keys_group'] = [group1] assert_deepequal(dict( failed=dict( ipaallowedtoperform_write_keys=dict( group=[], host=[], hostgroup=[], user=[], ), ), completed=1, result=host.filter_attrs(host.allowedto_keys), ), result) host.retrieve() def test_host_mod(self, host): # Done (usually) at the end to ensure the tracking works well host.update(updates=dict(description=u'desc'), expected_updates=dict(description=[u'desc']))