DNSSEC: fix DS record validation

Part of: https://fedorahosted.org/freeipa/ticket/3801

Reviewed-By: Petr Spacek <pspacek@redhat.com>
Reviewed-By: Martin Kosek <mkosek@redhat.com>
This commit is contained in:
Martin Basti 2014-08-20 18:51:25 +02:00 committed by Martin Kosek
parent 78b2a7abbb
commit 734883282e

View File

@ -2601,6 +2601,14 @@ class dnsrecord(LDAPObject):
doc=_('Parse all raw DNS records and return them in a structured way'),
)
def _dsrecord_pre_callback(self, ldap, dn, entry_attrs, *keys, **options):
assert isinstance(dn, DN)
dsrecords = entry_attrs.get('dsrecord')
if dsrecords and self.is_pkey_zone_record(*keys):
raise errors.ValidationError(
name='dsrecord',
error=unicode(_('DS record must not be in zone apex (RFC 4035 section 2.4)')))
def _nsrecord_pre_callback(self, ldap, dn, entry_attrs, *keys, **options):
assert isinstance(dn, DN)
nsrecords = entry_attrs.get('nsrecord')
@ -2846,8 +2854,9 @@ class dnsrecord(LDAPObject):
processed.append(rrparam.name)
yield rrparam
def check_record_type_collisions(self, keys, old_entry, entry_attrs):
# Test that only allowed combination of record types was created
def updated_rrattrs(self, old_entry, entry_attrs):
"""Returns updated RR attributes
"""
rrattrs = {}
if old_entry is not None:
old_rrattrs = dict((key, value) for key, value in old_entry.iteritems()
@ -2858,13 +2867,13 @@ class dnsrecord(LDAPObject):
if key in self.params and
isinstance(self.params[key], DNSRecord))
rrattrs.update(new_rrattrs)
return rrattrs
def check_record_type_collisions(self, keys, rrattrs):
# Test that only allowed combination of record types was created
# CNAME record validation
try:
cnames = rrattrs['cnamerecord']
except KeyError:
pass
else:
cnames = rrattrs.get('cnamerecord')
if cnames is not None:
if len(cnames) > 1:
raise errors.ValidationError(name='cnamerecord',
@ -2878,11 +2887,7 @@ class dnsrecord(LDAPObject):
'with any other record (RFC 1034, section 3.6.2)'))
# DNAME record validation
try:
dnames = rrattrs['dnamerecord']
except KeyError:
pass
else:
dnames = rrattrs.get('dnamerecord')
if dnames is not None:
if len(dnames) > 1:
raise errors.ValidationError(name='dnamerecord',
@ -2895,6 +2900,21 @@ class dnsrecord(LDAPObject):
'NS record except when located in a zone root '
'record (RFC 6672, section 2.3)'))
def check_record_type_dependencies(self, keys, rrattrs):
# Test that all record type dependencies are satisfied
# DS record validation
# DS record requires to coexists with NS record
dsrecords = rrattrs.get('dsrecord')
nsrecords = rrattrs.get('nsrecord')
# DS record cannot be in zone apex, checked in pre-callback validators
if dsrecords and not nsrecords:
raise errors.ValidationError(
name='dsrecord',
error=_('DS record requires to coexist with an '
'NS record (RFC 4592 section 4.6, RFC 4035 section 2.4)'))
def _entry2rrsets(self, entry_attrs, dns_name, dns_domain):
'''Convert entry_attrs to a dictionary {rdtype: rrset}.
@ -3238,7 +3258,9 @@ class dnsrecord_add(LDAPCreate):
vals = list(entry_attrs[attr])
entry_attrs[attr] = list(set(old_entry.get(attr, []) + vals))
self.obj.check_record_type_collisions(keys, old_entry, entry_attrs)
rrattrs = self.obj.updated_rrattrs(old_entry, entry_attrs)
self.obj.check_record_type_dependencies(keys, rrattrs)
self.obj.check_record_type_collisions(keys, rrattrs)
context.dnsrecord_entry_mods = getattr(context, 'dnsrecord_entry_mods',
{})
context.dnsrecord_entry_mods[(keys[0], keys[1])] = entry_attrs.copy()
@ -3346,7 +3368,9 @@ class dnsrecord_mod(LDAPUpdate):
new_dnsvalue = [param._convert_scalar(modified_parts)]
entry_attrs[attr] = list(set(old_entry[attr] + new_dnsvalue))
self.obj.check_record_type_collisions(keys, old_entry, entry_attrs)
rrattrs = self.obj.updated_rrattrs(old_entry, entry_attrs)
self.obj.check_record_type_dependencies(keys, rrattrs)
self.obj.check_record_type_collisions(keys, rrattrs)
context.dnsrecord_entry_mods = getattr(context, 'dnsrecord_entry_mods',
{})
@ -3518,6 +3542,9 @@ class dnsrecord_del(LDAPUpdate):
raise errors.AttrValueNotFound(attr=attr_name, value=val)
entry_attrs[attr] = list(set(old_entry[attr]))
rrattrs = self.obj.updated_rrattrs(old_entry, entry_attrs)
self.obj.check_record_type_dependencies(keys, rrattrs)
del_all = False
if not self.obj.is_pkey_zone_record(*keys):
record_found = False