mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
DNSSEC: Improve global forwarders validation
Validation now provides more detailed information and less false positives failures. https://fedorahosted.org/freeipa/ticket/4657 Reviewed-By: David Kupka <dkupka@redhat.com> Reviewed-By: Petr Spacek <pspacek@redhat.com>
This commit is contained in:
committed by
Petr Vobornik
parent
c9cbb1493a
commit
9aa6124b39
130
ipalib/util.py
130
ipalib/util.py
@@ -36,7 +36,7 @@ from dns import resolver, rdatatype
|
||||
from dns.exception import DNSException
|
||||
from netaddr.core import AddrFormatError
|
||||
|
||||
from ipalib import errors
|
||||
from ipalib import errors, messages
|
||||
from ipalib.text import _
|
||||
from ipapython.ssh import SSHPublicKey
|
||||
from ipapython.dn import DN, RDN
|
||||
@@ -559,38 +559,126 @@ def validate_hostmask(ugettext, hostmask):
|
||||
return _('invalid hostmask')
|
||||
|
||||
|
||||
def validate_dnssec_forwarder(ip_addr):
|
||||
"""Test DNS forwarder properties.
|
||||
class ForwarderValidationError(Exception):
|
||||
format = None
|
||||
|
||||
:returns:
|
||||
True if forwarder works as expected and supports DNSSEC.
|
||||
False if forwarder does not support DNSSEC.
|
||||
None if forwarder does not respond.
|
||||
def __init__(self, format=None, message=None, **kw):
|
||||
messages.process_message_arguments(self, format, message, **kw)
|
||||
super(ForwarderValidationError, self).__init__(self.msg)
|
||||
|
||||
|
||||
class UnresolvableRecordError(ForwarderValidationError):
|
||||
format = _("query '%(owner)s %(rtype)s': %(error)s")
|
||||
|
||||
|
||||
class EDNS0UnsupportedError(ForwarderValidationError):
|
||||
format = _("query '%(owner)s %(rtype)s' with EDNS0: %(error)s")
|
||||
|
||||
|
||||
class DNSSECSignatureMissingError(ForwarderValidationError):
|
||||
format = _("answer to query '%(owner)s %(rtype)s' is missing DNSSEC "
|
||||
"signatures (no RRSIG data)")
|
||||
|
||||
|
||||
def _log_response(log, e):
|
||||
"""
|
||||
ip_addr = str(ip_addr)
|
||||
If exception contains response from server, log this response to debug log
|
||||
:param log: if log is None, do not log
|
||||
:param e: DNSException
|
||||
"""
|
||||
assert isinstance(e, DNSException)
|
||||
if log is not None:
|
||||
response = getattr(e, 'kwargs', {}).get('response')
|
||||
if response:
|
||||
log.debug("DNSException: %s; server response: %s", e, response)
|
||||
|
||||
|
||||
def _resolve_record(owner, rtype, nameserver_ip=None, edns0=False,
|
||||
dnssec=False, timeout=10):
|
||||
"""
|
||||
:param nameserver_ip: if None, default resolvers will be used
|
||||
:param edns0: enables EDNS0
|
||||
:param dnssec: enabled EDNS0, flags: DO
|
||||
:raise DNSException: if error occurs
|
||||
"""
|
||||
assert isinstance(nameserver_ip, basestring)
|
||||
assert isinstance(rtype, basestring)
|
||||
|
||||
res = dns.resolver.Resolver()
|
||||
res.nameservers = [ip_addr]
|
||||
res.lifetime = 10 # wait max 10 seconds for reply
|
||||
if nameserver_ip:
|
||||
res.nameservers = [nameserver_ip]
|
||||
res.lifetime = timeout
|
||||
|
||||
# enable Authenticated Data + Checking Disabled flags
|
||||
res.set_flags(dns.flags.AD | dns.flags.CD)
|
||||
# Recursion Desired,
|
||||
# this option prevents to get answers in authority section instead of answer
|
||||
res.set_flags(dns.flags.RD)
|
||||
|
||||
# enable EDNS v0 + enable DNSSEC-Ok flag
|
||||
res.use_edns(0, dns.flags.DO, 0)
|
||||
if dnssec:
|
||||
res.use_edns(0, dns.flags.DO, 4096)
|
||||
res.set_flags(dns.flags.RD)
|
||||
elif edns0:
|
||||
res.use_edns(0, 0, 4096)
|
||||
|
||||
return res.query(owner, rtype)
|
||||
|
||||
|
||||
def _validate_edns0_forwarder(owner, rtype, ip_addr, log=None, timeout=10):
|
||||
"""
|
||||
Validate if forwarder supports EDNS0
|
||||
|
||||
:raise UnresolvableRecordError: record cannot be resolved
|
||||
:raise EDNS0UnsupportedError: EDNS0 is not supported by forwarder
|
||||
"""
|
||||
|
||||
try:
|
||||
_resolve_record(owner, rtype, nameserver_ip=ip_addr, timeout=timeout)
|
||||
except DNSException as e:
|
||||
_log_response(log, e)
|
||||
raise UnresolvableRecordError(owner=owner, rtype=rtype, ip=ip_addr,
|
||||
error=e)
|
||||
|
||||
try:
|
||||
_resolve_record(owner, rtype, nameserver_ip=ip_addr, edns0=True,
|
||||
timeout=timeout)
|
||||
except DNSException as e:
|
||||
_log_response(log, e)
|
||||
raise EDNS0UnsupportedError(owner=owner, rtype=rtype, ip=ip_addr,
|
||||
error=e)
|
||||
|
||||
|
||||
def validate_dnssec_global_forwarder(ip_addr, log=None, timeout=10):
|
||||
"""Test DNS forwarder properties. against root zone.
|
||||
|
||||
Global forwarders should be able return signed root zone
|
||||
|
||||
:raise UnresolvableRecordError: record cannot be resolved
|
||||
:raise EDNS0UnsupportedError: EDNS0 is not supported by forwarder
|
||||
:raise DNSSECSignatureMissingError: did not receive RRSIG for root zone
|
||||
"""
|
||||
|
||||
ip_addr = str(ip_addr)
|
||||
owner = "."
|
||||
rtype = "SOA"
|
||||
|
||||
_validate_edns0_forwarder(owner, rtype, ip_addr, log=log, timeout=timeout)
|
||||
|
||||
# DNS root has to be signed
|
||||
try:
|
||||
ans = res.query('.', 'NS')
|
||||
except DNSException:
|
||||
return None
|
||||
ans = _resolve_record(owner, rtype, nameserver_ip=ip_addr, dnssec=True,
|
||||
timeout=timeout)
|
||||
except DNSException as e:
|
||||
_log_response(log, e)
|
||||
raise UnresolvableRecordError(owner=owner, rtype=rtype, ip=ip_addr,
|
||||
error=e)
|
||||
|
||||
try:
|
||||
ans.response.find_rrset(ans.response.answer, dns.name.root,
|
||||
dns.rdataclass.IN, dns.rdatatype.RRSIG, dns.rdatatype.NS)
|
||||
ans.response.find_rrset(
|
||||
ans.response.answer, dns.name.root, dns.rdataclass.IN,
|
||||
dns.rdatatype.RRSIG, dns.rdatatype.SOA
|
||||
)
|
||||
except KeyError:
|
||||
return False
|
||||
raise DNSSECSignatureMissingError(owner=owner, rtype=rtype, ip=ip_addr)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def validate_idna_domain(value):
|
||||
|
||||
Reference in New Issue
Block a user