dns: move code called on client to the module level

Move DNSRecord and dnsrecord code called on client to module-level
functions.

This will make it possible to move the code to ipaclient without the
DNSRecord and dnsrecord class bits.

https://fedorahosted.org/freeipa/ticket/4739

Reviewed-By: David Kupka <dkupka@redhat.com>
This commit is contained in:
Jan Cholasta
2016-04-27 13:29:34 +02:00
parent 1bcf08aab6
commit b6af621432

View File

@@ -665,6 +665,121 @@ def _check_DN_objectclass(ldap, dn, objectclasses):
return _check_entry_objectclass(entry, objectclasses)
def __get_part_param(param, cmd, part, output_kw, default=None):
name = param.part_name_format % (param.rrtype.lower(), part.name)
label = param.part_label_format % (param.rrtype, unicode(part.label))
optional = not part.required
output_kw[name] = cmd.prompt_param(part,
optional=optional,
label=label)
def prompt_parts(param, cmd, mod_dnsvalue=None):
mod_parts = None
if mod_dnsvalue is not None:
mod_parts = param._get_part_values(mod_dnsvalue)
user_options = {}
if param.parts is None:
return user_options
for part_id, part in enumerate(param.parts):
if mod_parts:
default = mod_parts[part_id]
else:
default = None
__get_part_param(param, cmd, part, user_options, default)
return user_options
def prompt_missing_parts(param, cmd, kw, prompt_optional=False):
user_options = {}
if param.parts is None:
return user_options
for part in param.parts:
name = param.part_name_format % (param.rrtype.lower(), part.name)
if name in kw:
continue
optional = not part.required
if optional and not prompt_optional:
continue
default = part.get_default(**kw)
__get_part_param(param, cmd, part, user_options, default)
return user_options
def has_cli_options(cmd, options, no_option_msg, allow_empty_attrs=False):
sufficient = ('setattr', 'addattr', 'delattr', 'rename')
if any(k in options for k in sufficient):
return
has_options = False
for attr in options.keys():
if (attr in cmd.obj.params and
not cmd.obj.params[attr].primary_key):
if options[attr] or allow_empty_attrs:
has_options = True
break
if not has_options:
raise errors.OptionError(no_option_msg)
def get_rrparam_from_part(cmd, part_name):
"""
Get an instance of DNSRecord parameter that has part_name as its part.
If such parameter is not found, None is returned
:param part_name Part parameter name
"""
try:
param = cmd.obj.params[part_name]
if not any(flag in param.flags for flag in
('dnsrecord_part', 'dnsrecord_extra')):
return None
# All DNS record part or extra parameters contain a name of its
# parent RR parameter in its hint attribute
rrparam = cmd.obj.params[param.hint]
except (KeyError, AttributeError):
return None
return rrparam
def iterate_rrparams_by_parts(cmd, kw, skip_extra=False):
"""
Iterates through all DNSRecord instances that has at least one of its
parts or extra options in given dictionary. It returns the DNSRecord
instance only for the first occurence of part/extra option.
:param kw Dictionary with DNS record parts or extra options
:param skip_extra Skip DNS record extra options, yield only DNS records
with a real record part
"""
processed = []
for opt in kw:
rrparam = get_rrparam_from_part(cmd, opt)
if rrparam is None:
continue
if skip_extra and 'dnsrecord_extra' in cmd.obj.params[opt].flags:
continue
if rrparam.name not in processed:
processed.append(rrparam.name)
yield rrparam
class DNSRecord(Str):
# a list of parts that create the actual raw DNS record
parts = None
@@ -883,54 +998,6 @@ class DNSRecord(Str):
return tuple(self._convert_dnsrecord_extra(extra) for extra in self.extra)
def __get_part_param(self, cmd, part, output_kw, default=None):
name = self.part_name_format % (self.rrtype.lower(), part.name)
label = self.part_label_format % (self.rrtype, unicode(part.label))
optional = not part.required
output_kw[name] = cmd.prompt_param(part,
optional=optional,
label=label)
def prompt_parts(self, cmd, mod_dnsvalue=None):
mod_parts = None
if mod_dnsvalue is not None:
mod_parts = self._get_part_values(mod_dnsvalue)
user_options = {}
if self.parts is None:
return user_options
for part_id, part in enumerate(self.parts):
if mod_parts:
default = mod_parts[part_id]
else:
default = None
self.__get_part_param(cmd, part, user_options, default)
return user_options
def prompt_missing_parts(self, cmd, kw, prompt_optional=False):
user_options = {}
if self.parts is None:
return user_options
for part in self.parts:
name = self.part_name_format % (self.rrtype.lower(), part.name)
if name in kw:
continue
optional = not part.required
if optional and not prompt_optional:
continue
default = part.get_default(**kw)
self.__get_part_param(cmd, part, user_options, default)
return user_options
# callbacks for per-type special record behavior
def dnsrecord_add_pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
assert isinstance(dn, DN)
@@ -3187,20 +3254,6 @@ class dnsrecord(LDAPObject):
return dns_masters
def has_cli_options(self, options, no_option_msg, allow_empty_attrs=False):
if any(k in options for k in ('setattr', 'addattr', 'delattr', 'rename')):
return
has_options = False
for attr in options.keys():
if attr in self.params and not self.params[attr].primary_key:
if options[attr] or allow_empty_attrs:
has_options = True
break
if not has_options:
raise errors.OptionError(no_option_msg)
def get_record_entry_attrs(self, entry_attrs):
entry_attrs = entry_attrs.copy()
for attr in entry_attrs.keys():
@@ -3246,51 +3299,6 @@ class dnsrecord(LDAPObject):
#Decode IDN ACE form to Unicode, raw records are passed directly from LDAP
_records_idn_postprocess(record, **options)
def get_rrparam_from_part(self, part_name):
"""
Get an instance of DNSRecord parameter that has part_name as its part.
If such parameter is not found, None is returned
:param part_name Part parameter name
"""
try:
param = self.params[part_name]
if not any(flag in param.flags for flag in \
('dnsrecord_part', 'dnsrecord_extra')):
return None
# All DNS record part or extra parameters contain a name of its
# parent RR parameter in its hint attribute
rrparam = self.params[param.hint]
except (KeyError, AttributeError):
return None
return rrparam
def iterate_rrparams_by_parts(self, kw, skip_extra=False):
"""
Iterates through all DNSRecord instances that has at least one of its
parts or extra options in given dictionary. It returns the DNSRecord
instance only for the first occurence of part/extra option.
:param kw Dictionary with DNS record parts or extra options
:param skip_extra Skip DNS record extra options, yield only DNS records
with a real record part
"""
processed = []
for opt in kw:
rrparam = self.get_rrparam_from_part(opt)
if rrparam is None:
continue
if skip_extra and 'dnsrecord_extra' in self.params[opt].flags:
continue
if rrparam.name not in processed:
processed.append(rrparam.name)
yield rrparam
def updated_rrattrs(self, old_entry, entry_attrs):
"""Returns updated RR attributes
"""
@@ -3590,12 +3598,12 @@ class dnsrecord_add(LDAPCreate):
)
def args_options_2_entry(self, *keys, **options):
self.obj.has_cli_options(options, self.no_option_msg)
has_cli_options(self, options, self.no_option_msg)
return super(dnsrecord_add, self).args_options_2_entry(*keys, **options)
def interactive_prompt_callback(self, kw):
try:
self.obj.has_cli_options(kw, self.no_option_msg)
has_cli_options(self, kw, self.no_option_msg)
# Some DNS records were entered, do not use full interactive help
# We should still ask user for required parts of DNS parts he is
@@ -3604,9 +3612,10 @@ class dnsrecord_add(LDAPCreate):
# Do not ask for required parts when any "extra" option is used,
# it can be used to fill all required params by itself
new_kw = {}
for rrparam in self.obj.iterate_rrparams_by_parts(kw, skip_extra=True):
user_options = rrparam.prompt_missing_parts(self, kw,
prompt_optional=False)
for rrparam in iterate_rrparams_by_parts(self, kw,
skip_extra=True):
user_options = prompt_missing_parts(rrparam, self, kw,
prompt_optional=False)
new_kw.update(user_options)
kw.update(new_kw)
return
@@ -3657,7 +3666,7 @@ class dnsrecord_add(LDAPCreate):
continue
ok = True
user_options = param.prompt_parts(self)
user_options = prompt_parts(param, self)
kw.update(user_options)
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
@@ -3670,7 +3679,7 @@ class dnsrecord_add(LDAPCreate):
except KeyError:
continue
rrparam = self.obj.get_rrparam_from_part(option)
rrparam = get_rrparam_from_part(self, option)
if rrparam is None:
continue
@@ -3799,7 +3808,7 @@ class dnsrecord_mod(LDAPUpdate):
)
def args_options_2_entry(self, *keys, **options):
self.obj.has_cli_options(options, self.no_option_msg, True)
has_cli_options(self, options, self.no_option_msg, True)
return super(dnsrecord_mod, self).args_options_2_entry(*keys, **options)
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
@@ -3812,7 +3821,7 @@ class dnsrecord_mod(LDAPUpdate):
# check if any attr should be updated using structured instead of replaced
# format is recordname : (old_value, new_parts)
updated_attrs = {}
for param in self.obj.iterate_rrparams_by_parts(options, skip_extra=True):
for param in iterate_rrparams_by_parts(self, options, skip_extra=True):
parts = param.get_parts_from_kw(options, raise_on_none=False)
if parts is None:
@@ -3916,7 +3925,7 @@ class dnsrecord_mod(LDAPUpdate):
def interactive_prompt_callback(self, kw):
try:
self.obj.has_cli_options(kw, self.no_option_msg, True)
has_cli_options(self, kw, self.no_option_msg, True)
except errors.OptionError:
pass
else:
@@ -3955,7 +3964,8 @@ class dnsrecord_mod(LDAPUpdate):
mod_value = self.Backend.textui.prompt_yesno(
_("Modify %(name)s '%(value)s'?") % dict(name=param.label, value=rec_value), default=False)
if mod_value is True:
user_options = param.prompt_parts(self, mod_dnsvalue=rec_value)
user_options = prompt_parts(param, self,
mod_dnsvalue=rec_value)
kw[param.name] = [rec_value]
kw.update(user_options)
@@ -4097,14 +4107,14 @@ class dnsrecord_del(LDAPUpdate):
return dn
def args_options_2_entry(self, *keys, **options):
self.obj.has_cli_options(options, self.no_option_msg)
has_cli_options(self, options, self.no_option_msg)
return super(dnsrecord_del, self).args_options_2_entry(*keys, **options)
def interactive_prompt_callback(self, kw):
if kw.get('del_all', False):
return
try:
self.obj.has_cli_options(kw, self.no_option_msg)
has_cli_options(self, kw, self.no_option_msg)
except errors.OptionError:
pass
else: