freeipa/ipatests/test_xmlrpc/test_host_plugin.py
Florence Blanc-Renaud 4295df17a4 ipa host-add: do not raise exception when reverse record not added
When ipa host-add --random is unable to add a reverse record (for instance
because the server does not manage any reverse zone), the command
adds the host but exits (return code=1) with an error without actually
outputing the random password generated.
With this fix, the behavior is modified. The commands succeeds (return code=0)
but prints a warning.

This commit also adds a unit test.

https://pagure.io/freeipa/issue/7374

Reviewed-By: Christian Heimes <cheimes@redhat.com>
2018-02-23 14:39:34 +01:00

998 lines
36 KiB
Python

# Authors:
# Rob Crittenden <rcritten@redhat.com>
# Pavel Zuna <pzuna@redhat.com>
# Petr Viktorin <pviktori@redhat.com>
#
# Copyright (C) 2008, 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Test the `ipalib.plugins.host` module.
"""
from __future__ import print_function
import os
import tempfile
import base64
import pytest
from ipapython import ipautil
from ipalib import api, errors, messages
from ipapython.dn import DN
from ipapython.dnsutil import DNSName
from ipatests.test_util import yield_fixture
from ipatests.test_xmlrpc.xmlrpc_test import (XMLRPC_test,
fuzzy_uuid, fuzzy_digits, fuzzy_hash, fuzzy_date, fuzzy_issuer,
fuzzy_hex, raises_exact)
from ipatests.test_xmlrpc.test_user_plugin import get_group_dn
from ipatests.test_xmlrpc import objectclasses
from ipatests.test_xmlrpc.tracker.host_plugin import HostTracker
from ipatests.test_xmlrpc.testcert import get_testcert, subject_base
from ipatests.util import assert_deepequal
from ipaplatform.paths import paths
# Constants DNS integration tests
# TODO: Use tracker fixtures for zones/records/users/groups
dnszone = u'test-zone.test'
dnszone_absolute = dnszone + '.'
dnszone_dn = DN(('idnsname', dnszone_absolute), api.env.container_dns, api.env.basedn)
dnszone_rname = u'root.%s' % dnszone_absolute
dnszone_rname_dnsname = DNSName(dnszone_rname)
revzone = u'29.16.172.in-addr.arpa.'
revzone_dn = DN(('idnsname', revzone), api.env.container_dns, api.env.basedn)
revipv6zone = u'0.0.0.0.1.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.'
revipv6zone_dn = DN(('idnsname', revipv6zone), api.env.container_dns, api.env.basedn)
arec = u'172.16.29.22'
aaaarec = u'2001:db8:1::beef'
arec2 = u'172.16.29.33'
aaaarec2 = u'2001:db8:1::dead'
ipv4_fromip = u'testipv4fromip'
ipv4_fromip_ip = u'172.16.29.40'
ipv4_fromip_arec = ipv4_fromip_ip
ipv4_fromip_dnsname = DNSName(ipv4_fromip)
ipv4_fromip_dn = DN(('idnsname', ipv4_fromip), dnszone_dn)
ipv4_fromip_host_fqdn = u'%s.%s' % (ipv4_fromip, dnszone)
ipv4_fromip_ptr = u'40'
ipv4_fromip_ptr_dnsname = DNSName(ipv4_fromip_ptr)
ipv4_fromip_ptr_dn = DN(('idnsname', ipv4_fromip_ptr), revzone_dn)
ipv6_fromip = u'testipv6fromip'
ipv6_fromip_ipv6 = u'2001:db8:1::9'
ipv6_fromip_aaaarec = ipv6_fromip_ipv6
ipv6_fromip_dnsname = DNSName(ipv6_fromip)
ipv6_fromip_dn = DN(('idnsname', ipv6_fromip), dnszone_dn)
ipv6_fromip_ptr = u'9.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0'
ipv6_fromip_ptr_dnsname = DNSName(ipv6_fromip_ptr)
ipv6_fromip_ptr_dn = DN(('idnsname', ipv6_fromip_ptr), revipv6zone_dn)
sshpubkey = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6XHBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGIwA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNmcSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM019Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF0L public key test'
sshpubkeyfp = u'SHA256:cStA9o5TRSARbeketEOooMUMSWRSsArIAXloBZ4vNsE public key test (ssh-rsa)'
user1 = u'tuser1'
user2 = u'tuser2'
group1 = u'group1'
group1_dn = get_group_dn(group1)
group2 = u'group2'
group2_dn = get_group_dn(group2)
hostgroup1 = u'testhostgroup1'
hostgroup1_dn = DN(('cn',hostgroup1),('cn','hostgroups'),('cn','accounts'),
api.env.basedn)
host_cert = get_testcert(DN(('CN', api.env.host), subject_base()),
'host/%s@%s' % (api.env.host, api.env.realm))
missingrevzone = u'22.30.16.172.in-addr.arpa.'
ipv4_in_missingrevzone_ip = u'172.16.30.22'
@pytest.fixture(scope='class')
def host(request):
tracker = HostTracker(name=u'testhost1')
return tracker.make_fixture(request)
@pytest.fixture(scope='class')
def host2(request):
tracker = HostTracker(name=u'testhost2')
return tracker.make_fixture(request)
@pytest.fixture(scope='class')
def host3(request):
tracker = HostTracker(name=u'testhost3')
return tracker.make_fixture(request)
@pytest.fixture(scope='class')
def host4(request):
tracker = HostTracker(name=u'testhost4')
return tracker.make_fixture(request)
@pytest.fixture(scope='class')
def lab_host(request):
name = u'testhost1'
tracker = HostTracker(name=name,
fqdn=u'%s.lab.%s' % (name, api.env.domain))
return tracker.make_fixture(request)
@pytest.fixture(scope='class')
def this_host(request):
"""Fixture for the current master"""
tracker = HostTracker(name=api.env.host.partition('.')[0],
fqdn=api.env.host)
tracker.exists = True
# Finalizer ensures that any certificates added to this_host are removed
tracker.add_finalizer_certcleanup(request)
# This host is not created/deleted, so don't call make_fixture
return tracker
@pytest.fixture(scope='class')
def invalid_host(request):
tracker = HostTracker(name='foo_bar',)
return tracker.make_fixture(request)
@pytest.fixture(scope='class')
def ipv6only_host(request):
name = u'testipv6onlyhost'
tracker = HostTracker(name=name, fqdn=u'%s.%s' % (name, dnszone))
return tracker.make_fixture(request)
@pytest.fixture(scope='class')
def ipv4only_host(request):
name = u'testipv4onlyhost'
tracker = HostTracker(name=name, fqdn=u'%s.%s' % (name, dnszone))
return tracker.make_fixture(request)
@pytest.fixture(scope='class')
def ipv46both_host(request):
name = u'testipv4and6host'
tracker = HostTracker(name=name, fqdn=u'%s.%s' % (name, dnszone))
return tracker.make_fixture(request)
@pytest.fixture(scope='class')
def ipv4_fromip_host(request):
name = u'testipv4fromip'
tracker = HostTracker(name=name, fqdn=u'%s.%s' % (name, dnszone))
return tracker.make_fixture(request)
@pytest.fixture(scope='class')
def ipv6_fromip_host(request):
name = u'testipv6fromip'
tracker = HostTracker(name=name, fqdn=u'%s.%s' % (name, dnszone))
return tracker.make_fixture(request)
@pytest.mark.tier1
class TestNonexistentHost(XMLRPC_test):
def test_retrieve_nonexistent(self, host):
host.ensure_missing()
command = host.make_retrieve_command()
with raises_exact(errors.NotFound(
reason=u'%s: host not found' % host.fqdn)):
command()
def test_update_nonexistent(self, host):
host.ensure_missing()
command = host.make_update_command(updates=dict(description=u'Nope'))
with raises_exact(errors.NotFound(
reason=u'%s: host not found' % host.fqdn)):
command()
def test_delete_nonexistent(self, host):
host.ensure_missing()
command = host.make_delete_command()
with raises_exact(errors.NotFound(
reason=u'%s: host not found' % host.fqdn)):
command()
@pytest.mark.tier1
class TestCRUD(XMLRPC_test):
def test_create_duplicate(self, host):
host.ensure_exists()
command = host.make_create_command(force=True)
with raises_exact(errors.DuplicateEntry(
message=u'host with name "%s" already exists' % host.fqdn)):
command()
def test_retrieve_simple(self, host):
host.retrieve()
def test_retrieve_all(self, host):
host.retrieve(all=True)
def test_search_simple(self, host):
host.find()
def test_search_all(self, host):
host.find(all=True)
def test_update_simple(self, host):
host.update(dict(
description=u'Updated host 1',
usercertificate=host_cert),
expected_updates=dict(
description=[u'Updated host 1'],
usercertificate=[base64.b64decode(host_cert)],
issuer=fuzzy_issuer,
serial_number=fuzzy_digits,
serial_number_hex=fuzzy_hex,
sha1_fingerprint=fuzzy_hash,
sha256_fingerprint=fuzzy_hash,
subject=DN(('CN', api.env.host), subject_base()),
valid_not_before=fuzzy_date,
valid_not_after=fuzzy_date,
))
host.retrieve()
def test_try_rename(self, host):
host.ensure_exists()
command = host.make_update_command(
updates=dict(setattr=u'fqdn=changed.example.com'))
with raises_exact(errors.NotAllowedOnRDN()):
command()
def test_add_mac_address(self, host):
host.update(dict(macaddress=u'00:50:56:30:F6:5F'),
expected_updates=dict(macaddress=[u'00:50:56:30:F6:5F']))
host.retrieve()
def test_add_mac_addresses(self, host):
host.update(dict(macaddress=[u'00:50:56:30:F6:5F',
u'00:50:56:2C:8D:82']))
host.retrieve()
def test_try_illegal_mac(self, host):
command = host.make_update_command(
updates=dict(macaddress=[u'xx']))
with raises_exact(errors.ValidationError(
name='macaddress',
error=u'Must be of the form HH:HH:HH:HH:HH:HH, where ' +
u'each H is a hexadecimal character.')):
command()
def test_add_ssh_pubkey(self, host):
host.update(dict(ipasshpubkey=[sshpubkey]),
expected_updates=dict(
ipasshpubkey=[sshpubkey],
sshpubkeyfp=[sshpubkeyfp],
))
host.retrieve()
def test_try_illegal_ssh_pubkey(self, host):
host.ensure_exists()
command = host.make_update_command(
updates=dict(ipasshpubkey=[u'no-pty %s' % sshpubkey]))
with raises_exact(errors.ValidationError(
name='sshpubkey', error=u'options are not allowed')):
command()
def test_delete_host(self, host):
host.delete()
def test_retrieve_nonexistent(self, host):
host.ensure_missing()
command = host.make_retrieve_command()
with raises_exact(errors.NotFound(
reason=u'%s: host not found' % host.fqdn)):
command()
def test_update_nonexistent(self, host):
host.ensure_missing()
command = host.make_update_command(
updates=dict(description=u'Nope'))
with raises_exact(errors.NotFound(
reason=u'%s: host not found' % host.fqdn)):
command()
def test_delete_nonexistent(self, host):
host.ensure_missing()
command = host.make_delete_command()
with raises_exact(errors.NotFound(
reason=u'%s: host not found' % host.fqdn)):
command()
def test_try_add_not_in_dns(self, host):
host.ensure_missing()
command = host.make_create_command(force=False)
with raises_exact(errors.DNSNotARecordError(hostname=host.fqdn)):
command()
def test_add_host_with_null_password(self, host):
host.ensure_missing()
command = host.make_create_command()
result = command(userpassword=None)
host.track_create()
host.check_create(result)
@pytest.mark.tier1
class TestMultipleMatches(XMLRPC_test):
def test_try_show_multiple_matches_with_shortname(self, host, lab_host):
host.ensure_exists()
lab_host.ensure_exists()
assert host.shortname == lab_host.shortname
command = host.make_command('host_show', host.shortname)
with pytest.raises(errors.SingleMatchExpected):
command()
@pytest.mark.tier1
class TestHostWithService(XMLRPC_test):
"""Test deletion using a non-fully-qualified hostname.
Services associated with this host should also be removed.
"""
# TODO: Use a service tracker, when available
def test_host_with_service(self, host):
host.ensure_exists()
service1 = u'dns/%s@%s' % (host.fqdn, host.api.env.realm)
service1dn = DN(('krbprincipalname', service1.lower()),
('cn','services'), ('cn','accounts'),
host.api.env.basedn)
try:
result = host.run_command('service_add', service1, force=True)
assert_deepequal(dict(
value=service1,
summary=u'Added service "%s"' % service1,
result=dict(
dn=service1dn,
krbprincipalname=[service1],
krbcanonicalname=[service1],
objectclass=objectclasses.service,
managedby_host=[host.fqdn],
ipauniqueid=[fuzzy_uuid],
),
), result)
host.delete()
result = host.run_command('service_find', host.fqdn)
assert_deepequal(dict(
count=0,
truncated=False,
summary=u'0 services matched',
result=[],
), result)
finally:
try:
host.run_command('service_del', service1)
except errors.NotFound:
pass
@pytest.mark.tier1
class TestManagedHosts(XMLRPC_test):
def test_managed_hosts(self, host, host2, host3):
host.ensure_exists()
host2.ensure_exists()
host3.ensure_exists()
self.add_managed_host(host, host2)
host2.retrieve()
self.search_man_noman_hosts(host2, host)
self.search_man_hosts(host2, host3)
self.remove_man_hosts(host, host2)
host.retrieve()
host2.retrieve()
def add_managed_host(self, manager, underling):
command = manager.make_command('host_add_managedby',
underling.fqdn, host=manager.fqdn)
result = command()
underling.attrs['managedby_host'] = [manager.fqdn, underling.fqdn]
assert_deepequal(dict(
completed=1,
failed={'managedby': {'host': ()}},
result=underling.filter_attrs(underling.managedby_keys),
), result)
def search_man_noman_hosts(self, host, noman_host):
command = host.make_find_command(host.fqdn,
man_host=host.fqdn,
not_man_host=noman_host.fqdn)
result = command()
assert_deepequal(dict(
count=1,
truncated=False,
summary=u'1 host matched',
result=[host.filter_attrs(host.find_keys)],
), result)
def search_man_hosts(self, host1, host2):
command = host1.make_find_command(man_host=[host1.fqdn, host2.fqdn])
result = command()
assert_deepequal(dict(
count=0,
truncated=False,
summary=u'0 hosts matched',
result=[],
), result)
def remove_man_hosts(self, manager, underling):
command = manager.make_command('host_remove_managedby',
underling.fqdn, host=manager.fqdn)
result = command()
underling.attrs['managedby_host'] = [underling.fqdn]
assert_deepequal(dict(
completed=1,
failed={'managedby': {'host': ()}},
result=underling.filter_attrs(underling.managedby_keys),
), result)
@pytest.mark.tier1
class TestProtectedMaster(XMLRPC_test):
def test_try_delete_master(self, this_host):
command = this_host.make_delete_command()
with raises_exact(errors.ValidationError(
name='hostname',
error=u'An IPA master host cannot be deleted or disabled')):
command()
def test_try_disable_master(self, this_host):
command = this_host.make_command('host_disable', this_host.fqdn)
with raises_exact(errors.ValidationError(
name='hostname',
error=u'An IPA master host cannot be deleted or disabled')):
command()
@pytest.mark.tier1
class TestValidation(XMLRPC_test):
def test_try_validate_create(self, invalid_host):
command = invalid_host.make_create_command()
with raises_exact(errors.ValidationError(
name='hostname',
error=u"invalid domain-name: only letters, numbers, '-' are " +
u"allowed. DNS label may not start or end with '-'")):
command()
# The assumption on these next 4 tests is that if we don't get a
# validation error then the request was processed normally.
def test_try_validate_update(self, invalid_host):
command = invalid_host.make_update_command({})
with raises_exact(errors.NotFound(
reason=u'%s: host not found' % invalid_host.fqdn)):
command()
def test_try_validate_delete(self, invalid_host):
command = invalid_host.make_delete_command()
with raises_exact(errors.NotFound(
reason=u'%s: host not found' % invalid_host.fqdn)):
command()
def test_try_validate_retrieve(self, invalid_host):
command = invalid_host.make_retrieve_command()
with raises_exact(errors.NotFound(
reason=u'%s: host not found' % invalid_host.fqdn)):
command()
def test_try_validate_find(self, invalid_host):
command = invalid_host.make_find_command(invalid_host.fqdn)
result = command()
assert_deepequal(dict(
count=0,
truncated=False,
summary=u'0 hosts matched',
result=[],
), result)
@yield_fixture
def keytabname(request):
keytabfd, keytabname = tempfile.mkstemp()
try:
os.close(keytabfd)
yield keytabname
finally:
os.unlink(keytabname)
@pytest.mark.tier1
class TestHostFalsePwdChange(XMLRPC_test):
def test_join_host(self, host, keytabname):
"""
Create a test host and join it into IPA.
This test must not run remotely.
"""
if not os.path.isfile(paths.SBIN_IPA_JOIN):
pytest.skip("Command '%s' not found. "
"The test must not run remotely."
% paths.SBIN_IPA_JOIN)
# create a test host with bulk enrollment password
host.track_create()
# manipulate host.attrs to correspond with real attributes of host
# after creating it with random password
del host.attrs['krbprincipalname']
del host.attrs['krbcanonicalname']
host.attrs['has_password'] = True
objclass = list(set(
host.attrs['objectclass']) - {u'krbprincipal', u'krbprincipalaux'})
host.attrs['objectclass'] = objclass
command = host.make_create_command(force=True)
result = command(random=True)
random_pass = result['result']['randompassword']
host.attrs['randompassword'] = random_pass
host.check_create(result)
del host.attrs['randompassword']
# joint the host with the bulk password
new_args = [
paths.SBIN_IPA_JOIN,
"-s", host.api.env.host,
"-h", host.fqdn,
"-k", keytabname,
"-w", random_pass,
"-q",
]
try:
ipautil.run(new_args)
except ipautil.CalledProcessError as e:
# join operation may fail on 'adding key into keytab', but
# the keytab is not necessary for further tests
print(e)
# fix host.attrs again to correspond with current state
host.attrs['has_keytab'] = True
host.attrs['has_password'] = False
host.attrs['krbprincipalname'] = [u'host/%s@%s' % (host.fqdn,
host.api.env.realm)]
host.attrs['krbcanonicalname'] = [u'host/%s@%s' % (host.fqdn,
host.api.env.realm)]
host.retrieve()
# Try to change the password of enrolled host with specified password
command = host.make_update_command(
updates=dict(userpassword=u'pass_123'))
with pytest.raises(errors.ValidationError):
command()
# Try to change the password of enrolled host with random password
command = host.make_update_command(updates=dict(random=True))
with pytest.raises(errors.ValidationError):
command()
@yield_fixture(scope='class')
def dns_setup_nonameserver(host4):
# Make sure that the server does not handle the reverse zone used
# for the test
try:
host4.run_command('dnszone_del', missingrevzone, **{'continue': True})
except (errors.NotFound, errors.EmptyModlist):
pass
# Save the current forward policy
result = host4.run_command('dnsserver_show', api.env.host)
current_fwd_pol = result['result']['idnsforwardpolicy'][0]
# Configure the forward policy to none to make sure that no DNS
# server will answer for the reverse zone either
try:
host4.run_command('dnsserver_mod', api.env.host,
idnsforwardpolicy=u'none')
except errors.EmptyModlist:
pass
try:
yield
finally:
# Restore the previous forward-policy
try:
host4.run_command('dnsserver_mod', api.env.host,
idnsforwardpolicy=current_fwd_pol)
except errors.EmptyModlist:
pass
@pytest.mark.tier1
class TestHostNoNameserversForRevZone(XMLRPC_test):
def test_create_host_with_ip(self, dns_setup_nonameserver, host4):
"""
Regression test for ticket 7397
Configure the master with forward-policy = none to make sure
that no DNS server will answer for the reverse zone
Try to add a new host with an IP address in the missing reverse
zone.
With issue 7397, a NoNameserver exception generates a Traceback in
httpd error_log, and the command returns an InternalError.
"""
try:
command = host4.make_create_command()
result = command(ip_address=ipv4_in_missingrevzone_ip)
msg = result['messages'][0]
assert msg['code'] == messages.FailedToAddHostDNSRecords.errno
expected_msg = ("The host was added but the DNS update failed "
"with: All nameservers failed to answer the query "
"for DNS reverse zone {}").format(missingrevzone)
assert msg['message'] == expected_msg
# Make sure the host is added
host4.run_command('host_show', host4.fqdn)
finally:
# Delete the host entry
command = host4.make_delete_command()
try:
command(updatedns=True)
except errors.NotFound:
pass
def test_create_host_with_otp(self, dns_setup_nonameserver, host4):
"""
Create a test host specifying an IP address for which
IPA does not handle the reverse zone, and requesting
the creation of a random password.
Non-reg test for ticket 7374.
"""
command = host4.make_create_command()
try:
result = command(random=True, ip_address=ipv4_in_missingrevzone_ip)
# Make sure a random password is returned
assert result['result']['randompassword']
# Make sure the warning about missing DNS record is added
msg = result['messages'][0]
assert msg['code'] == messages.FailedToAddHostDNSRecords.errno
assert msg['message'].startswith(
u'The host was added but the DNS update failed with:')
finally:
# Cleanup
try:
command = host4.make_delete_command()
command(updatedns=True)
except errors.NotFound:
pass
@yield_fixture(scope='class')
def dns_setup(host):
try:
host.run_command('dnszone_del', dnszone, revzone, revipv6zone,
**{'continue': True})
except (errors.NotFound, errors.EmptyModlist):
pass
try:
host.run_command('dnszone_add', dnszone, idnssoarname=dnszone_rname)
host.run_command('dnszone_add', revzone, idnssoarname=dnszone_rname)
host.run_command('dnszone_add', revipv6zone,
idnssoarname=dnszone_rname)
yield
finally:
try:
host.run_command('dnszone_del', dnszone, revzone, revipv6zone,
**{'continue': True})
except (errors.NotFound, errors.EmptyModlist):
pass
@pytest.mark.tier1
class TestHostDNS(XMLRPC_test):
def test_add_ipv6only_host(self, dns_setup, ipv6only_host):
ipv6only_host.run_command('dnsrecord_add', dnszone,
ipv6only_host.shortname, aaaarecord=aaaarec)
try:
ipv6only_host.create(force=False)
finally:
ipv6only_host.run_command(
'dnsrecord_del', dnszone, ipv6only_host.shortname,
aaaarecord=aaaarec)
def test_add_ipv4only_host(self, dns_setup, ipv4only_host):
ipv4only_host.run_command('dnsrecord_add', dnszone,
ipv4only_host.shortname, arecord=arec)
try:
ipv4only_host.create(force=False)
finally:
ipv4only_host.run_command(
'dnsrecord_del', dnszone, ipv4only_host.shortname,
arecord=arec)
def test_add_ipv46both_host(self, dns_setup, ipv46both_host):
ipv46both_host.run_command('dnsrecord_add', dnszone,
ipv46both_host.shortname,
arecord=arec2, aaaarecord=aaaarec2)
try:
ipv46both_host.create(force=False)
finally:
ipv46both_host.run_command(
'dnsrecord_del', dnszone, ipv46both_host.shortname,
arecord=arec2, aaaarecord=aaaarec2)
def test_add_ipv4_host_from_ip(self, dns_setup, ipv4_fromip_host):
ipv4_fromip_host.ensure_missing()
ipv4_fromip_host.track_create()
command = ipv4_fromip_host.make_create_command(force=False)
result = command(ip_address=ipv4_fromip_ip)
ipv4_fromip_host.check_create(result)
result = ipv4_fromip_host.run_command('dnsrecord_show', dnszone,
ipv4_fromip_host.shortname)
assert_deepequal(dict(
value=ipv4_fromip_dnsname,
summary=None,
result=dict(
dn=ipv4_fromip_dn,
idnsname=[ipv4_fromip_dnsname],
arecord=[ipv4_fromip_arec],
),
), result)
result = ipv4_fromip_host.run_command('dnsrecord_show', revzone,
ipv4_fromip_ptr)
assert_deepequal(dict(
value=ipv4_fromip_ptr_dnsname,
summary=None,
result=dict(
dn=ipv4_fromip_ptr_dn,
idnsname=[ipv4_fromip_ptr_dnsname],
ptrrecord=[ipv4_fromip_host.fqdn + '.'],
),
), result)
def test_add_ipv6_host_from_ip(self, dns_setup, ipv6_fromip_host):
ipv6_fromip_host.ensure_missing()
ipv6_fromip_host.track_create()
command = ipv6_fromip_host.make_create_command(force=False)
result = command(ip_address=ipv6_fromip_ipv6)
ipv6_fromip_host.check_create(result)
result = ipv6_fromip_host.run_command('dnsrecord_show', dnszone,
ipv6_fromip_host.shortname)
assert_deepequal(dict(
value=ipv6_fromip_dnsname,
summary=None,
result=dict(
dn=ipv6_fromip_dn,
idnsname=[ipv6_fromip_dnsname],
aaaarecord=[ipv6_fromip_aaaarec],
),
), result)
result = ipv6_fromip_host.run_command('dnsrecord_show', revipv6zone,
ipv6_fromip_ptr)
assert_deepequal(dict(
value=ipv6_fromip_ptr_dnsname,
summary=None,
result=dict(
dn=ipv6_fromip_ptr_dn,
idnsname=[ipv6_fromip_ptr_dnsname],
ptrrecord=[ipv6_fromip_host.fqdn + '.'],
),
), result)
@pytest.fixture(scope='class')
def allowedto_context(request, host3):
def cleanup():
try:
host3.run_command('user_del', user1, user2, **{'continue': True})
except errors.NotFound:
pass
try:
host3.run_command('group_del', group1, group2,
**{'continue': True})
except errors.NotFound:
pass
try:
host3.run_command('hostgroup_del', hostgroup1)
except errors.NotFound:
pass
cleanup()
request.addfinalizer(cleanup)
host3.ensure_exists()
host3.run_command('user_add', givenname=u'Test', sn=u'User1')
host3.run_command('user_add', givenname=u'Test', sn=u'User2')
host3.run_command('group_add', group1)
host3.run_command('group_add', group2)
host3.run_command('hostgroup_add', hostgroup1,
description=u'Test hostgroup 1')
@pytest.mark.tier1
class TestHostAllowedTo(XMLRPC_test):
def test_user_allow_retrieve_keytab(self, allowedto_context, host):
host.ensure_exists()
result = host.run_command('host_allow_retrieve_keytab', host.fqdn,
user=user1)
host.attrs['ipaallowedtoperform_read_keys_user'] = [user1]
assert_deepequal(dict(
failed=dict(
ipaallowedtoperform_read_keys=dict(
group=[], host=[], hostgroup=[], user=[]),
),
completed=1,
result=host.filter_attrs(host.allowedto_keys),
), result)
# Duplicates should not be accepted
result = host.run_command('host_allow_retrieve_keytab', host.fqdn,
user=user1)
assert_deepequal(dict(
failed=dict(
ipaallowedtoperform_read_keys=dict(
group=[], host=[], hostgroup=[],
user=[[user1, u'This entry is already a member']],
),
),
completed=0,
result=host.filter_attrs(host.allowedto_keys),
), result)
def test_group_allow_retrieve_keytab(self, allowedto_context, host, host3):
host.ensure_exists()
host3.ensure_exists()
result = host.run_command('host_allow_retrieve_keytab', host.fqdn,
group=[group1, group2], host=[host3.fqdn],
hostgroup=[hostgroup1])
host.attrs['ipaallowedtoperform_read_keys_group'] = [group1, group2]
host.attrs['ipaallowedtoperform_read_keys_host'] = [host3.fqdn]
host.attrs['ipaallowedtoperform_read_keys_hostgroup'] = [hostgroup1]
assert_deepequal(dict(
failed=dict(
ipaallowedtoperform_read_keys=dict(
group=[], host=[], hostgroup=[], user=[]),
),
completed=4,
result=host.filter_attrs(host.allowedto_keys),
), result)
# Non-members cannot be removed
result = host.run_command('host_disallow_retrieve_keytab', host.fqdn,
user=[user2])
assert_deepequal(dict(
failed=dict(
ipaallowedtoperform_read_keys=dict(
group=[],
host=[],
hostgroup=[],
user=[[user2, u'This entry is not a member']],
),
),
completed=0,
result=host.filter_attrs(host.allowedto_keys),
), result)
# Disallow one of the existing allowed groups
result = host.run_command('host_disallow_retrieve_keytab', host.fqdn,
group=[group2])
host.attrs['ipaallowedtoperform_read_keys_group'] = [group1]
assert_deepequal(dict(
failed=dict(
ipaallowedtoperform_read_keys=dict(
group=[], host=[], hostgroup=[], user=[]),
),
completed=1,
result=host.filter_attrs(host.allowedto_keys),
), result)
host.retrieve()
def test_allow_create(self, allowedto_context, host, host3):
host.ensure_exists()
host3.ensure_exists()
result = host.run_command('host_allow_create_keytab', host.fqdn,
group=[group1, group2], user=[user1],
host=[host3.fqdn],
hostgroup=[hostgroup1])
host.attrs['ipaallowedtoperform_write_keys_user'] = [user1]
host.attrs['ipaallowedtoperform_write_keys_group'] = [group1, group2]
host.attrs['ipaallowedtoperform_write_keys_host'] = [host3.fqdn]
host.attrs['ipaallowedtoperform_write_keys_hostgroup'] = [hostgroup1]
assert_deepequal(dict(
failed=dict(
ipaallowedtoperform_write_keys=dict(
group=[], host=[], hostgroup=[], user=[]),
),
completed=5,
result=host.filter_attrs(host.allowedto_keys),
), result)
# Duplicates should not be accepted
result = host.run_command('host_allow_create_keytab', host.fqdn,
group=[group1], user=[user1],
host=[host3.fqdn], hostgroup=[hostgroup1])
assert_deepequal(dict(
failed=dict(
ipaallowedtoperform_write_keys=dict(
group=[[group1, u'This entry is already a member']],
host=[[host3.fqdn, u'This entry is already a member']],
user=[[user1, u'This entry is already a member']],
hostgroup=[[hostgroup1,
u'This entry is already a member']],
),
),
completed=0,
result=host.filter_attrs(host.allowedto_keys),
), result)
# Non-mambers cannot be removed
result = host.run_command('host_disallow_create_keytab', host.fqdn,
user=[user2])
assert_deepequal(dict(
failed=dict(
ipaallowedtoperform_write_keys=dict(
group=[],
host=[],
hostgroup=[],
user=[[user2, u'This entry is not a member']],
),
),
completed=0,
result=host.filter_attrs(host.allowedto_keys),
), result)
# Disallow one of the existing allowed groups
result = host.run_command('host_disallow_create_keytab', host.fqdn,
group=[group2])
host.attrs['ipaallowedtoperform_write_keys_group'] = [group1]
assert_deepequal(dict(
failed=dict(
ipaallowedtoperform_write_keys=dict(
group=[],
host=[],
hostgroup=[],
user=[],
),
),
completed=1,
result=host.filter_attrs(host.allowedto_keys),
), result)
host.retrieve()
def test_host_mod(self, host):
# Done (usually) at the end to ensure the tracking works well
host.update(updates=dict(description=u'desc'),
expected_updates=dict(description=[u'desc']))