Prevent commands to modify different type of a zone

Commands dnsforwardzone-* can modify only forward zones
Commands dnszone-* can modify only (master) zones
Commands dnsrecord-* can work only with master zones

design: http://www.freeipa.org/page/V4/Forward_zones

Ticket: https://fedorahosted.org/freeipa/ticket/3210
Reviewed-By: Petr Vobornik <pvoborni@redhat.com>
This commit is contained in:
Martin Basti 2014-05-23 09:03:34 +02:00 committed by Petr Vobornik
parent 49068ade92
commit 266015c3e2

View File

@ -549,6 +549,31 @@ def _dns_name_to_string(value, raw=False):
else:
return unicode(value)
def _check_entry_objectclass(entry, objectclasses):
"""
Check if entry contains all objectclasses
"""
if not isinstance(objectclasses, (list, tuple)):
objectclasses = [objectclasses, ]
if not entry.get('objectclass'):
return False
entry_objectclasses = [o.lower() for o in entry['objectclass']]
for o in objectclasses:
if o not in entry_objectclasses:
return False
return True
def _check_DN_objectclass(ldap, dn, objectclasses):
try:
entry = ldap.get_entry(dn, [u'objectclass', ])
except Exception:
return False
else:
return _check_entry_objectclass(entry, objectclasses)
class DNSRecord(Str):
# a list of parts that create the actual raw DNS record
parts = None
@ -2014,6 +2039,18 @@ class dnszone_add(LDAPCreate):
if not dns_container_exists(self.api.Backend.ldap2):
raise errors.NotFound(reason=_('DNS is not configured'))
try:
entry = ldap.get_entry(dn)
except errors.NotFound:
pass
else:
if _check_entry_objectclass(entry, self.obj.object_class):
self.obj.handle_duplicate_entry(*keys)
else:
raise errors.DuplicateEntry(
message=_(u'Only one zone type is allowed per zone name')
)
entry_attrs['idnszoneactive'] = 'TRUE'
# Check nameserver has a forward record
@ -2090,6 +2127,12 @@ class dnszone_del(LDAPDelete):
msg_summary = _('Deleted DNS zone "%(value)s"')
def pre_callback(self, ldap, dn, *nkeys, **options):
assert isinstance(dn, DN)
if not _check_DN_objectclass(ldap, dn, self.obj.object_class):
self.obj.handle_not_found(*nkeys)
return dn
def post_callback(self, ldap, dn, *keys, **options):
try:
api.Command['permission_del'](self.obj.permission_name(keys[-1]),
@ -2127,6 +2170,8 @@ class dnszone_mod(LDAPUpdate):
has_output_params = LDAPUpdate.has_output_params + dnszone_output_params
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
if not _check_DN_objectclass(ldap, dn, self.obj.object_class):
self.obj.handle_not_found(*keys)
nameserver = entry_attrs.get('idnssoamname')
if nameserver and not nameserver.is_empty() and not options['force']:
check_ns_rec_resolvable(keys[0], nameserver)
@ -2208,6 +2253,12 @@ class dnszone_show(LDAPRetrieve):
has_output_params = LDAPRetrieve.has_output_params + dnszone_output_params
def pre_callback(self, ldap, dn, attrs_list, *keys, **options):
assert isinstance(dn, DN)
if not _check_DN_objectclass(ldap, dn, self.obj.object_class):
self.obj.handle_not_found(*keys)
return dn
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
assert isinstance(dn, DN)
self.obj._rr_zone_postprocess(entry_attrs, **options)
@ -2226,7 +2277,9 @@ class dnszone_disable(LDAPQuery):
ldap = self.obj.backend
dn = self.obj.get_dn(*keys, **options)
entry = ldap.get_entry(dn, ['idnszoneactive'])
entry = ldap.get_entry(dn, ['idnszoneactive', 'objectclass'])
if not _check_entry_objectclass(entry, self.obj.object_class):
self.obj.handle_not_found(*keys)
entry['idnszoneactive'] = ['FALSE']
@ -2250,7 +2303,9 @@ class dnszone_enable(LDAPQuery):
ldap = self.obj.backend
dn = self.obj.get_dn(*keys, **options)
entry = ldap.get_entry(dn, ['idnszoneactive'])
entry = ldap.get_entry(dn, ['idnszoneactive', 'objectclass'])
if not _check_entry_objectclass(entry, self.obj.object_class):
self.obj.handle_not_found(*keys)
entry['idnszoneactive'] = ['TRUE']
@ -2277,6 +2332,9 @@ class dnszone_add_permission(LDAPQuery):
entry_attrs = ldap.get_entry(dn, ['objectclass'])
except errors.NotFound:
self.obj.handle_not_found(*keys)
else:
if not _check_entry_objectclass(entry_attrs, self.obj.object_class):
self.obj.handle_not_found(*keys)
permission_name = self.obj.permission_name(keys[-1])
permission = api.Command['permission_add_noaci'](permission_name,
@ -2311,9 +2369,12 @@ class dnszone_remove_permission(LDAPQuery):
ldap = self.obj.backend
dn = self.obj.get_dn(*keys, **options)
try:
entry = ldap.get_entry(dn, ['managedby'])
entry = ldap.get_entry(dn, ['managedby', 'objectclass'])
except errors.NotFound:
self.obj.handle_not_found(*keys)
else:
if not _check_entry_objectclass(entry, self.obj.object_class):
self.obj.handle_not_found(*keys)
entry['managedby'] = None
@ -2457,17 +2518,33 @@ class dnsrecord(LDAPObject):
return True
return False
def check_zone(self, zone, **options):
"""
Check if zone exists and if is master zone
"""
parent_object = self.api.Object[self.parent_object]
dn = parent_object.get_dn(zone, **options)
ldap = self.api.Backend.ldap2
try:
entry = ldap.get_entry(dn, ['objectclass'])
except errors.NotFound:
parent_object.handle_not_found(zone)
else:
# only master zones can contain records
if 'idnszone' not in [x.lower() for x in entry.get('objectclass', [])]:
raise errors.ValidationError(
name='dnszoneidnsname',
error=_(u'only master zones can contain records')
)
return dn
def get_dn(self, *keys, **options):
dn = self.check_zone(keys[-2])
if self.is_pkey_zone_record(*keys):
parent_object = self.api.Object[self.parent_object]
dn = parent_object.get_dn(*keys[:-1], **options)
# zone must exist
ldap = self.api.Backend.ldap2
try:
ldap.get_entry(dn, [])
except errors.NotFound:
parent_object.handle_not_found(*keys[:-1])
return dn
#Make RR name relative if possible
relative_name = keys[-1].relativize(keys[-2]).ToASCII()
keys = keys[:-1] + (relative_name,)
@ -3419,6 +3496,9 @@ class dnsrecord_find(LDAPSearch):
def pre_callback(self, ldap, filter, attrs_list, base_dn, scope, *args, **options):
assert isinstance(base_dn, DN)
# validate if zone is master zone
self.obj.check_zone(args[-2], **options)
filter = _create_idn_filter(self, ldap, *args, **options)
return (filter, base_dn, ldap.SCOPE_SUBTREE)
@ -3685,6 +3765,18 @@ class dnsforwardzone_add(LDAPCreate):
if not dns_container_exists(self.api.Backend.ldap2):
raise errors.NotFound(reason=_('DNS is not configured'))
try:
entry = ldap.get_entry(dn)
except errors.NotFound:
pass
else:
if _check_entry_objectclass(entry, self.obj.object_class):
self.obj.handle_duplicate_entry(*keys)
else:
raise errors.DuplicateEntry(
message=_(u'Only one zone type is allowed per zone name')
)
entry_attrs['idnszoneactive'] = 'TRUE'
if 'idnsforwardpolicy' not in entry_attrs:
@ -3708,6 +3800,12 @@ class dnsforwardzone_del(LDAPDelete):
msg_summary = _('Deleted DNS forward zone "%(value)s"')
def pre_callback(self, ldap, dn, *nkeys, **options):
assert isinstance(dn, DN)
if not _check_DN_objectclass(ldap, dn, self.obj.object_class):
self.obj.handle_not_found(*nkeys)
return dn
def post_callback(self, ldap, dn, *keys, **options):
try:
api.Command['permission_del'](self.obj.permission_name(keys[-1]),
@ -3730,6 +3828,9 @@ class dnsforwardzone_mod(LDAPUpdate):
except errors.NotFound:
self.obj.handle_not_found(*keys)
if not _check_entry_objectclass(entry, self.obj.object_class):
self.obj.handle_not_found(*keys)
policy = self.obj.default_forward_policy
forwarders = []
@ -3799,8 +3900,10 @@ class dnsforwardzone_show(LDAPRetrieve):
has_output_params = LDAPRetrieve.has_output_params + dnszone_output_params
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
def pre_callback(self, ldap, dn, attrs_list, *keys, **options):
assert isinstance(dn, DN)
if not _check_DN_objectclass(ldap, dn, self.obj.object_class):
self.obj.handle_not_found(*keys)
return dn
@ -3815,7 +3918,9 @@ class dnsforwardzone_disable(LDAPQuery):
ldap = self.obj.backend
dn = self.obj.get_dn(*keys, **options)
entry = ldap.get_entry(dn, ['idnszoneactive'])
entry = ldap.get_entry(dn, ['idnszoneactive', 'objectclass'])
if not _check_entry_objectclass(entry, self.obj.object_class):
self.obj.handle_not_found(*keys)
entry['idnszoneactive'] = ['FALSE']
@ -3838,8 +3943,9 @@ class dnsforwardzone_enable(LDAPQuery):
ldap = self.obj.backend
dn = self.obj.get_dn(*keys, **options)
entry = ldap.get_entry(dn, ['idnszoneactive'])
entry = ldap.get_entry(dn, ['idnszoneactive', 'objectclass'])
if not _check_entry_objectclass(entry, self.obj.object_class):
self.obj.handle_not_found(*keys)
entry['idnszoneactive'] = ['TRUE']
try:
@ -3865,6 +3971,9 @@ class dnsforwardzone_add_permission(LDAPQuery):
entry_attrs = ldap.get_entry(dn, ['objectclass'])
except errors.NotFound:
self.obj.handle_not_found(*keys)
else:
if not _check_entry_objectclass(entry_attrs, self.obj.object_class):
self.obj.handle_not_found(*keys)
permission_name = self.obj.permission_name(keys[-1])
permission = api.Command['permission_add_noaci'](permission_name,
@ -3899,9 +4008,12 @@ class dnsforwardzone_remove_permission(LDAPQuery):
ldap = self.obj.backend
dn = self.obj.get_dn(*keys, **options)
try:
entry = ldap.get_entry(dn, ['managedby'])
entry = ldap.get_entry(dn, ['managedby', 'objectclass'])
except errors.NotFound:
self.obj.handle_not_found(*keys)
else:
if not _check_entry_objectclass(entry, self.obj.object_class):
self.obj.handle_not_found(*keys)
entry['managedby'] = None