freeipa/ipatests/test_integration/test_dns_locations.py
Stanislav Levin 49e643783d dnspython: Add compatibility shim
`dnspython` 2.0.0 has many changes and several deprecations like:

```
> dns.resolver.resolve() has been added, allowing control of whether
search lists are used. dns.resolver.query() is retained for backwards
compatibility, but deprecated. The default for search list behavior can
be set at in the resolver object with the use_search_by_default
parameter. The default is False.

> dns.resolver.resolve_address() has been added, allowing easy
address-to-name lookups.
```

The new class `DNSResolver`:
- provides the compatibility layer
- defaults to the previous behavior (the search list configured in the
  system's resolver configuration is used for relative names)
- defaults lifetime to 15sec (determines the number of seconds
  to spend trying to get an answer to the question)

Fixes: https://pagure.io/freeipa/issue/8383
Signed-off-by: Stanislav Levin <slev@altlinux.org>
Reviewed-By: Alexander Bokovoy <abokovoy@redhat.com>
2020-08-31 09:46:03 +03:00

495 lines
19 KiB
Python

#
# Copyright (C) 2016 FreeIPA Contributors see COPYING for license
#
import logging
import re
import time
import pytest
import dns.resolver
import dns.rrset
import dns.rdatatype
import dns.rdataclass
from ipatests.test_integration.base import IntegrationTest
from ipatests.pytest_ipa.integration import tasks
from ipapython.dnsutil import DNSName, DNSResolver
from ipalib.constants import IPA_CA_RECORD
logger = logging.getLogger(__name__)

# SRV records checked by default for IPA servers, as (record name, port)
# pairs. The names are relative; the tests derelativize them against the
# domain (or location sub-domain) being queried.
IPA_DEFAULT_MASTER_SRV_REC = tuple(
    (DNSName(label), port)
    for label, port in (
        (u'_ldap._tcp', 389),
        (u'_kerberos._tcp', 88),
        (u'_kerberos._udp', 88),
        (u'_kerberos-master._tcp', 88),
        (u'_kerberos-master._udp', 88),
        (u'_kpasswd._tcp', 464),
        (u'_kpasswd._udp', 464),
    )
)

# SRV records checked after ipa-adtrust-install, as (record name, port)
# pairs.
IPA_DEFAULT_ADTRUST_SRV_REC = tuple(
    (DNSName(label), port)
    for label, port in (
        (u'_ldap._tcp.Default-First-Site-Name._sites.dc._msdcs', 389),
        (u'_ldap._tcp.dc._msdcs', 389),
        (u'_kerberos._tcp.Default-First-Site-Name._sites.dc._msdcs', 88),
        (u'_kerberos._udp.Default-First-Site-Name._sites.dc._msdcs', 88),
        (u'_kerberos._tcp.dc._msdcs', 88),
        (u'_kerberos._udp.dc._msdcs', 88),
    )
)

# One-element tuple holding the relative name of the ipa-ca A record.
IPA_CA_A_REC = (DNSName(str(IPA_CA_RECORD)),)
def resolve_records_from_server(rname, rtype, nameserver):
    """Query a single nameserver for ``rname``/``rtype`` with retries.

    The query is attempted up to three times; NXDOMAIN and timeout
    errors trigger a retry after a 10 second pause, any other exception
    propagates immediately.

    :param rname: record name to resolve
    :param rtype: record type (e.g. ``'SRV'`` or ``'A'``)
    :param nameserver: address of the only nameserver to ask
    :return: the ``rrset`` of the first successful answer
    :raises: ``pytest.fail`` is called (failing the running test) when
        every attempt ended with NXDOMAIN or a timeout
    """
    attempts = 3
    retry_delay = 10  # seconds to wait before the next attempt

    res = DNSResolver()
    res.nameservers = [nameserver]
    # seconds to spend trying to get an answer for a single query
    res.lifetime = 30
    logger.info("Query: %s %s, nameserver %s", rname, rtype, nameserver)

    error = None
    for attempt in range(1, attempts + 1):
        try:
            ans = res.resolve(rname, rtype)
        except (dns.resolver.NXDOMAIN, dns.resolver.Timeout) as e:
            error = e
            logger.info("Query attempt %d/%d failed: %s",
                        attempt, attempts, e)
            # no point in sleeping when there is no attempt left
            if attempt < attempts:
                time.sleep(retry_delay)
        else:
            logger.info("Answer: %s", ans.rrset)
            return ans.rrset

    pytest.fail("Query: {} {}, nameserver {} failed due to {}".format(
        rname, rtype, nameserver, error))
def _gen_expected_srv_rrset(rname, port, servers, ttl=86400):
    """Build the SRV rrset expected for ``rname``.

    ``servers`` is an iterable of ``(priority, weight, servername)``
    triples; each one becomes a single SRV rdata pointing at ``port`` on
    the absolute form of ``servername``.
    """
    srv_lines = []
    for priority, srv_weight, target in servers:
        srv_lines.append(
            "{prio} {weight} {port} {servername}".format(
                prio=priority,
                weight=srv_weight,
                port=port,
                servername=target.make_absolute(),
            )
        )

    rdclass = dns.rdataclass.IN
    rdtype = dns.rdatatype.SRV
    return dns.rrset.from_text_list(rname, ttl, rdclass, rdtype, srv_lines)
def _gen_expected_a_rrset(rname, servers, ttl=86400):
    """Build the A rrset expected for ``rname``.

    ``servers`` holds the textual rdata (the tests pass server IP
    addresses) that the record should contain.
    """
    rdclass = dns.rdataclass.IN
    rdtype = dns.rdatatype.A
    return dns.rrset.from_text_list(rname, ttl, rdclass, rdtype, servers)
def _get_relative_weights(text):
"""Takes location-show output and returns a list of percentages"""
return re.findall(r"\d+.\d%", text)
class TestDNSLocations(IntegrationTest):
    """Simple test if SRV DNS records for IPA locations are generated properly

    Topology:
        * 3 servers (replica0 --- master --- replica1)
            replica0 with no CA, master with ADtrust installed later,
            replica1 with CA
        * 2 locations (prague, paris)

    The tests share server-side state (locations, service weights, the
    AD trust, the removed replica) and rely on running in definition
    order.
    """
    num_replicas = 2
    topology = 'star'

    LOC_PRAGUE = u'prague'
    LOC_PARIS = u'paris'

    # SRV priorities: a lower value is preferred by clients (RFC 2782)
    PRIO_HIGH = 0
    PRIO_LOW = 50
    # service weight expected for servers whose weight was not modified
    WEIGHT = 100

    @classmethod
    def install(cls, mh):
        """Install master and both replicas with DNS; only replica1 gets a CA
        """
        cls.domain = DNSName(cls.master.domain.name).make_absolute()
        tasks.install_master(cls.master, setup_dns=True)
        tasks.install_replica(cls.master, cls.replicas[0], setup_dns=True,
                              setup_ca=False)
        tasks.install_replica(cls.master, cls.replicas[1], setup_dns=True,
                              setup_ca=True)

        for host in (cls.master, cls.replicas[0], cls.replicas[1]):
            ldap = host.ldap_connect()
            tasks.wait_for_replication(ldap)

        # give time to named to retrieve new records
        time.sleep(20)

    @classmethod
    def delete_update_system_records(cls, rnames):
        """Delete the given records and restore them with nsupdate

        The nsupdate file is generated by ``ipa dns-update-system-records
        --dry-run`` before the records named in ``rnames`` are removed
        from the domain zone, and is applied afterwards.

        :param rnames: iterable of record names to delete from the zone
        """
        filepath = '/tmp/ipa.nsupdate'

        cls.master.run_command([
            'ipa', 'dns-update-system-records', '--dry-run', '--out', filepath
        ])
        for name in rnames:
            cls.master.run_command([
                'ipa', 'dnsrecord-del', str(cls.domain), str(name),
                '--del-all'])

        time.sleep(15)
        # allow unauthenticated nsupdate (no need to test authentication);
        # a failure of this command is deliberately tolerated
        cls.master.run_command([
            'ipa', 'dnszone-mod', str(cls.domain),
            '--update-policy=grant * wildcard *;'
        ], raiseonerr=False)
        cls.master.run_command(['nsupdate', '-g', filepath])
        time.sleep(15)

    def _assert_rrset_on_server(self, server_ip, name_abs, rtype, expected):
        """Query ``server_ip`` for ``name_abs`` and compare with ``expected``
        """
        query = resolve_records_from_server(name_abs, rtype, server_ip)
        assert expected == query, (
            "Expected and received DNS data do not match on server "
            "with IP: '{}' for name '{}' (expected:\n{}\ngot:\n{})".
            format(server_ip, name_abs, expected, query))

    def _location_domain(self, location):
        """Return the absolute ``<location>._locations.<domain>`` name"""
        return (
            DNSName('{}._locations'.format(location)) +
            DNSName(self.master.domain.name).make_absolute()
        )

    def _test_A_rec_against_server(self, server_ip, domain, expected_servers,
                                   rec_list=IPA_CA_A_REC):
        """Check that ``server_ip`` serves the expected A records

        :param server_ip: nameserver to query
        :param domain: absolute domain the names in ``rec_list`` are
            derelativized against
        :param expected_servers: rdata (IP addresses) each record must hold
        :param rec_list: relative record names to check
        """
        for rname in rec_list:
            name_abs = rname.derelativize(domain)
            expected = _gen_expected_a_rrset(name_abs, expected_servers)
            self._assert_rrset_on_server(server_ip, name_abs, 'A', expected)

    def _test_SRV_rec_against_server(self, server_ip, domain, expected_servers,
                                     rec_list=IPA_DEFAULT_MASTER_SRV_REC):
        """Check that ``server_ip`` serves the expected SRV records

        :param server_ip: nameserver to query
        :param domain: absolute domain the names in ``rec_list`` are
            derelativized against
        :param expected_servers: ``(priority, weight, servername)`` triples
            each record must hold
        :param rec_list: ``(relative record name, port)`` pairs to check
        """
        for rname, port in rec_list:
            name_abs = rname.derelativize(domain)
            expected = _gen_expected_srv_rrset(
                name_abs, port, expected_servers)
            self._assert_rrset_on_server(server_ip, name_abs, 'SRV', expected)

    def test_without_locations(self):
        """Servers are not in locations, this tests if basic system records
        are generated properly"""
        expected_servers = (
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.master.hostname)),
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.replicas[0].hostname)),
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.replicas[1].hostname)),
        )

        for ip in (self.master.ip, self.replicas[0].ip, self.replicas[1].ip):
            self._test_SRV_rec_against_server(ip, self.domain,
                                              expected_servers)

    def test_nsupdate_without_locations(self):
        """Test nsupdate file generated by dns-update-system-records
        Remove all records and then use nsupdate to restore state and test
        if all records are there as expected"""
        self.delete_update_system_records(rnames=(r[0] for r in
                                          IPA_DEFAULT_MASTER_SRV_REC))
        self.test_without_locations()

    def test_one_replica_in_location(self):
        """Put one replica to location and test if records changed properly
        """
        # create location prague, replica0 --> location prague
        self.master.run_command([
            'ipa', 'location-add', self.LOC_PRAGUE
        ])
        self.master.run_command([
            'ipa', 'server-mod', self.replicas[0].hostname,
            '--location', self.LOC_PRAGUE
        ])
        tasks.restart_named(self.replicas[0])

        servers_without_loc = (
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.master.hostname)),
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.replicas[0].hostname)),
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.replicas[1].hostname)),
        )
        # same value install() computed: the absolute IPA domain
        domain_without_loc = self.domain

        servers_prague_loc = (
            (self.PRIO_LOW, self.WEIGHT, DNSName(self.master.hostname)),
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.replicas[0].hostname)),
            (self.PRIO_LOW, self.WEIGHT, DNSName(self.replicas[1].hostname)),
        )
        domain_prague_loc = self._location_domain(self.LOC_PRAGUE)

        self._test_SRV_rec_against_server(
            self.replicas[0].ip, domain_prague_loc, servers_prague_loc)

        for ip in (self.master.ip, self.replicas[1].ip):
            self._test_SRV_rec_against_server(
                ip, domain_without_loc, servers_without_loc)

    def test_two_replicas_in_location(self):
        """Put second replica to location and test if records changed properly
        """
        # create location paris, replica1 --> location paris
        self.master.run_command(['ipa', 'location-add', self.LOC_PARIS])
        self.master.run_command([
            'ipa', 'server-mod', self.replicas[1].hostname, '--location',
            self.LOC_PARIS])
        tasks.restart_named(self.replicas[1])

        servers_without_loc = (
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.master.hostname)),
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.replicas[0].hostname)),
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.replicas[1].hostname)),
        )
        # same value install() computed: the absolute IPA domain
        domain_without_loc = self.domain

        servers_prague_loc = (
            (self.PRIO_LOW, self.WEIGHT, DNSName(self.master.hostname)),
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.replicas[0].hostname)),
            (self.PRIO_LOW, self.WEIGHT, DNSName(self.replicas[1].hostname)),
        )
        domain_prague_loc = self._location_domain(self.LOC_PRAGUE)

        servers_paris_loc = (
            (self.PRIO_LOW, self.WEIGHT, DNSName(self.master.hostname)),
            (self.PRIO_LOW, self.WEIGHT, DNSName(self.replicas[0].hostname)),
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.replicas[1].hostname)),
        )
        domain_paris_loc = self._location_domain(self.LOC_PARIS)

        self._test_SRV_rec_against_server(
            self.replicas[0].ip, domain_prague_loc, servers_prague_loc)

        self._test_SRV_rec_against_server(
            self.replicas[1].ip, domain_paris_loc, servers_paris_loc)

        self._test_SRV_rec_against_server(
            self.master.ip, domain_without_loc, servers_without_loc)

    def test_all_servers_in_location(self):
        """Put master (as second server) to location and test if records
        changed properly
        """
        # master --> location paris
        self.master.run_command([
            'ipa', 'server-mod', self.master.hostname, '--location',
            self.LOC_PARIS])
        tasks.restart_named(self.master)

        servers_prague_loc = (
            (self.PRIO_LOW, self.WEIGHT, DNSName(self.master.hostname)),
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.replicas[0].hostname)),
            (self.PRIO_LOW, self.WEIGHT, DNSName(self.replicas[1].hostname)),
        )
        domain_prague_loc = self._location_domain(self.LOC_PRAGUE)

        servers_paris_loc = (
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.master.hostname)),
            (self.PRIO_LOW, self.WEIGHT, DNSName(self.replicas[0].hostname)),
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.replicas[1].hostname)),
        )
        domain_paris_loc = self._location_domain(self.LOC_PARIS)

        self._test_SRV_rec_against_server(
            self.replicas[0].ip, domain_prague_loc, servers_prague_loc)

        for ip in (self.replicas[1].ip, self.master.ip):
            self._test_SRV_rec_against_server(ip, domain_paris_loc,
                                              servers_paris_loc)

    def test_change_weight(self):
        """Change weight of master and test if records changed properly
        """
        new_weight = 2000

        self.master.run_command([
            'ipa', 'server-mod', self.master.hostname, '--service-weight',
            str(new_weight)
        ])

        # all servers must be restarted
        tasks.restart_named(self.master, self.replicas[0], self.replicas[1])

        servers_prague_loc = (
            (self.PRIO_LOW, new_weight, DNSName(self.master.hostname)),
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.replicas[0].hostname)),
            (self.PRIO_LOW, self.WEIGHT, DNSName(self.replicas[1].hostname)),
        )
        domain_prague_loc = self._location_domain(self.LOC_PRAGUE)

        servers_paris_loc = (
            (self.PRIO_HIGH, new_weight, DNSName(self.master.hostname)),
            (self.PRIO_LOW, self.WEIGHT, DNSName(self.replicas[0].hostname)),
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.replicas[1].hostname)),
        )
        domain_paris_loc = self._location_domain(self.LOC_PARIS)

        self._test_SRV_rec_against_server(
            self.replicas[0].ip, domain_prague_loc, servers_prague_loc)

        for ip in (self.replicas[1].ip, self.master.ip):
            self._test_SRV_rec_against_server(ip, domain_paris_loc,
                                              servers_paris_loc)

    # The three test_change_weight_relative_zero_* tests are name-sensitive
    # so they run in a specific order. They use the paris location and
    # depend on the existing values of the server location and weight to
    # work properly
    def test_change_weight_relative_zero_0(self):
        """Change weight of one master and check on relative weight %
        """
        new_weight = 0

        # Put all servers into one location
        self.master.run_command([
            'ipa', 'server-mod', self.replicas[0].hostname, '--location',
            self.LOC_PARIS])

        # Modify one to have a weight of 0
        self.master.run_command([
            'ipa', 'server-mod', self.master.hostname, '--service-weight',
            str(new_weight)
        ])

        result = self.master.run_command([
            'ipa', 'location-show', self.LOC_PARIS
        ])
        weights = _get_relative_weights(result.stdout_text)
        assert weights.count('0.1%') == 1
        assert weights.count('50.0%') == 2

    def test_change_weight_relative_zero_1(self):
        """Change all weights to zero and ensure no div by zero
        """
        new_weight = 0

        # Depends on order of test execution but all masters are now
        # in LOC_PARIS and self.master has a weight of 0.

        # Modify all replicas to have a weight of 0
        for hostname in (self.replicas[0].hostname,
                         self.replicas[1].hostname):
            self.master.run_command([
                'ipa', 'server-mod', hostname, '--service-weight',
                str(new_weight)
            ])

        result = self.master.run_command([
            'ipa', 'location-show', self.LOC_PARIS
        ])
        weights = _get_relative_weights(result.stdout_text)
        assert weights.count('33.3%') == 3

    def test_change_weight_relative_zero_2(self):
        """Change to mixed weight values and check percentages
        """
        new_weight = 100

        # Change master to be primary, replicas secondary
        self.master.run_command([
            'ipa', 'server-mod', self.master.hostname, '--service-weight',
            '200'
        ])
        for hostname in (self.replicas[0].hostname,
                         self.replicas[1].hostname):
            self.master.run_command([
                'ipa', 'server-mod', hostname, '--service-weight',
                str(new_weight)
            ])

        result = self.master.run_command([
            'ipa', 'location-show', self.LOC_PARIS
        ])
        weights = _get_relative_weights(result.stdout_text)
        assert weights.count('50.0%') == 1
        assert weights.count('25.0%') == 2

    def test_restore_locations_and_weight(self):
        """Restore locations and weight. Not just for test purposes but also
        for the following tests"""
        for hostname in (self.master.hostname, self.replicas[0].hostname,
                         self.replicas[1].hostname):
            # an empty --location value is passed for every server
            self.master.run_command(['ipa', 'server-mod', hostname,
                                     '--location='])

        self.master.run_command(['ipa', 'location-del', self.LOC_PRAGUE])
        self.master.run_command(['ipa', 'location-del', self.LOC_PARIS])

        # the replicas were already set back to weight 100 by the previous
        # test, only the master still differs
        self.master.run_command([
            'ipa', 'server-mod', self.master.hostname, '--service-weight',
            str(self.WEIGHT)
        ])

        tasks.restart_named(self.master, self.replicas[0], self.replicas[1])
        time.sleep(5)

    def test_ipa_ca_records(self):
        """ Test ipa-ca dns records with firstly removing the records and then
        using the nsupdate generated by dns-update-system-records"""
        self.delete_update_system_records(rnames=IPA_CA_A_REC)

        # only master and replica1 are expected in the ipa-ca record
        # (replica0 was installed with setup_ca=False)
        expected_servers = (self.master.ip, self.replicas[1].ip)

        for ip in (self.master.ip, self.replicas[0].ip, self.replicas[1].ip):
            self._test_A_rec_against_server(ip, self.domain, expected_servers)

    def test_adtrust_system_records(self):
        """ Test ADTrust dns records with firstly installing a trust then
        removing the records and using the nsupdate generated by
        dns-update-system-records."""
        self.master.run_command(['ipa-adtrust-install', '-U',
                                 '--enable-compat', '--netbios-name', 'IPA',
                                 '-a', self.master.config.admin_password,
                                 '--add-sids'])
        # lets re-kinit after adtrust-install and restart named
        tasks.kinit_admin(self.master)
        tasks.restart_named(self.master)
        time.sleep(5)
        self.delete_update_system_records(rnames=(r[0] for r in
                                          IPA_DEFAULT_ADTRUST_SRV_REC))

        # ADTrust was installed on the master only
        expected_servers = (
            (self.PRIO_HIGH, self.WEIGHT, DNSName(self.master.hostname)),
        )

        for ip in (self.master.ip, self.replicas[0].ip, self.replicas[1].ip):
            self._test_SRV_rec_against_server(
                ip, self.domain, expected_servers,
                rec_list=IPA_DEFAULT_ADTRUST_SRV_REC)

    def test_remove_replica_with_ca(self):
        """Test ipa-ca dns records after removing the replica with CA"""
        tasks.uninstall_replica(self.master, self.replicas[1])

        self.delete_update_system_records(rnames=IPA_CA_A_REC)

        expected_servers = (self.master.ip,)

        self._test_A_rec_against_server(self.master.ip, self.domain,
                                        expected_servers)