Fix precallback validators in DNS plugin

DNS plugin contains several RR type record validators run in
pre_callback which cannot be used as standard param validator
as it needs more data and resources that standard validators
provide. However, the precallback validators are not run for
DNS records created by new structured options and thus an invalid
value may slip in.

This patch moves the execution of these precallback validators
_after_ the processing of structured DNS options. It also cleans
them up a little and makes them more robust.

https://fedorahosted.org/freeipa/ticket/2550
This commit is contained in:
Martin Kosek
2012-03-21 13:25:42 +01:00
parent a58cbb985e
commit 19b2af8e52
2 changed files with 59 additions and 28 deletions

View File

@@ -1481,7 +1481,7 @@ def zone_is_reverse(zone_name):
return False return False
def is_ns_rec_resolvable(name): def check_ns_rec_resolvable(name):
try: try:
return api.Command['dns_resolve'](name) return api.Command['dns_resolve'](name)
except errors.NotFound: except errors.NotFound:
@@ -1708,7 +1708,7 @@ class dnszone_add(LDAPCreate):
error=unicode(_("Nameserver address is not a fully qualified domain name"))) error=unicode(_("Nameserver address is not a fully qualified domain name")))
if not 'ip_address' in options and not options['force']: if not 'ip_address' in options and not options['force']:
is_ns_rec_resolvable(nameserver) check_ns_rec_resolvable(nameserver)
if nameserver[-1] != '.': if nameserver[-1] != '.':
nameserver += '.' nameserver += '.'
@@ -1877,17 +1877,20 @@ class dnsrecord(LDAPObject):
) )
def _nsrecord_pre_callback(self, ldap, dn, entry_attrs, *keys, **options): def _nsrecord_pre_callback(self, ldap, dn, entry_attrs, *keys, **options):
if options.get('force', False): nsrecords = entry_attrs.get('nsrecord')
return dn if options.get('force', False) or nsrecords is None:
return
for ns in options['nsrecord']: map(check_ns_rec_resolvable, nsrecords)
is_ns_rec_resolvable(ns)
return dn
def _ptrrecord_pre_callback(self, ldap, dn, entry_attrs, *keys, **options): def _ptrrecord_pre_callback(self, ldap, dn, entry_attrs, *keys, **options):
components = dn.split(',',2) ptrrecords = entry_attrs.get('ptrrecord')
addr = components[0].split('=')[1] if ptrrecords is None:
zone = components[1].split('=')[1] return
zone = keys[-2]
if self.is_pkey_zone_record(*keys):
addr = u''
else:
addr = keys[-1]
zone_len = 0 zone_len = 0
for valid_zone in _valid_reverse_zones: for valid_zone in _valid_reverse_zones:
if zone.find(valid_zone) != -1: if zone.find(valid_zone) != -1:
@@ -1897,16 +1900,23 @@ class dnsrecord(LDAPObject):
if not zone_len: if not zone_len:
allowed_zones = ', '.join(_valid_reverse_zones) allowed_zones = ', '.join(_valid_reverse_zones)
raise errors.ValidationError(name='cn', raise errors.ValidationError(name='ptrrecord',
error=unicode(_('Reverse zone for PTR record should be a sub-zone of one the following fully qualified domains: %s') % allowed_zones)) error=unicode(_('Reverse zone for PTR record should be a sub-zone of one the following fully qualified domains: %s') % allowed_zones))
ip_addr_comp_count = len(addr.split('.')) + len(zone.split('.')) addr_len = len(addr.split('.')) if addr else 0
ip_addr_comp_count = addr_len + len(zone.split('.'))
if ip_addr_comp_count != zone_len: if ip_addr_comp_count != zone_len:
raise errors.ValidationError(name='cn', raise errors.ValidationError(name='ptrrecord',
error=unicode(_('Reverse zone %(name)s requires exactly %(count)d IP address components, %(user_count)d given') error=unicode(_('Reverse zone %(name)s requires exactly %(count)d IP address components, %(user_count)d given')
% dict(name=zone_name, count=zone_len, user_count=ip_addr_comp_count))) % dict(name=zone_name, count=zone_len, user_count=ip_addr_comp_count)))
return dn def run_precallback_validators(self, dn, entry_attrs, *keys, **options):
ldap = self.api.Backend.ldap2
for rtype in entry_attrs:
rtype_cb = getattr(self, '_%s_pre_callback' % rtype, None)
if rtype_cb:
rtype_cb(ldap, dn, entry_attrs, *keys, **options)
def is_pkey_zone_record(self, *keys): def is_pkey_zone_record(self, *keys):
idnsname = keys[-1] idnsname = keys[-1]
@@ -2120,11 +2130,6 @@ class dnsrecord_add(LDAPCreate):
kw.update(user_options) kw.update(user_options)
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options): def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
for rtype in options:
rtype_cb = '_%s_pre_callback' % rtype
if hasattr(self.obj, rtype_cb):
dn = getattr(self.obj, rtype_cb)(ldap, dn, entry_attrs, *keys, **options)
precallback_attrs = [] precallback_attrs = []
for option in options: for option in options:
try: try:
@@ -2153,6 +2158,9 @@ class dnsrecord_add(LDAPCreate):
# extra option is passed, run per-type pre_callback for given RR type # extra option is passed, run per-type pre_callback for given RR type
precallback_attrs.append(rrparam.name) precallback_attrs.append(rrparam.name)
# Run pre_callback validators
self.obj.run_precallback_validators(dn, entry_attrs, *keys, **options)
# run precallback also for all new RR type attributes in entry_attrs # run precallback also for all new RR type attributes in entry_attrs
for attr in entry_attrs: for attr in entry_attrs:
try: try:
@@ -2232,14 +2240,7 @@ class dnsrecord_mod(LDAPUpdate):
self.obj.has_cli_options(options, self.no_option_msg, True) self.obj.has_cli_options(options, self.no_option_msg, True)
return super(dnsrecord_mod, self).args_options_2_entry(*keys, **options) return super(dnsrecord_mod, self).args_options_2_entry(*keys, **options)
def pre_callback(self, ldap, dn, entry_attrs, *keys, **options): def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
for rtype in options:
rtype_cb = '_%s_pre_callback' % rtype
if options[rtype] is None and rtype in _record_attributes:
options[rtype] = []
if hasattr(self.obj, rtype_cb):
dn = getattr(self.obj, rtype_cb)(ldap, dn, entry_attrs, *keys, **options)
# check if any attr should be updated using structured instead of replaced # check if any attr should be updated using structured instead of replaced
# format is recordname : (old_value, new_parts) # format is recordname : (old_value, new_parts)
updated_attrs = {} updated_attrs = {}
@@ -2264,6 +2265,9 @@ class dnsrecord_mod(LDAPUpdate):
updated_attrs[attr] = (old_value, parts) updated_attrs[attr] = (old_value, parts)
# Run pre_callback validators
self.obj.run_precallback_validators(dn, entry_attrs, *keys, **options)
if len(updated_attrs): if len(updated_attrs):
try: try:
(dn_, old_entry) = ldap.get_entry( (dn_, old_entry) = ldap.get_entry(

View File

@@ -743,6 +743,33 @@ class test_dns(Declarative):
}, },
), ),
dict(
desc='Try to add unresolvable NS record to %r using dnsrecord_add' % (dnsres1),
command=('dnsrecord_add', [dnszone1, dnsres1], {'nsrecord': u'does.not.exist'}),
expected=errors.NotFound(reason=u"Nameserver 'does.not.exist' does not have a corresponding A/AAAA record"),
),
dict(
desc='Add unresolvable NS record with --force to %r using dnsrecord_add' % (dnsres1),
command=('dnsrecord_add', [dnszone1, dnsres1], {'nsrecord': u'does.not.exist',
'force' : True}),
expected={
'value': dnsres1,
'summary': None,
'result': {
'objectclass': [u'top', u'idnsrecord'],
'dn': unicode(dnsres1_dn),
'idnsname': [dnsres1],
'arecord': [u'10.10.0.1'],
'cnamerecord': [u'foo-1.example.com.'],
'kxrecord': [u'1 foo-1'],
'txtrecord': [u'foo bar'],
'nsecrecord': [dnszone1 + u' TXT A'],
'nsrecord': [u'does.not.exist'],
},
},
),
dict( dict(
desc='Delete record %r in zone %r' % (dnsres1, dnszone1), desc='Delete record %r in zone %r' % (dnsres1, dnszone1),
command=('dnsrecord_del', [dnszone1, dnsres1], {'del_all': True }), command=('dnsrecord_del', [dnszone1, dnsres1], {'del_all': True }),