freeipa/ipatests/test_integration/test_dnssec.py

779 lines
27 KiB
Python
Raw Normal View History

#
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
from __future__ import absolute_import
import base64
import logging
import re
import subprocess
import time
import textwrap
import dns.dnssec
import dns.name
import pytest
import yaml
from ipatests.test_integration.base import IntegrationTest
from ipatests.pytest_ipa.integration import tasks
from ipatests.pytest_ipa.integration.firewall import Firewall
from ipaplatform.tasks import tasks as platform_tasks
from ipaplatform.paths import paths
from ipapython.dnsutil import DNSResolver
logger = logging.getLogger(__name__)
# Sleep 5 seconds at most when waiting for LDAP updates
# for DNSSEC changes. Test zones should be updated with 1 second TTL
DNSSEC_SLEEP = 5
test_zone = "dnssec.test."
test_zone_repl = "dnssec-replica.test."
root_zone = "."
example_test_zone = "example.test."
example2_test_zone = "example2.test."
example3_test_zone = "example3.test."
def resolve_with_dnssec(nameserver, query, rtype="SOA"):
res = DNSResolver()
res.nameservers = [nameserver]
res.lifetime = 10 # wait max 10 seconds for reply
# enable Authenticated Data + Checking Disabled flags
res.set_flags(dns.flags.AD | dns.flags.CD)
# enable EDNS v0 + enable DNSSEC-Ok flag
res.use_edns(0, dns.flags.DO, 0)
ans = res.resolve(query, rtype)
return ans
def get_RRSIG_record(nameserver, query, rtype="SOA"):
ans = resolve_with_dnssec(nameserver, query, rtype=rtype)
return ans.response.find_rrset(
ans.response.answer, dns.name.from_text(query),
dns.rdataclass.IN, dns.rdatatype.RRSIG,
dns.rdatatype.from_text(rtype))
def is_record_signed(nameserver, query, rtype="SOA"):
try:
get_RRSIG_record(nameserver, query, rtype=rtype)
except KeyError:
return False
except dns.exception.DNSException:
return False
return True
def wait_until_record_is_signed(nameserver, record, rtype="SOA",
timeout=100):
"""
Returns True if record is signed, or False on timeout
:param nameserver: nameserver to query
:param record: query
:param rtype: record type
:param timeout:
:return: True if records is signed, False if timeout
"""
logger.info("Waiting for signed %s record of %s from server %s (timeout "
"%s sec)", rtype, record, nameserver, timeout)
wait_until = time.time() + timeout
while time.time() < wait_until:
if is_record_signed(nameserver, record, rtype=rtype):
return True
time.sleep(1)
return False
def dnskey_rec_with_ksk_and_zsk(nameserver, query):
"""
Returns true if the DNSKEY record contains 2 types of keys, KSK and ZSK
:param nameserver: nameserver to query
:param record: query
:return: True if the DNSKEY records contains a ZSK and a KSK
"""
ksk = False
zsk = False
ans = resolve_with_dnssec(nameserver, query, rtype="DNSKEY")
dnskey_rrset = ans.response.get_rrset(
ans.response.answer,
dns.name.from_text(query),
dns.rdataclass.IN,
dns.rdatatype.DNSKEY)
assert dnskey_rrset, "No DNSKEY records received"
for key_rdata in dnskey_rrset:
if key_rdata.flags == 257:
ksk = True
elif key_rdata.flags == 256:
zsk = True
return (ksk and zsk)
def dnszone_add_dnssec(host, test_zone):
"""Add dnszone with dnssec and short TTL
"""
args = [
"ipa",
"dnszone-add", test_zone,
"--skip-overlap-check",
"--dnssec", "true",
"--ttl", "1",
"--default-ttl", "1",
]
return host.run_command(args)
def dnssec_install_master(host):
args = [
"ipa-dns-install",
"--dnssec-master",
"--forwarder", host.config.dns_forwarder,
"-U",
]
return host.run_command(args)
class TestInstallDNSSECLast(IntegrationTest):
"""Simple DNSSEC test
Install a server and a replica with DNS, then reinstall server
as DNSSEC master
"""
num_replicas = 1
topology = 'star'
@classmethod
def install(cls, mh):
tasks.install_master(cls.master, setup_dns=True)
tasks.install_replica(cls.master, cls.replicas[0], setup_dns=True)
def test_install_dnssec_master(self):
"""Both master and replica have DNS installed"""
dnssec_install_master(self.master)
def test_if_zone_is_signed_master(self):
# add zone with enabled DNSSEC signing on master
dnszone_add_dnssec(self.master, test_zone)
# test master
assert wait_until_record_is_signed(
self.master.ip, test_zone, timeout=100
), "Zone %s is not signed (master)" % test_zone
# test replica
assert wait_until_record_is_signed(
self.replicas[0].ip, test_zone, timeout=200
), "DNS zone %s is not signed (replica)" % test_zone
def test_if_zone_is_signed_replica(self):
# add zone with enabled DNSSEC signing on replica
dnszone_add_dnssec(self.replicas[0], test_zone_repl)
# test replica
assert wait_until_record_is_signed(
self.replicas[0].ip, test_zone_repl, timeout=300
), "Zone %s is not signed (replica)" % test_zone_repl
# we do not need to wait, on master zones should be singed faster
# than on replicas
assert wait_until_record_is_signed(
self.master.ip, test_zone_repl, timeout=5
), "DNS zone %s is not signed (master)" % test_zone
def test_key_types(self):
assert dnskey_rec_with_ksk_and_zsk(self.master.ip, test_zone)
assert dnskey_rec_with_ksk_and_zsk(self.replicas[0].ip, test_zone)
assert dnskey_rec_with_ksk_and_zsk(self.master.ip, test_zone_repl)
assert dnskey_rec_with_ksk_and_zsk(self.replicas[0].ip, test_zone_repl)
def test_disable_reenable_signing_master(self):
dnskey_old = resolve_with_dnssec(self.master.ip, test_zone,
rtype="DNSKEY").rrset
# disable DNSSEC signing of zone on master
args = [
"ipa",
"dnszone-mod", test_zone,
"--dnssec", "false",
]
self.master.run_command(args)
time.sleep(DNSSEC_SLEEP)
# test master
assert not is_record_signed(
self.master.ip, test_zone
), "Zone %s is still signed (master)" % test_zone
# test replica
assert not is_record_signed(
self.replicas[0].ip, test_zone
), "DNS zone %s is still signed (replica)" % test_zone
# reenable DNSSEC signing
args = [
"ipa",
"dnszone-mod", test_zone,
"--dnssec", "true",
]
self.master.run_command(args)
# TODO: test require restart
tasks.restart_named(self.master, self.replicas[0])
# test master
assert wait_until_record_is_signed(
self.master.ip, test_zone, timeout=100
), "Zone %s is not signed (master)" % test_zone
# test replica
assert wait_until_record_is_signed(
self.replicas[0].ip, test_zone, timeout=200
), "DNS zone %s is not signed (replica)" % test_zone
dnskey_new = resolve_with_dnssec(self.master.ip, test_zone,
rtype="DNSKEY").rrset
assert dnskey_old != dnskey_new, "DNSKEY should be different"
def test_disable_reenable_signing_replica(self):
dnskey_old = resolve_with_dnssec(self.replicas[0].ip, test_zone_repl,
rtype="DNSKEY").rrset
# disable DNSSEC signing of zone on replica
args = [
"ipa",
"dnszone-mod", test_zone_repl,
"--dnssec", "false",
]
self.master.run_command(args)
time.sleep(DNSSEC_SLEEP)
# test master
assert not is_record_signed(
self.master.ip, test_zone_repl
), "Zone %s is still signed (master)" % test_zone_repl
# test replica
assert not is_record_signed(
self.replicas[0].ip, test_zone_repl
), "DNS zone %s is still signed (replica)" % test_zone_repl
# reenable DNSSEC signing
args = [
"ipa",
"dnszone-mod", test_zone_repl,
"--dnssec", "true",
]
self.master.run_command(args)
# TODO: test require restart
tasks.restart_named(self.master, self.replicas[0])
# test master
assert wait_until_record_is_signed(
self.master.ip, test_zone_repl, timeout=100
), "Zone %s is not signed (master)" % test_zone_repl
# test replica
assert wait_until_record_is_signed(
self.replicas[0].ip, test_zone_repl, timeout=200
), "DNS zone %s is not signed (replica)" % test_zone_repl
dnskey_new = resolve_with_dnssec(self.replicas[0].ip, test_zone_repl,
rtype="DNSKEY").rrset
assert dnskey_old != dnskey_new, "DNSKEY should be different"
class TestInstallDNSSECFirst(IntegrationTest):
"""Simple DNSSEC test
Install the server with DNSSEC and then install the replica with DNS
"""
num_replicas = 1
topology = 'star'
@classmethod
def install(cls, mh):
tasks.install_master(cls.master, setup_dns=False)
args = [
"ipa-dns-install",
"--dnssec-master",
"--forwarder", cls.master.config.dns_forwarder,
"-U",
]
cls.master.run_command(args)
# Enable dns service on master as it has been installed without dns
# support before
Firewall(cls.master).enable_services(["dns"])
tasks.install_replica(cls.master, cls.replicas[0], setup_dns=True,
nameservers=None)
# backup trusted key
tasks.backup_file(cls.master, paths.DNSSEC_TRUSTED_KEY)
tasks.backup_file(cls.replicas[0], paths.DNSSEC_TRUSTED_KEY)
@classmethod
def uninstall(cls, mh):
# restore trusted key
tasks.restore_files(cls.master)
tasks.restore_files(cls.replicas[0])
super(TestInstallDNSSECFirst, cls).uninstall(mh)
def test_sign_root_zone(self):
dnszone_add_dnssec(self.master, root_zone)
# make BIND happy: add the glue record and delegate zone
args = [
"ipa", "dnsrecord-add", root_zone, self.master.hostname,
"--a-rec=" + self.master.ip
]
self.master.run_command(args)
args = [
"ipa", "dnsrecord-add", root_zone, self.replicas[0].hostname,
"--a-rec=" + self.replicas[0].ip
]
self.master.run_command(args)
time.sleep(DNSSEC_SLEEP)
args = [
"ipa", "dnsrecord-add", root_zone, self.master.domain.name,
"--ns-rec=" + self.master.hostname
]
self.master.run_command(args)
# test master
assert wait_until_record_is_signed(
self.master.ip, root_zone, timeout=100
), "Zone %s is not signed (master)" % root_zone
# test replica
assert wait_until_record_is_signed(
self.replicas[0].ip, root_zone, timeout=300
), "Zone %s is not signed (replica)" % root_zone
def test_delegation(self):
dnszone_add_dnssec(self.master, example_test_zone)
# delegation
args = [
"ipa", "dnsrecord-add", root_zone, example_test_zone,
"--ns-rec=" + self.master.hostname
]
self.master.run_command(args)
# TODO: test require restart
tasks.restart_named(self.master, self.replicas[0])
# wait until zone is signed
assert wait_until_record_is_signed(
self.master.ip, example_test_zone, timeout=100
), "Zone %s is not signed (master)" % example_test_zone
# wait until zone is signed
assert wait_until_record_is_signed(
self.replicas[0].ip, example_test_zone, timeout=200
), "Zone %s is not signed (replica)" % example_test_zone
# GET DNSKEY records from zone
ans = resolve_with_dnssec(self.master.ip, example_test_zone,
rtype="DNSKEY")
dnskey_rrset = ans.response.get_rrset(
ans.response.answer,
dns.name.from_text(example_test_zone),
dns.rdataclass.IN,
dns.rdatatype.DNSKEY)
assert dnskey_rrset, "No DNSKEY records received"
logger.debug("DNSKEY records returned: %s", dnskey_rrset.to_text())
# generate DS records
ds_records = []
for key_rdata in dnskey_rrset:
if key_rdata.flags != 257:
continue # it is not KSK
ds_records.append(dns.dnssec.make_ds(example_test_zone, key_rdata,
'sha256'))
assert ds_records, ("No KSK returned from the %s zone" %
example_test_zone)
logger.debug("DS records for %s created: %r", example_test_zone,
ds_records)
# add DS records to root zone
args = [
"ipa", "dnsrecord-add", root_zone, example_test_zone,
# DS record requires to coexists with NS
"--ns-rec", self.master.hostname,
]
for ds in ds_records:
args.append("--ds-rec")
args.append(ds.to_text())
self.master.run_command(args)
# wait until DS records it replicated
assert wait_until_record_is_signed(
self.replicas[0].ip, example_test_zone, timeout=100,
rtype="DS"
), "No DS record of '%s' returned from replica" % example_test_zone
def test_chain_of_trust_drill(self):
"""
Validate signed DNS records, using our own signed root zone
"""
# extract DSKEY from root zone
ans = resolve_with_dnssec(self.master.ip, root_zone,
rtype="DNSKEY")
dnskey_rrset = ans.response.get_rrset(ans.response.answer,
dns.name.from_text(root_zone),
dns.rdataclass.IN,
dns.rdatatype.DNSKEY)
assert dnskey_rrset, "No DNSKEY records received"
logger.debug("DNSKEY records returned: %s", dnskey_rrset.to_text())
# export trust keys for root zone
root_key_rdatas = []
for key_rdata in dnskey_rrset:
if key_rdata.flags != 257:
continue # it is not KSK
root_key_rdatas.append(key_rdata)
assert root_key_rdatas, "No KSK returned from the root zone"
root_keys_rrset = dns.rrset.from_rdata_list(dnskey_rrset.name,
dnskey_rrset.ttl,
root_key_rdatas)
logger.debug("Root zone trusted key: %s", root_keys_rrset.to_text())
# set trusted key for our root zone
self.master.put_file_contents(paths.DNSSEC_TRUSTED_KEY,
root_keys_rrset.to_text() + '\n')
self.replicas[0].put_file_contents(paths.DNSSEC_TRUSTED_KEY,
root_keys_rrset.to_text() + '\n')
# verify signatures
time.sleep(DNSSEC_SLEEP)
args = [
"drill", "@localhost", "-k",
paths.DNSSEC_TRUSTED_KEY, "-S",
example_test_zone, "SOA"
]
# test if signature chains are valid
self.master.run_command(args)
self.replicas[0].run_command(args)
def test_chain_of_trust_delv(self):
"""
Validate signed DNS records, using our own signed root zone
"""
INITIAL_KEY_FMT = '%s initial-key %d %d %d "%s";'
# delv reports its version on stderr
delv_version = self.master.run_command(
["delv", "-v"]
).stderr_text.rstrip().replace("delv ", "")
assert delv_version
delv_version_parsed = platform_tasks.parse_ipa_version(delv_version)
if delv_version_parsed < platform_tasks.parse_ipa_version("9.16"):
pytest.skip(
f"Requires delv >= 9.16(+yaml), installed: '{delv_version}'"
)
# extract DSKEY from root zone
ans = resolve_with_dnssec(
self.master.ip, root_zone, rtype="DNSKEY"
)
dnskey_rrset = ans.response.get_rrset(
ans.response.answer,
dns.name.from_text(root_zone),
dns.rdataclass.IN,
dns.rdatatype.DNSKEY,
)
assert dnskey_rrset, "No DNSKEY records received"
# export trust keys for root zone
initial_keys = []
for key_rdata in dnskey_rrset:
if key_rdata.flags != 257:
continue # it is not KSK
initial_keys.append(
INITIAL_KEY_FMT % (
root_zone,
key_rdata.flags,
key_rdata.protocol,
key_rdata.algorithm,
base64.b64encode(key_rdata.key).decode("utf-8"),
)
)
assert initial_keys, "No KSK returned from the root zone"
trust_anchors = textwrap.dedent(
"""\
trust-anchors {{
{initial_key}
}};
"""
).format(initial_key="\n".join(initial_keys))
logger.debug("Root zone trust-anchors: %s", trust_anchors)
# set trusted anchor for our root zone
for host in [self.master, self.replicas[0]]:
host.put_file_contents(paths.DNSSEC_TRUSTED_KEY, trust_anchors)
# verify signatures
args = [
"delv",
"+yaml",
"+nosplit",
"+vtrace",
"@127.0.0.1",
example_test_zone,
"-a",
paths.DNSSEC_TRUSTED_KEY,
"SOA",
]
# delv puts trace info on stderr
for host in [self.master, self.replicas[0]]:
result = host.run_command(args)
yaml_data = yaml.safe_load(result.stdout_text)
query_name_abs = dns.name.from_text(example_test_zone)
root_zone_name = dns.name.from_text(root_zone)
query_name_rel = query_name_abs.relativize(
root_zone_name
).to_text()
assert yaml_data["query_name"] == query_name_rel
assert yaml_data["status"] == "success"
assert len(yaml_data["records"]) == 1
fully_validated = yaml_data["records"][0]["fully_validated"]
fully_validated.sort()
assert len(fully_validated) == 2
assert f"{example_test_zone} 1 IN RRSIG SOA" in fully_validated[0]
assert f"{example_test_zone} 1 IN SOA" in fully_validated[1]
def test_servers_use_localhost_as_dns(self):
# check that localhost is set as DNS server
for host in [self.master, self.replicas[0]]:
assert host.resolver.uses_localhost_as_dns()
class TestMigrateDNSSECMaster(IntegrationTest):
"""test DNSSEC master migration
Install a server and a replica with DNS, then reinstall server
as DNSSEC master
Test:
* migrate dnssec master to replica
* create new zone
* verify if zone is signed on all replicas
* add new replica
* add new zone
* test if new zone is signed on all replicas
"""
num_replicas = 2
topology = 'star'
@classmethod
def install(cls, mh):
tasks.install_master(cls.master, setup_dns=True)
args = [
"ipa-dns-install",
"--dnssec-master",
"--forwarder", cls.master.config.dns_forwarder,
"-U",
]
cls.master.run_command(args)
# No need to enable dns service in the firewall as master has been
# installed with dns support enabled
# Firewall(cls.master).enable_services(["dns"])
tasks.install_replica(cls.master, cls.replicas[0], setup_dns=True)
@classmethod
def uninstall(cls, mh):
# For this test, we need to uninstall DNSSEC master last
# Find which server is DNSSec master
result = cls.master.run_command(["ipa", "config-show"]).stdout_text
matches = list(re.finditer('IPA DNSSec key master: (.*)', result))
if len(matches) == 1:
# Found the DNSSec master
dnssec_master_hostname = matches[0].group(1)
for replica in cls.replicas + [cls.master]:
if replica.hostname == dnssec_master_hostname:
dnssec_master = replica
else:
# By default consider that the master is DNSSEC
dnssec_master = cls.master
for replica in cls.replicas + [cls.master]:
if replica == dnssec_master:
# Skip this one
continue
try:
tasks.run_server_del(
dnssec_master, replica.hostname, force=True,
ignore_topology_disconnect=True, ignore_last_of_role=True)
except subprocess.CalledProcessError:
# If the master has already been uninstalled,
# this call may fail
pass
tasks.uninstall_master(replica)
tasks.uninstall_master(dnssec_master)
def test_migrate_dnssec_master(self):
"""Both master and replica have DNS installed"""
backup_filename = "/var/lib/ipa/ipa-kasp.db.backup"
replica_backup_filename = "/tmp/ipa-kasp.db.backup"
# add test zone
dnszone_add_dnssec(self.master, example_test_zone)
# wait until zone is signed
assert wait_until_record_is_signed(
self.master.ip, example_test_zone, timeout=100
), "Zone %s is not signed (master)" % example_test_zone
# wait until zone is signed
assert wait_until_record_is_signed(
self.replicas[0].ip, example_test_zone, timeout=200
), "Zone %s is not signed (replica)" % example_test_zone
dnskey_old = resolve_with_dnssec(self.master.ip, example_test_zone,
rtype="DNSKEY").rrset
# migrate dnssec master to replica
args = [
"ipa-dns-install",
"--disable-dnssec-master",
"--forwarder", self.master.config.dns_forwarder,
"--force",
"-U",
]
self.master.run_command(args)
# move content of "ipa-kasp.db.backup" to replica
kasp_db_backup = self.master.get_file_contents(backup_filename)
self.replicas[0].put_file_contents(replica_backup_filename,
kasp_db_backup)
args = [
"ipa-dns-install",
"--dnssec-master",
"--kasp-db", replica_backup_filename,
"--forwarder", self.master.config.dns_forwarder,
"-U",
]
self.replicas[0].run_command(args)
# Enable the dns service in the firewall on the replica
Firewall(self.replicas[0]).enable_services(["dns"])
# wait until zone is signed
assert wait_until_record_is_signed(
self.master.ip, example_test_zone, timeout=100
), "Zone %s is not signed after migration (master)" % example_test_zone
# wait until zone is signed
assert wait_until_record_is_signed(
self.replicas[0].ip, example_test_zone, timeout=200
), "Zone %s is not signed after migration (replica)" % example_test_zone
# test if dnskey are the same
dnskey_new = resolve_with_dnssec(self.master.ip, example_test_zone,
rtype="DNSKEY").rrset
assert dnskey_old == dnskey_new, "DNSKEY should be the same"
# add test zone
dnszone_add_dnssec(self.replicas[0], example2_test_zone)
# wait until zone is signed
assert wait_until_record_is_signed(
self.replicas[0].ip, example2_test_zone, timeout=100
), ("Zone %s is not signed after migration (replica - dnssec master)"
% example2_test_zone)
# wait until zone is signed
assert wait_until_record_is_signed(
self.master.ip, example2_test_zone, timeout=200
), ("Zone %s is not signed after migration (master)"
% example2_test_zone)
# add new replica
tasks.install_replica(self.master, self.replicas[1], setup_dns=True)
# test if originial zones are signed on new replica
# wait until zone is signed
assert wait_until_record_is_signed(
self.replicas[1].ip, example_test_zone, timeout=200
), ("Zone %s is not signed (new replica)"
% example_test_zone)
# wait until zone is signed
assert wait_until_record_is_signed(
self.replicas[1].ip, example2_test_zone, timeout=200
), ("Zone %s is not signed (new replica)"
% example2_test_zone)
# add new zone to new replica
dnszone_add_dnssec(self.replicas[0], example3_test_zone)
# wait until zone is signed
assert wait_until_record_is_signed(
self.replicas[1].ip, example3_test_zone, timeout=200
), ("Zone %s is not signed (new replica)"
% example3_test_zone)
assert wait_until_record_is_signed(
self.replicas[0].ip, example3_test_zone, timeout=200
), ("Zone %s is not signed (replica)"
% example3_test_zone)
# wait until zone is signed
assert wait_until_record_is_signed(
self.master.ip, example3_test_zone, timeout=200
), ("Zone %s is not signed (master)"
% example3_test_zone)
class TestInstallNoDnssecValidation(IntegrationTest):
"""test installation of the master with
--no-dnssec-validation
Test for issue 7666: ipa-server-install --setup-dns is failing
if using --no-dnssec-validation and --forwarder, when the
specified forwarder does not support DNSSEC.
The forwarder should not be checked for DNSSEC support when
--no-dnssec-validation argument is specified.
In order to reproduce the conditions, the test is using a dummy
IP address for the forwarder (i.e. there is no BIND service available
at this IP address). To make sure of that, the test is using the IP of
a replica (that is not yet setup).
"""
num_replicas = 1
@classmethod
def install(cls, mh):
cls.install_args = [
'ipa-server-install',
'-n', cls.master.domain.name,
'-r', cls.master.domain.realm,
'-p', cls.master.config.dirman_password,
'-a', cls.master.config.admin_password,
'-U',
'--setup-dns',
'--forwarder', cls.replicas[0].ip,
'--auto-reverse'
]
def test_install_withDnssecValidation(self):
cmd = self.master.run_command(self.install_args, raiseonerr=False)
# The installer checks that the forwarder supports DNSSEC
# but the forwarder does not answer => expect failure
assert cmd.returncode != 0
def test_install_noDnssecValidation(self):
# With the --no-dnssec-validation, the installer does not check
# whether the forwarder supports DNSSEC => success even if the
# forwarder is not reachable
self.master.run_command(
self.install_args + ['--no-dnssec-validation'])