ipalib: split off client-side plugin code into ipaclient

Provide client-side overrides for command plugins which implement any of
the client-side `interactive_prompt_callback`, `forward` or
`output_for_cli` methods and move the methods from the original plugins to
the overrides.

https://fedorahosted.org/freeipa/ticket/4739

Reviewed-By: David Kupka <dkupka@redhat.com>
This commit is contained in:
Jan Cholasta 2016-04-28 10:15:01 +02:00
parent 6cfb9d73d9
commit 4c7be74526
34 changed files with 1198 additions and 751 deletions

View File

@ -22,11 +22,13 @@ import os
import six
from ipaclient.frontend import MethodOverride
from ipalib import api, errors
from ipalib import Flag, Str
from ipalib.frontend import Command
from ipalib.plugable import Registry
from ipalib import _
from ipapython.dn import DN
if six.PY3:
unicode = str
@ -37,6 +39,57 @@ DEFAULT_MAPS = (u'auto.direct', )
DEFAULT_KEYS = (u'/-', )
@register(override=True)
class automountlocation_tofiles(MethodOverride):
def output_for_cli(self, textui, result, *keys, **options):
maps = result['result']['maps']
keys = result['result']['keys']
orphanmaps = result['result']['orphanmaps']
orphankeys = result['result']['orphankeys']
textui.print_plain('/etc/auto.master:')
for m in maps:
if m['automountinformation'][0].startswith('-'):
textui.print_plain(
'%s\t%s' % (
m['automountkey'][0], m['automountinformation'][0]
)
)
else:
textui.print_plain(
'%s\t/etc/%s' % (
m['automountkey'][0], m['automountinformation'][0]
)
)
for m in maps:
if m['automountinformation'][0].startswith('-'):
continue
info = m['automountinformation'][0]
textui.print_plain('---------------------------')
textui.print_plain('/etc/%s:' % info)
for k in keys[info]:
textui.print_plain(
'%s\t%s' % (
k['automountkey'][0], k['automountinformation'][0]
)
)
textui.print_plain('')
textui.print_plain(_('maps not connected to /etc/auto.master:'))
for m in orphanmaps:
textui.print_plain('---------------------------')
textui.print_plain('/etc/%s:' % m['automountmapname'])
for k in orphankeys:
if len(k) == 0: continue
dn = DN(k[0]['dn'])
if dn['automountmapname'] == m['automountmapname'][0]:
textui.print_plain(
'%s\t%s' % (
k[0]['automountkey'][0], k[0]['automountinformation'][0]
)
)
@register()
class automountlocation_import(Command):
__doc__ = _('Import automount files for a specific location.')

43
ipaclient/plugins/cert.py Normal file
View File

@ -0,0 +1,43 @@
# Authors:
# Andrew Wnuk <awnuk@redhat.com>
# Jason Gerard DeRose <jderose@redhat.com>
# John Dennis <jdennis@redhat.com>
#
# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ipaclient.frontend import CommandOverride
from ipalib import errors
from ipalib import x509
from ipalib import util
from ipalib.plugable import Registry
register = Registry()
@register(override=True)
class cert_show(CommandOverride):
def forward(self, *keys, **options):
if 'out' in options:
util.check_writable_file(options['out'])
result = super(cert_show, self).forward(*keys, **options)
if 'certificate' in result['result']:
x509.write_certificate(result['result']['certificate'], options['out'])
return result
else:
raise errors.NoCertificateError(entry=keys[-1])
else:
return super(cert_show, self).forward(*keys, **options)

View File

@ -0,0 +1,28 @@
#
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
from ipaclient.frontend import MethodOverride
from ipalib import util
from ipalib.plugable import Registry
from ipalib.text import _
register = Registry()
@register(override=True)
class certprofile_show(MethodOverride):
def forward(self, *keys, **options):
if 'out' in options:
util.check_writable_file(options['out'])
result = super(certprofile_show, self).forward(*keys, **options)
if 'out' in options and 'config' in result['result']:
with open(options['out'], 'wb') as f:
f.write(result['result'].pop('config'))
result['summary'] = (
_("Profile configuration stored in file '%(file)s'")
% dict(file=options['out'])
)
return result

325
ipaclient/plugins/dns.py Normal file
View File

@ -0,0 +1,325 @@
# Authors:
# Martin Kosek <mkosek@redhat.com>
# Pavel Zuna <pzuna@redhat.com>
#
# Copyright (C) 2010 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
import six
from ipaclient.frontend import MethodOverride
from ipalib import errors
from ipalib.dns import (get_record_rrtype,
has_cli_options,
iterate_rrparams_by_parts,
record_name_format)
from ipalib.plugable import Registry
from ipalib import _, ngettext
from ipapython.dnsutil import DNSName
if six.PY3:
unicode = str
register = Registry()
# most used record types, always ask for those in interactive prompt
_top_record_types = ('A', 'AAAA', )
_rev_top_record_types = ('PTR', )
_zone_top_record_types = ('NS', 'MX', 'LOC', )
def __get_part_param(cmd, part, output_kw, default=None):
name = part.name
label = unicode(part.label)
optional = not part.required
output_kw[name] = cmd.prompt_param(part,
optional=optional,
label=label)
def prompt_parts(rrtype, cmd, mod_dnsvalue=None):
mod_parts = None
if mod_dnsvalue is not None:
name = record_name_format % rrtype.lower()
mod_parts = cmd.api.Command.dnsrecord_split_parts(
name, mod_dnsvalue)['result']
user_options = {}
parts = [p for p in cmd.params() if 'dnsrecord_part' in p.flags]
if not parts:
return user_options
for part_id, part in enumerate(parts):
if mod_parts:
default = mod_parts[part_id]
else:
default = None
__get_part_param(cmd, part, user_options, default)
return user_options
def prompt_missing_parts(rrtype, cmd, kw, prompt_optional=False):
user_options = {}
parts = [p for p in cmd.params() if 'dnsrecord_part' in p.flags]
if not parts:
return user_options
for part in parts:
name = part.name
if name in kw:
continue
optional = not part.required
if optional and not prompt_optional:
continue
default = part.get_default(**kw)
__get_part_param(cmd, part, user_options, default)
return user_options
@register(override=True)
class dnsrecord_add(MethodOverride):
no_option_msg = 'No options to add a specific record provided.\n' \
"Command help may be consulted for all supported record types."
def interactive_prompt_callback(self, kw):
try:
has_cli_options(self, kw, self.no_option_msg)
# Some DNS records were entered, do not use full interactive help
# We should still ask user for required parts of DNS parts he is
# trying to add in the same way we do for standard LDAP parameters
#
# Do not ask for required parts when any "extra" option is used,
# it can be used to fill all required params by itself
new_kw = {}
for rrparam in iterate_rrparams_by_parts(self, kw,
skip_extra=True):
rrtype = get_record_rrtype(rrparam.name)
user_options = prompt_missing_parts(rrtype, self, kw,
prompt_optional=False)
new_kw.update(user_options)
kw.update(new_kw)
return
except errors.OptionError:
pass
try:
idnsname = DNSName(kw['idnsname'])
except Exception as e:
raise errors.ValidationError(name='idnsname', error=unicode(e))
try:
zonename = DNSName(kw['dnszoneidnsname'])
except Exception as e:
raise errors.ValidationError(name='dnszoneidnsname', error=unicode(e))
# check zone type
if idnsname.is_empty():
common_types = u', '.join(_zone_top_record_types)
elif zonename.is_reverse():
common_types = u', '.join(_rev_top_record_types)
else:
common_types = u', '.join(_top_record_types)
self.Backend.textui.print_plain(_(u'Please choose a type of DNS resource record to be added'))
self.Backend.textui.print_plain(_(u'The most common types for this type of zone are: %s\n') %\
common_types)
ok = False
while not ok:
rrtype = self.Backend.textui.prompt(_(u'DNS resource record type'))
if rrtype is None:
return
try:
name = record_name_format % rrtype.lower()
param = self.params[name]
if 'no_option' in param.flags:
raise ValueError()
except (KeyError, ValueError):
all_types = u', '.join(get_record_rrtype(p.name)
for p in self.params()
if (get_record_rrtype(p.name) and
'no_option' not in p.flags))
self.Backend.textui.print_plain(_(u'Invalid or unsupported type. Allowed values are: %s') % all_types)
continue
ok = True
user_options = prompt_parts(rrtype, self)
kw.update(user_options)
@register(override=True)
class dnsrecord_mod(MethodOverride):
no_option_msg = 'No options to modify a specific record provided.'
def interactive_prompt_callback(self, kw):
try:
has_cli_options(self, kw, self.no_option_msg, True)
except errors.OptionError:
pass
else:
# some record type entered, skip this helper
return
# get DNS record first so that the NotFound exception is raised
# before the helper would start
dns_record = self.api.Command['dnsrecord_show'](kw['dnszoneidnsname'], kw['idnsname'])['result']
self.Backend.textui.print_plain(_("No option to modify specific record provided."))
# ask user for records to be removed
self.Backend.textui.print_plain(_(u'Current DNS record contents:\n'))
record_params = []
for attr in dns_record:
try:
param = self.params[attr]
except KeyError:
continue
rrtype = get_record_rrtype(param.name)
if not rrtype:
continue
record_params.append((param, rrtype))
rec_type_content = u', '.join(dns_record[param.name])
self.Backend.textui.print_plain(u'%s: %s' % (param.label, rec_type_content))
self.Backend.textui.print_plain(u'')
# ask what records to remove
for param, rrtype in record_params:
rec_values = list(dns_record[param.name])
for rec_value in dns_record[param.name]:
rec_values.remove(rec_value)
mod_value = self.Backend.textui.prompt_yesno(
_("Modify %(name)s '%(value)s'?") % dict(name=param.label, value=rec_value), default=False)
if mod_value is True:
user_options = prompt_parts(rrtype, self,
mod_dnsvalue=rec_value)
kw[param.name] = [rec_value]
kw.update(user_options)
if rec_values:
self.Backend.textui.print_plain(ngettext(
u'%(count)d %(type)s record skipped. Only one value per DNS record type can be modified at one time.',
u'%(count)d %(type)s records skipped. Only one value per DNS record type can be modified at one time.',
0) % dict(count=len(rec_values), type=rrtype))
break
@register(override=True)
class dnsrecord_del(MethodOverride):
no_option_msg = _('Neither --del-all nor options to delete a specific record provided.\n'\
"Command help may be consulted for all supported record types.")
def interactive_prompt_callback(self, kw):
if kw.get('del_all', False):
return
try:
has_cli_options(self, kw, self.no_option_msg)
except errors.OptionError:
pass
else:
# some record type entered, skip this helper
return
# get DNS record first so that the NotFound exception is raised
# before the helper would start
dns_record = self.api.Command['dnsrecord_show'](kw['dnszoneidnsname'], kw['idnsname'])['result']
self.Backend.textui.print_plain(_("No option to delete specific record provided."))
user_del_all = self.Backend.textui.prompt_yesno(_("Delete all?"), default=False)
if user_del_all is True:
kw['del_all'] = True
return
# ask user for records to be removed
self.Backend.textui.print_plain(_(u'Current DNS record contents:\n'))
present_params = []
for attr in dns_record:
try:
param = self.params[attr]
except KeyError:
continue
if not get_record_rrtype(param.name):
continue
present_params.append(param)
rec_type_content = u', '.join(dns_record[param.name])
self.Backend.textui.print_plain(u'%s: %s' % (param.label, rec_type_content))
self.Backend.textui.print_plain(u'')
# ask what records to remove
for param in present_params:
deleted_values = []
for rec_value in dns_record[param.name]:
user_del_value = self.Backend.textui.prompt_yesno(
_("Delete %(name)s '%(value)s'?")
% dict(name=param.label, value=rec_value), default=False)
if user_del_value is True:
deleted_values.append(rec_value)
if deleted_values:
kw[param.name] = tuple(deleted_values)
@register(override=True)
class dnsconfig_mod(MethodOverride):
def interactive_prompt_callback(self, kw):
# show informative message on client side
# server cannot send messages asynchronous
if kw.get('idnsforwarders', False):
self.Backend.textui.print_plain(
_("Server will check DNS forwarder(s)."))
self.Backend.textui.print_plain(
_("This may take some time, please wait ..."))
@register(override=True)
class dnsforwardzone_add(MethodOverride):
def interactive_prompt_callback(self, kw):
# show informative message on client side
# server cannot send messages asynchronous
if kw.get('idnsforwarders', False):
self.Backend.textui.print_plain(
_("Server will check DNS forwarder(s)."))
self.Backend.textui.print_plain(
_("This may take some time, please wait ..."))
@register(override=True)
class dnsforwardzone_mod(MethodOverride):
def interactive_prompt_callback(self, kw):
# show informative message on client side
# server cannot send messages asynchronous
if kw.get('idnsforwarders', False):
self.Backend.textui.print_plain(
_("Server will check DNS forwarder(s)."))
self.Backend.textui.print_plain(
_("This may take some time, please wait ..."))

View File

@ -0,0 +1,45 @@
# Authors:
# Pavel Zuna <pzuna@redhat.com>
#
# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ipaclient.frontend import MethodOverride
from ipalib.plugable import Registry
register = Registry()
#@register()
class hbacrule_add_accesstime(MethodOverride):
def output_for_cli(self, textui, result, cn, **options):
textui.print_name(self.name)
textui.print_dashed(
'Added access time "%s" to HBAC rule "%s"' % (
options['accesstime'], cn
)
)
#@register()
class hbacrule_remove_accesstime(MethodOverride):
def output_for_cli(self, textui, result, cn, **options):
textui.print_name(self.name)
textui.print_dashed(
'Removed access time "%s" from HBAC rule "%s"' % (
options['accesstime'], cn
)
)

View File

@ -0,0 +1,55 @@
# Authors:
# Alexander Bokovoy <abokovoy@redhat.com>
#
# Copyright (C) 2011 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ipaclient.frontend import CommandOverride
from ipalib.plugable import Registry
import six
if six.PY3:
unicode = str
register = Registry()
@register(override=True)
class hbactest(CommandOverride):
def output_for_cli(self, textui, output, *args, **options):
"""
Command.output_for_cli() uses --all option to decide whether to print detailed output.
We use --detail to allow that, thus we need to redefine output_for_cli().
"""
# Note that we don't actually use --detail below to see if details need
# to be printed as our execute() method will return None for corresponding
# entries and None entries will be skipped.
for o in self.output:
outp = self.output[o]
if 'no_display' in outp.flags:
continue
result = output[o]
if isinstance(result, (list, tuple)):
textui.print_attribute(unicode(outp.doc), result, '%s: %s', 1, True)
elif isinstance(result, (unicode, bool)):
if o == 'summary':
textui.print_summary(result)
else:
textui.print_indented(result)
# Propagate integer value for result. It will give proper command line result for scripts
return int(not output['value'])

49
ipaclient/plugins/host.py Normal file
View File

@ -0,0 +1,49 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
# Pavel Zuna <pzuna@redhat.com>
#
# Copyright (C) 2008 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ipaclient.frontend import MethodOverride
from ipalib import errors, util
from ipalib.plugable import Registry
from ipalib import _
from ipalib import x509
register = Registry()
@register(override=True)
class host_show(MethodOverride):
def forward(self, *keys, **options):
if 'out' in options:
util.check_writable_file(options['out'])
result = super(host_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
x509.write_certificate_list(
result['result']['usercertificate'],
options['out']
)
result['summary'] = (
_('Certificate(s) stored in file \'%(file)s\'')
% dict(file=options['out'])
)
return result
else:
raise errors.NoCertificateError(entry=keys[-1])
else:
return super(host_show, self).forward(*keys, **options)

View File

@ -0,0 +1,89 @@
# Authors:
# Sumit Bose <sbose@redhat.com>
#
# Copyright (C) 2012 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ipaclient.frontend import MethodOverride
from ipalib.plugable import Registry
from ipalib import api
register = Registry()
@register(override=True)
class idrange_add(MethodOverride):
def interactive_prompt_callback(self, kw):
"""
Ensure that rid-base is prompted for when dom-sid is specified.
Also ensure that secondary-rid-base is prompted for when rid-base is
specified and vice versa, in case that dom-sid was not specified.
Also ensure that rid-base and secondary-rid-base is prompted for
if ipa-adtrust-install has been run on the system.
"""
# dom-sid can be specified using dom-sid or dom-name options
# it can be also set using --setattr or --addattr, in these cases
# we will not prompt, but raise an ValidationError later
dom_sid_set = any(dom_id in kw for dom_id in
('ipanttrusteddomainname', 'ipanttrusteddomainsid'))
rid_base = kw.get('ipabaserid', None)
secondary_rid_base = kw.get('ipasecondarybaserid', None)
range_type = kw.get('iparangetype', None)
def set_from_prompt(param):
value = self.prompt_param(self.params[param])
update = {param: value}
kw.update(update)
if dom_sid_set:
# This is a trusted range
# Prompt for RID base if domain SID / name was given
if rid_base is None and range_type != u'ipa-ad-trust-posix':
set_from_prompt('ipabaserid')
else:
# This is a local range
# Find out whether ipa-adtrust-install has been ran
adtrust_is_enabled = api.Command['adtrust_is_enabled']()['result']
if adtrust_is_enabled:
# If ipa-adtrust-install has been ran, all local ranges
# require both RID base and secondary RID base
if rid_base is None:
set_from_prompt('ipabaserid')
if secondary_rid_base is None:
set_from_prompt('ipasecondarybaserid')
else:
# This is a local range on a server with no adtrust support
# Prompt for secondary RID base only if RID base was given
if rid_base is not None and secondary_rid_base is None:
set_from_prompt('ipasecondarybaserid')
# Symetrically, prompt for RID base if secondary RID base was
# given
if rid_base is None and secondary_rid_base is not None:
set_from_prompt('ipabaserid')

View File

@ -0,0 +1,42 @@
# Authors:
# Pavel Zuna <pzuna@redhat.com>
# Adam Young <ayoung@redhat.com>
# Endi S. Dewata <edewata@redhat.com>
#
# Copyright (c) 2010 Red Hat
# See file 'copying' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
import json
from ipaclient.frontend import CommandOverride
from ipalib.util import json_serialize
from ipalib.plugable import Registry
register = Registry()
@register(override=True)
class json_metadata(CommandOverride):
def output_for_cli(self, textui, result, *args, **options):
print(json.dumps(result, default=json_serialize))
@register(override=True)
class i18n_messages(CommandOverride):
def output_for_cli(self, textui, result, *args, **options):
print(json.dumps(result, default=json_serialize))

View File

@ -0,0 +1,71 @@
# Authors:
# Pavel Zuna <pzuna@redhat.com>
#
# Copyright (C) 2009 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import six
from ipaclient.frontend import CommandOverride
from ipalib.plugable import Registry
from ipalib import _
if six.PY3:
unicode = str
register = Registry()
@register(override=True)
class migrate_ds(CommandOverride):
migrate_order = ('user', 'group')
migration_disabled_msg = _('''\
Migration mode is disabled. Use \'ipa config-mod\' to enable it.''')
pwd_migration_msg = _('''\
Passwords have been migrated in pre-hashed format.
IPA is unable to generate Kerberos keys unless provided
with clear text passwords. All migrated users need to
login at https://your.domain/ipa/migration/ before they
can use their Kerberos accounts.''')
def output_for_cli(self, textui, result, ldapuri, bindpw, **options):
textui.print_name(self.name)
if not result['enabled']:
textui.print_plain(self.migration_disabled_msg)
return 1
if not result['compat']:
textui.print_plain("The compat plug-in is enabled. This can increase the memory requirements during migration. Disable the compat plug-in with \'ipa-compat-manage disable\' or re-run this script with \'--with-compat\' option.")
return 1
any_migrated = any(result['result'].values())
textui.print_plain('Migrated:')
textui.print_entry1(
result['result'], attr_order=self.migrate_order,
one_value_per_line=False
)
for ldap_obj_name in self.migrate_order:
textui.print_plain('Failed %s:' % ldap_obj_name)
textui.print_entry1(
result['failed'][ldap_obj_name], attr_order=self.migrate_order,
one_value_per_line=True,
)
textui.print_plain('-' * len(self.name))
if not any_migrated:
textui.print_plain('No users/groups were migrated from %s' %
ldapuri)
return 1
textui.print_plain(unicode(self.pwd_migration_msg))

View File

@ -17,14 +17,24 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
import sys
from ipaclient.frontend import MethodOverride
from ipalib import api, Str, Password, _
from ipalib.messages import add_message, ResultFormattingError
from ipalib.plugable import Registry
from ipalib.frontend import Local
from ipaplatform.paths import paths
from ipapython.dn import DN
from ipapython.nsslib import NSSConnection
from ipapython.version import API_VERSION
import locale
import qrcode
import six
from six import StringIO
from six.moves import urllib
if six.PY3:
@ -33,6 +43,78 @@ if six.PY3:
register = Registry()
@register(override=True)
class otptoken_add(MethodOverride):
def _get_qrcode(self, output, uri, version):
# Print QR code to terminal if specified
qr_output = StringIO()
qr = qrcode.QRCode()
qr.add_data(uri)
qr.make()
qr.print_ascii(out=qr_output, tty=False)
encoding = getattr(sys.stdout, 'encoding', None)
if encoding is None:
encoding = locale.getpreferredencoding(False)
try:
qr_code = qr_output.getvalue().decode(encoding)
except UnicodeError:
add_message(
version,
output,
message=ResultFormattingError(
message=_("Unable to display QR code using the configured "
"output encoding. Please use the token URI to "
"configure you OTP device")
)
)
return None
if sys.stdout.isatty():
output_width = self.api.Backend.textui.get_tty_width()
qr_code_width = len(qr_code.splitlines()[0])
if qr_code_width > output_width:
add_message(
version,
output,
message=ResultFormattingError(
message=_(
"QR code width is greater than that of the output "
"tty. Please resize your terminal.")
)
)
return qr
def output_for_cli(self, textui, output, *args, **options):
# copy-pasted from ipalib/Frontend.__do_call()
# because option handling is broken on client-side
if 'version' in options:
pass
elif self.api.env.skip_version_check:
options['version'] = u'2.0'
else:
options['version'] = API_VERSION
uri = output['result'].get('uri', None)
if uri is not None and not options.get('no_qrcode', False):
qr = self._get_qrcode(output, uri, options['version'])
else:
qr = None
rv = super(otptoken_add, self).output_for_cli(
textui, output, *args, **options)
if qr is not None:
print("\n")
qr.print_ascii(tty=sys.stdout.isatty())
print("\n")
return rv
class HTTPSHandler(urllib.request.HTTPSHandler):
"Opens SSL HTTPS connections that perform hostname validation."

View File

@ -0,0 +1,51 @@
# Authors:
# Jason Gerard DeRose <jderose@redhat.com>
# Rob Crittenden <rcritten@redhat.com>
# Pavel Zuna <pzuna@redhat.com>
#
# Copyright (C) 2008 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ipaclient.frontend import MethodOverride
from ipalib import errors
from ipalib.plugable import Registry
from ipalib import x509
from ipalib import _
from ipalib import util
register = Registry()
@register(override=True)
class service_show(MethodOverride):
def forward(self, *keys, **options):
if 'out' in options:
util.check_writable_file(options['out'])
result = super(service_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
x509.write_certificate_list(
result['result']['usercertificate'],
options['out']
)
result['summary'] = (
_('Certificate(s) stored in file \'%(file)s\'')
% dict(file=options['out'])
)
return result
else:
raise errors.NoCertificateError(entry=keys[-1])
else:
return super(service_show, self).forward(*keys, **options)

View File

@ -0,0 +1,57 @@
# Authors:
# Jr Aquino <jr.aquino@citrixonline.com>
#
# Copyright (C) 2010-2014 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ipaclient.frontend import MethodOverride
from ipalib.plugable import Registry
from ipalib import _
register = Registry()
@register(override=True)
class sudorule_enable(MethodOverride):
def output_for_cli(self, textui, result, cn, **options):
textui.print_dashed(_('Enabled Sudo Rule "%s"') % cn)
@register(override=True)
class sudorule_disable(MethodOverride):
def output_for_cli(self, textui, result, cn, **options):
textui.print_dashed(_('Disabled Sudo Rule "%s"') % cn)
@register(override=True)
class sudorule_add_option(MethodOverride):
def output_for_cli(self, textui, result, cn, **options):
textui.print_dashed(
_('Added option "%(option)s" to Sudo Rule "%(rule)s"')
% dict(option=options['ipasudoopt'], rule=cn))
super(sudorule_add_option, self).output_for_cli(textui, result, cn,
**options)
@register(override=True)
class sudorule_remove_option(MethodOverride):
def output_for_cli(self, textui, result, cn, **options):
textui.print_dashed(
_('Removed option "%(option)s" from Sudo Rule "%(rule)s"')
% dict(option=options['ipasudoopt'], rule=cn))
super(sudorule_remove_option, self).output_for_cli(textui, result, cn,
**options)

View File

@ -0,0 +1,54 @@
#
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
import six
from ipaclient.frontend import MethodOverride
from ipalib.plugable import Registry
from ipalib import _
if six.PY3:
unicode = str
register = Registry()
@register(override=True)
class topologysuffix_verify(MethodOverride):
def output_for_cli(self, textui, output, *args, **options):
in_order = output['result']['in_order']
connect_errors = output['result']['connect_errors']
max_agmts_errors = output['result']['max_agmts_errors']
if in_order:
header = _('Replication topology of suffix "%(suffix)s" '
'is in order.')
else:
header = _('Replication topology of suffix "%(suffix)s" contains '
'errors.')
textui.print_h1(header % {'suffix': args[0]})
if connect_errors:
textui.print_dashed(unicode(_('Topology is disconnected')))
for err in connect_errors:
msg = _("Server %(srv)s can't contact servers: %(replicas)s")
msg = msg % {'srv': err[0], 'replicas': ', '.join(err[2])}
textui.print_indented(msg)
if max_agmts_errors:
textui.print_dashed(unicode(_('Recommended maximum number of '
'agreements per replica exceeded')))
textui.print_attribute(
unicode(_("Maximum number of agreements per replica")),
[output['result']['max_agmts']]
)
for err in max_agmts_errors:
msg = _('Server "%(srv)s" has %(n)d agreements with servers:')
msg = msg % {'srv': err[0], 'n': len(err[1])}
textui.print_indented(msg)
for replica in err[1]:
textui.print_indented(replica, 2)
return 0

View File

@ -0,0 +1,51 @@
# Authors:
# Alexander Bokovoy <abokovoy@redhat.com>
# Martin Kosek <mkosek@redhat.com>
#
# Copyright (C) 2011 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ipaclient.frontend import MethodOverride
from ipalib.plugable import Registry
register = Registry()
@register(override=True)
class trust_add(MethodOverride):
def interactive_prompt_callback(self, kw):
"""
Also ensure that realm_admin is prompted for if --admin or
--trust-secret is not specified when 'ipa trust-add' is run on the
system.
Also ensure that realm_passwd is prompted for if --password or
--trust-secret is not specified when 'ipa trust-add' is run on the
system.
"""
trust_secret = kw.get('trust_secret')
realm_admin = kw.get('realm_admin')
realm_passwd = kw.get('realm_passwd')
if trust_secret is None:
if realm_admin is None:
kw['realm_admin'] = self.prompt_param(
self.params['realm_admin'])
if realm_passwd is None:
kw['realm_passwd'] = self.Backend.textui.prompt_password(
self.params['realm_passwd'].label, confirm=False)

82
ipaclient/plugins/user.py Normal file
View File

@ -0,0 +1,82 @@
# Authors:
# Jason Gerard DeRose <jderose@redhat.com>
# Pavel Zuna <pzuna@redhat.com>
#
# Copyright (C) 2008 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from ipaclient.frontend import MethodOverride
from ipalib import errors
from ipalib import Flag
from ipalib import util
from ipalib.plugable import Registry
from ipalib import _
from ipalib import x509
register = Registry()
@register(override=True)
class user_del(MethodOverride):
def get_options(self):
for option in super(user_del, self).get_options():
yield option
yield Flag(
'preserve?',
include='cli',
doc=_('Delete a user, keeping the entry available for future use'),
)
yield Flag(
'no_preserve?',
include='cli',
doc=_('Delete a user'),
)
def forward(self, *keys, **options):
if self.api.env.context == 'cli':
no_preserve = options.pop('no_preserve', False)
preserve = options.pop('preserve', False)
if no_preserve and preserve:
raise errors.MutuallyExclusiveError(
reason=_("preserve and no-preserve cannot be both set"))
elif no_preserve:
options['preserve'] = False
elif preserve:
options['preserve'] = True
return super(user_del, self).forward(*keys, **options)
@register(override=True)
class user_show(MethodOverride):
def forward(self, *keys, **options):
if 'out' in options:
util.check_writable_file(options['out'])
result = super(user_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
x509.write_certificate_list(
result['result']['usercertificate'],
options['out']
)
result['summary'] = (
_('Certificate(s) stored in file \'%(file)s\'')
% dict(file=options['out'])
)
return result
else:
raise errors.NoCertificateError(entry=keys[-1])
else:
return super(user_show, self).forward(*keys, **options)

View File

@ -36,6 +36,7 @@ from cryptography.hazmat.primitives.serialization import load_pem_public_key,\
import nss.nss as nss
from ipaclient.frontend import MethodOverride
from ipalib.frontend import Local
from ipalib import errors
from ipalib import Bytes, Flag, Str
@ -492,6 +493,25 @@ class vault_mod(Local):
return response
@register(override=True)
class vaultconfig_show(MethodOverride):
def forward(self, *args, **options):
file = options.get('transport_out')
# don't send these parameters to server
if 'transport_out' in options:
del options['transport_out']
response = super(vaultconfig_show, self).forward(*args, **options)
if file:
with open(file, 'w') as f:
f.write(response['result']['transport_cert'])
return response
@register()
class vault_archive(Local):
__doc__ = _('Archive data into a vault.')

View File

@ -338,54 +338,6 @@ class automountlocation_tofiles(LDAPQuery):
return dict(result=dict(maps=maps, keys=keys,
orphanmaps=orphanmaps, orphankeys=orphankeys))
def output_for_cli(self, textui, result, *keys, **options):
maps = result['result']['maps']
keys = result['result']['keys']
orphanmaps = result['result']['orphanmaps']
orphankeys = result['result']['orphankeys']
textui.print_plain('/etc/auto.master:')
for m in maps:
if m['automountinformation'][0].startswith('-'):
textui.print_plain(
'%s\t%s' % (
m['automountkey'][0], m['automountinformation'][0]
)
)
else:
textui.print_plain(
'%s\t/etc/%s' % (
m['automountkey'][0], m['automountinformation'][0]
)
)
for m in maps:
if m['automountinformation'][0].startswith('-'):
continue
info = m['automountinformation'][0]
textui.print_plain('---------------------------')
textui.print_plain('/etc/%s:' % info)
for k in keys[info]:
textui.print_plain(
'%s\t%s' % (
k['automountkey'][0], k['automountinformation'][0]
)
)
textui.print_plain('')
textui.print_plain(_('maps not connected to /etc/auto.master:'))
for m in orphanmaps:
textui.print_plain('---------------------------')
textui.print_plain('/etc/%s:' % m['automountmapname'])
for k in orphankeys:
if len(k) == 0: continue
dn = DN(k[0]['dn'])
if dn['automountmapname'] == m['automountmapname'][0]:
textui.print_plain(
'%s\t%s' % (
k[0]['automountkey'][0], k[0]['automountinformation'][0]
)
)
@register()
class automountmap(LDAPObject):

View File

@ -28,7 +28,6 @@ from ipalib import api
from ipalib import errors
from ipalib import pkcs10
from ipalib import x509
from ipalib import util
from ipalib import ngettext
from ipalib.plugable import Registry
from .virtual import VirtualCommand
@ -633,18 +632,6 @@ class cert_show(VirtualCommand):
return dict(result=result)
def forward(self, *keys, **options):
if 'out' in options:
util.check_writable_file(options['out'])
result = super(cert_show, self).forward(*keys, **options)
if 'certificate' in result['result']:
x509.write_certificate(result['result']['certificate'], options['out'])
return result
else:
raise errors.NoCertificateError(entry=keys[-1])
else:
return super(cert_show, self).forward(*keys, **options)

View File

@ -5,7 +5,6 @@
import re
from ipalib import api, Bool, File, Str
from ipalib import util
from ipalib.plugable import Registry
from .baseldap import (
LDAPObject, LDAPSearch, LDAPCreate,
@ -218,21 +217,6 @@ class certprofile_show(LDAPRetrieve):
return result
def forward(self, *keys, **options):
if 'out' in options:
util.check_writable_file(options['out'])
result = super(certprofile_show, self).forward(*keys, **options)
if 'out' in options and 'config' in result['result']:
with open(options['out'], 'wb') as f:
f.write(result['result'].pop('config'))
result['summary'] = (
_("Profile configuration stored in file '%(file)s'")
% dict(file=options['out'])
)
return result
@register()
class certprofile_import(LDAPCreate):

View File

@ -19,7 +19,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
from __future__ import print_function
import netaddr
import time
@ -56,7 +55,7 @@ from .baseldap import (
LDAPQuery,
LDAPDelete,
LDAPRetrieve)
from ipalib import _, ngettext
from ipalib import _
from ipalib import messages
from ipalib.util import (normalize_zonemgr,
get_dns_forward_zone_update_policy,
@ -313,11 +312,6 @@ _record_types = (
# DNS zone record identificator
_dns_zone_record = DNSName.empty
# most used record types, always ask for those in interactive prompt
_top_record_types = ('A', 'AAAA', )
_rev_top_record_types = ('PTR', )
_zone_top_record_types = ('NS', 'MX', 'LOC', )
# attributes derived from record types
_record_attributes = [str(record_name_format % t.lower())
for t in _record_types]
@ -673,61 +667,6 @@ def _check_DN_objectclass(ldap, dn, objectclasses):
return _check_entry_objectclass(entry, objectclasses)
def __get_part_param(cmd, part, output_kw, default=None):
name = part.name
label = unicode(part.label)
optional = not part.required
output_kw[name] = cmd.prompt_param(part,
optional=optional,
label=label)
def prompt_parts(rrtype, cmd, mod_dnsvalue=None):
mod_parts = None
if mod_dnsvalue is not None:
name = record_name_format % rrtype.lower()
mod_parts = cmd.api.Command.dnsrecord_split_parts(
name, mod_dnsvalue)['result']
user_options = {}
parts = [p for p in cmd.params() if 'dnsrecord_part' in p.flags]
if not parts:
return user_options
for part_id, part in enumerate(parts):
if mod_parts:
default = mod_parts[part_id]
else:
default = None
__get_part_param(cmd, part, user_options, default)
return user_options
def prompt_missing_parts(rrtype, cmd, kw, prompt_optional=False):
user_options = {}
parts = [p for p in cmd.params() if 'dnsrecord_part' in p.flags]
if not parts:
return user_options
for part in parts:
name = part.name
if name in kw:
continue
optional = not part.required
if optional and not prompt_optional:
continue
default = part.get_default(**kw)
__get_part_param(cmd, part, user_options, default)
return user_options
class DNSRecord(Str):
# a list of parts that create the actual raw DNS record
parts = None
@ -3575,75 +3514,6 @@ class dnsrecord_add(LDAPCreate):
has_cli_options(self, options, self.no_option_msg)
return super(dnsrecord_add, self).args_options_2_entry(*keys, **options)
def interactive_prompt_callback(self, kw):
try:
has_cli_options(self, kw, self.no_option_msg)
# Some DNS records were entered, do not use full interactive help
# We should still ask user for required parts of DNS parts he is
# trying to add in the same way we do for standard LDAP parameters
#
# Do not ask for required parts when any "extra" option is used,
# it can be used to fill all required params by itself
new_kw = {}
for rrparam in iterate_rrparams_by_parts(self, kw,
skip_extra=True):
rrtype = get_record_rrtype(rrparam.name)
user_options = prompt_missing_parts(rrtype, self, kw,
prompt_optional=False)
new_kw.update(user_options)
kw.update(new_kw)
return
except errors.OptionError:
pass
try:
idnsname = DNSName(kw['idnsname'])
except Exception as e:
raise errors.ValidationError(name='idnsname', error=unicode(e))
try:
zonename = DNSName(kw['dnszoneidnsname'])
except Exception as e:
raise errors.ValidationError(name='dnszoneidnsname', error=unicode(e))
# check zone type
if idnsname.is_empty():
common_types = u', '.join(_zone_top_record_types)
elif zonename.is_reverse():
common_types = u', '.join(_rev_top_record_types)
else:
common_types = u', '.join(_top_record_types)
self.Backend.textui.print_plain(_(u'Please choose a type of DNS resource record to be added'))
self.Backend.textui.print_plain(_(u'The most common types for this type of zone are: %s\n') %\
common_types)
ok = False
while not ok:
rrtype = self.Backend.textui.prompt(_(u'DNS resource record type'))
if rrtype is None:
return
try:
name = record_name_format % rrtype.lower()
param = self.params[name]
if 'no_option' in param.flags:
raise ValueError()
except (KeyError, ValueError):
all_types = u', '.join(get_record_rrtype(p.name)
for p in self.params()
if (get_record_rrtype(p.name) and
'no_option' not in p.flags))
self.Backend.textui.print_plain(_(u'Invalid or unsupported type. Allowed values are: %s') % all_types)
continue
ok = True
user_options = prompt_parts(rrtype, self)
kw.update(user_options)
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
assert isinstance(dn, DN)
precallback_attrs = []
@ -3898,60 +3768,6 @@ class dnsrecord_mod(LDAPUpdate):
self.obj.postprocess_record(entry_attrs, **options)
return dn
def interactive_prompt_callback(self, kw):
try:
has_cli_options(self, kw, self.no_option_msg, True)
except errors.OptionError:
pass
else:
# some record type entered, skip this helper
return
# get DNS record first so that the NotFound exception is raised
# before the helper would start
dns_record = self.api.Command['dnsrecord_show'](kw['dnszoneidnsname'], kw['idnsname'])['result']
self.Backend.textui.print_plain(_("No option to modify specific record provided."))
# ask user for records to be removed
self.Backend.textui.print_plain(_(u'Current DNS record contents:\n'))
record_params = []
for attr in dns_record:
try:
param = self.params[attr]
except KeyError:
continue
rrtype = get_record_rrtype(param.name)
if not rrtype:
continue
record_params.append((param, rrtype))
rec_type_content = u', '.join(dns_record[param.name])
self.Backend.textui.print_plain(u'%s: %s' % (param.label, rec_type_content))
self.Backend.textui.print_plain(u'')
# ask what records to remove
for param, rrtype in record_params:
rec_values = list(dns_record[param.name])
for rec_value in dns_record[param.name]:
rec_values.remove(rec_value)
mod_value = self.Backend.textui.prompt_yesno(
_("Modify %(name)s '%(value)s'?") % dict(name=param.label, value=rec_value), default=False)
if mod_value is True:
user_options = prompt_parts(rrtype, self,
mod_dnsvalue=rec_value)
kw[param.name] = [rec_value]
kw.update(user_options)
if rec_values:
self.Backend.textui.print_plain(ngettext(
u'%(count)d %(type)s record skipped. Only one value per DNS record type can be modified at one time.',
u'%(count)d %(type)s records skipped. Only one value per DNS record type can be modified at one time.',
0) % dict(count=len(rec_values), type=rrtype))
break
@register()
class dnsrecord_delentry(LDAPDelete):
@ -4085,58 +3901,6 @@ class dnsrecord_del(LDAPUpdate):
has_cli_options(self, options, self.no_option_msg)
return super(dnsrecord_del, self).args_options_2_entry(*keys, **options)
def interactive_prompt_callback(self, kw):
if kw.get('del_all', False):
return
try:
has_cli_options(self, kw, self.no_option_msg)
except errors.OptionError:
pass
else:
# some record type entered, skip this helper
return
# get DNS record first so that the NotFound exception is raised
# before the helper would start
dns_record = self.api.Command['dnsrecord_show'](kw['dnszoneidnsname'], kw['idnsname'])['result']
self.Backend.textui.print_plain(_("No option to delete specific record provided."))
user_del_all = self.Backend.textui.prompt_yesno(_("Delete all?"), default=False)
if user_del_all is True:
kw['del_all'] = True
return
# ask user for records to be removed
self.Backend.textui.print_plain(_(u'Current DNS record contents:\n'))
present_params = []
for attr in dns_record:
try:
param = self.params[attr]
except KeyError:
continue
if not get_record_rrtype(param.name):
continue
present_params.append(param)
rec_type_content = u', '.join(dns_record[param.name])
self.Backend.textui.print_plain(u'%s: %s' % (param.label, rec_type_content))
self.Backend.textui.print_plain(u'')
# ask what records to remove
for param in present_params:
deleted_values = []
for rec_value in dns_record[param.name]:
user_del_value = self.Backend.textui.prompt_yesno(
_("Delete %(name)s '%(value)s'?")
% dict(name=param.label, value=rec_value), default=False)
if user_del_value is True:
deleted_values.append(rec_value)
if deleted_values:
kw[param.name] = tuple(deleted_values)
@register()
class dnsrecord_show(LDAPRetrieve):
@ -4354,16 +4118,6 @@ class dnsconfig_mod(LDAPUpdate):
option = option.clone(include=('installer', 'updates'))
yield option
def interactive_prompt_callback(self, kw):
# show informative message on client side
# server cannot send messages asynchronous
if kw.get('idnsforwarders', False):
self.Backend.textui.print_plain(
_("Server will check DNS forwarder(s)."))
self.Backend.textui.print_plain(
_("This may take some time, please wait ..."))
def execute(self, *keys, **options):
# test dnssec forwarders
forwarders = options.get('idnsforwarders')
@ -4524,15 +4278,6 @@ class dnsforwardzone(DNSZoneBase):
class dnsforwardzone_add(DNSZoneBase_add):
__doc__ = _('Create new DNS forward zone.')
def interactive_prompt_callback(self, kw):
# show informative message on client side
# server cannot send messages asynchronous
if kw.get('idnsforwarders', False):
self.Backend.textui.print_plain(
_("Server will check DNS forwarder(s)."))
self.Backend.textui.print_plain(
_("This may take some time, please wait ..."))
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
assert isinstance(dn, DN)
@ -4571,15 +4316,6 @@ class dnsforwardzone_del(DNSZoneBase_del):
class dnsforwardzone_mod(DNSZoneBase_mod):
__doc__ = _('Modify DNS forward zone.')
def interactive_prompt_callback(self, kw):
# show informative message on client side
# server cannot send messages asynchronous
if kw.get('idnsforwarders', False):
self.Backend.textui.print_plain(
_("Server will check DNS forwarder(s)."))
self.Backend.textui.print_plain(
_("This may take some time, please wait ..."))
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
try:
entry = ldap.get_entry(dn)

View File

@ -441,14 +441,6 @@ class hbacrule_add_accesstime(LDAPQuery):
return dict(result=True)
def output_for_cli(self, textui, result, cn, **options):
textui.print_name(self.name)
textui.print_dashed(
'Added access time "%s" to HBAC rule "%s"' % (
options['accesstime'], cn
)
)
# @register()
class hbacrule_remove_accesstime(LDAPQuery):
@ -480,14 +472,6 @@ class hbacrule_remove_accesstime(LDAPQuery):
return dict(result=True)
def output_for_cli(self, textui, result, cn, **options):
textui.print_name(self.name)
textui.print_dashed(
'Removed access time "%s" from HBAC rule "%s"' % (
options['accesstime'], cn
)
)
@register()
class hbacrule_add_user(LDAPAddMember):

View File

@ -492,28 +492,3 @@ class hbactest(Command):
result['value'] = access_granted
return result
def output_for_cli(self, textui, output, *args, **options):
"""
Command.output_for_cli() uses --all option to decide whether to print detailed output.
We use --detail to allow that, thus we need to redefine output_for_cli().
"""
# Note that we don't actually use --detail below to see if details need
# to be printed as our execute() method will return None for corresponding
# entries and None entries will be skipped.
for o in self.output:
outp = self.output[o]
if 'no_display' in outp.flags:
continue
result = output[o]
if isinstance(result, (list, tuple)):
textui.print_attribute(unicode(outp.doc), result, '%s: %s', 1, True)
elif isinstance(result, (unicode, bool)):
if o == 'summary':
textui.print_summary(result)
else:
textui.print_indented(result)
# Propagate integer value for result. It will give proper command line result for scripts
return int(not output['value'])

View File

@ -1085,25 +1085,6 @@ class host_show(LDAPRetrieve):
return dn
def forward(self, *keys, **options):
if 'out' in options:
util.check_writable_file(options['out'])
result = super(host_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
x509.write_certificate_list(
result['result']['usercertificate'],
options['out']
)
result['summary'] = (
_('Certificate(s) stored in file \'%(file)s\'')
% dict(file=options['out'])
)
return result
else:
raise errors.NoCertificateError(entry=keys[-1])
else:
return super(host_show, self).forward(*keys, **options)
@register()
class host_disable(LDAPQuery):

View File

@ -401,68 +401,6 @@ class idrange_add(LDAPCreate):
msg_summary = _('Added ID range "%(value)s"')
def interactive_prompt_callback(self, kw):
"""
Ensure that rid-base is prompted for when dom-sid is specified.
Also ensure that secondary-rid-base is prompted for when rid-base is
specified and vice versa, in case that dom-sid was not specified.
Also ensure that rid-base and secondary-rid-base is prompted for
if ipa-adtrust-install has been run on the system.
"""
# dom-sid can be specified using dom-sid or dom-name options
# it can be also set using --setattr or --addattr, in these cases
# we will not prompt, but raise an ValidationError later
dom_sid_set = any(dom_id in kw for dom_id in
('ipanttrusteddomainname', 'ipanttrusteddomainsid'))
rid_base = kw.get('ipabaserid', None)
secondary_rid_base = kw.get('ipasecondarybaserid', None)
range_type = kw.get('iparangetype', None)
def set_from_prompt(param):
value = self.prompt_param(self.params[param])
update = {param: value}
kw.update(update)
if dom_sid_set:
# This is a trusted range
# Prompt for RID base if domain SID / name was given
if rid_base is None and range_type != u'ipa-ad-trust-posix':
set_from_prompt('ipabaserid')
else:
# This is a local range
# Find out whether ipa-adtrust-install has been ran
adtrust_is_enabled = api.Command['adtrust_is_enabled']()['result']
if adtrust_is_enabled:
# If ipa-adtrust-install has been ran, all local ranges
# require both RID base and secondary RID base
if rid_base is None:
set_from_prompt('ipabaserid')
if secondary_rid_base is None:
set_from_prompt('ipasecondarybaserid')
else:
# This is a local range on a server with no adtrust support
# Prompt for secondary RID base only if RID base was given
if rid_base is not None and secondary_rid_base is None:
set_from_prompt('ipasecondarybaserid')
# Symetrically, prompt for RID base if secondary RID base was
# given
if rid_base is None and secondary_rid_base is not None:
set_from_prompt('ipabaserid')
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
assert isinstance(dn, DN)

View File

@ -22,10 +22,6 @@
"""
Plugins not accessible directly through the CLI, commands used internally
"""
from __future__ import print_function
import json
from ipalib import Command
from ipalib import Str
from ipalib.output import Output
@ -137,9 +133,6 @@ class json_metadata(Command):
return retval
def output_for_cli(self, textui, result, *args, **options):
print(json.dumps(result, default=json_serialize))
@register()
class i18n_messages(Command):
@ -864,6 +857,3 @@ class i18n_messages(Command):
)
def execute(self, **options):
return dict(texts=json_serialize(self.messages))
def output_for_cli(self, textui, result, *args, **options):
print(json.dumps(result, default=json_serialize))

View File

@ -652,16 +652,6 @@ search results for objects to be migrated
have been truncated by the server;
migration process might be incomplete\n''')
migration_disabled_msg = _('''\
Migration mode is disabled. Use \'ipa config-mod\' to enable it.''')
pwd_migration_msg = _('''\
Passwords have been migrated in pre-hashed format.
IPA is unable to generate Kerberos keys unless provided
with clear text passwords. All migrated users need to
login at https://your.domain/ipa/migration/ before they
can use their Kerberos accounts.''')
def get_options(self):
"""
Call get_options of the baseclass and add "exclude" options
@ -927,31 +917,3 @@ can use their Kerberos accounts.''')
)
return dict(result=migrated, failed=failed, enabled=True, compat=True)
def output_for_cli(self, textui, result, ldapuri, bindpw, **options):
textui.print_name(self.name)
if not result['enabled']:
textui.print_plain(self.migration_disabled_msg)
return 1
if not result['compat']:
textui.print_plain("The compat plug-in is enabled. This can increase the memory requirements during migration. Disable the compat plug-in with \'ipa-compat-manage disable\' or re-run this script with \'--with-compat\' option.")
return 1
any_migrated = any(result['result'].values())
textui.print_plain('Migrated:')
textui.print_entry1(
result['result'], attr_order=self.migrate_order,
one_value_per_line=False
)
for ldap_obj_name in self.migrate_order:
textui.print_plain('Failed %s:' % ldap_obj_name)
textui.print_entry1(
result['failed'][ldap_obj_name], attr_order=self.migrate_order,
one_value_per_line=True,
)
textui.print_plain('-' * len(self.name))
if not any_migrated:
textui.print_plain('No users/groups were migrated from %s' %
ldapuri)
return 1
textui.print_plain(unicode(self.pwd_migration_msg))

View File

@ -17,13 +17,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
import sys
from .baseldap import LDAPObject, LDAPAddMember, LDAPRemoveMember
from .baseldap import LDAPCreate, LDAPDelete, LDAPUpdate, LDAPSearch, LDAPRetrieve
from ipalib import api, Int, Str, Bool, DateTime, Flag, Bytes, IntEnum, StrEnum, _, ngettext
from ipalib.messages import add_message, ResultFormattingError
from ipalib.plugable import Registry
from ipalib.errors import (
PasswordMismatch,
@ -32,16 +28,12 @@ from ipalib.errors import (
ValidationError)
from ipalib.request import context
from ipapython.dn import DN
from ipapython.version import API_VERSION
import base64
import locale
import uuid
import qrcode
import os
import six
from six import StringIO
from six.moves import urllib
if six.PY3:
@ -361,75 +353,6 @@ class otptoken_add(LDAPCreate):
_convert_owner(self.api.Object.user, entry_attrs, options)
return super(otptoken_add, self).post_callback(ldap, dn, entry_attrs, *keys, **options)
def _get_qrcode(self, output, uri, version):
# Print QR code to terminal if specified
qr_output = StringIO()
qr = qrcode.QRCode()
qr.add_data(uri)
qr.make()
qr.print_ascii(out=qr_output, tty=False)
encoding = getattr(sys.stdout, 'encoding', None)
if encoding is None:
encoding = locale.getpreferredencoding(False)
try:
qr_code = qr_output.getvalue().decode(encoding)
except UnicodeError:
add_message(
version,
output,
message=ResultFormattingError(
message=_("Unable to display QR code using the configured "
"output encoding. Please use the token URI to "
"configure you OTP device")
)
)
return None
if sys.stdout.isatty():
output_width = self.api.Backend.textui.get_tty_width()
qr_code_width = len(qr_code.splitlines()[0])
if qr_code_width > output_width:
add_message(
version,
output,
message=ResultFormattingError(
message=_(
"QR code width is greater than that of the output "
"tty. Please resize your terminal.")
)
)
return qr
def output_for_cli(self, textui, output, *args, **options):
# copy-pasted from ipalib/Frontend.__do_call()
# because option handling is broken on client-side
if 'version' in options:
pass
elif self.api.env.skip_version_check:
options['version'] = u'2.0'
else:
options['version'] = API_VERSION
uri = output['result'].get('uri', None)
if uri is not None and not options.get('no_qrcode', False):
qr = self._get_qrcode(output, uri, options['version'])
else:
qr = None
rv = super(otptoken_add, self).output_for_cli(
textui, output, *args, **options)
if qr is not None:
print("\n")
qr.print_ascii(tty=sys.stdout.isatty())
print("\n")
return rv
@register()
class otptoken_del(LDAPDelete):

View File

@ -731,25 +731,6 @@ class service_show(LDAPRetrieve):
return dn
def forward(self, *keys, **options):
if 'out' in options:
util.check_writable_file(options['out'])
result = super(service_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
x509.write_certificate_list(
result['result']['usercertificate'],
options['out']
)
result['summary'] = (
_('Certificate(s) stored in file \'%(file)s\'')
% dict(file=options['out'])
)
return result
else:
raise errors.NoCertificateError(entry=keys[-1])
else:
return super(service_show, self).forward(*keys, **options)
@register()
class service_add_host(LDAPAddMember):

View File

@ -493,9 +493,6 @@ class sudorule_enable(LDAPQuery):
return dict(result=True)
def output_for_cli(self, textui, result, cn, **options):
textui.print_dashed(_('Enabled Sudo Rule "%s"') % cn)
@register()
class sudorule_disable(LDAPQuery):
@ -519,9 +516,6 @@ class sudorule_disable(LDAPQuery):
return dict(result=True)
def output_for_cli(self, textui, result, cn, **options):
textui.print_dashed(_('Disabled Sudo Rule "%s"') % cn)
@register()
class sudorule_add_allow_command(LDAPAddMember):
@ -953,14 +947,6 @@ class sudorule_add_option(LDAPQuery):
return dict(result=entry_attrs, value=pkey_to_value(cn, options))
def output_for_cli(self, textui, result, cn, **options):
textui.print_dashed(
_('Added option "%(option)s" to Sudo Rule "%(rule)s"')
% dict(option=options['ipasudoopt'], rule=cn))
super(sudorule_add_option, self).output_for_cli(textui, result, cn,
**options)
@register()
class sudorule_remove_option(LDAPQuery):
@ -1010,11 +996,3 @@ class sudorule_remove_option(LDAPQuery):
entry_attrs = entry_to_dict(entry_attrs, **options)
return dict(result=entry_attrs, value=pkey_to_value(cn, options))
def output_for_cli(self, textui, result, cn, **options):
textui.print_dashed(
_('Removed option "%(option)s" from Sudo Rule "%(rule)s"')
% dict(option=options['ipasudoopt'], rule=cn))
super(sudorule_remove_option, self).output_for_cli(textui, result, cn,
**options)

View File

@ -501,40 +501,3 @@ Checks done:
'max_agmts': self.api.env.recommended_max_agmts
},
)
def output_for_cli(self, textui, output, *args, **options):
in_order = output['result']['in_order']
connect_errors = output['result']['connect_errors']
max_agmts_errors = output['result']['max_agmts_errors']
if in_order:
header = _('Replication topology of suffix "%(suffix)s" '
'is in order.')
else:
header = _('Replication topology of suffix "%(suffix)s" contains '
'errors.')
textui.print_h1(header % {'suffix': args[0]})
if connect_errors:
textui.print_dashed(unicode(_('Topology is disconnected')))
for err in connect_errors:
msg = _("Server %(srv)s can't contact servers: %(replicas)s")
msg = msg % {'srv': err[0], 'replicas': ', '.join(err[2])}
textui.print_indented(msg)
if max_agmts_errors:
textui.print_dashed(unicode(_('Recommended maximum number of '
'agreements per replica exceeded')))
textui.print_attribute(
unicode(_("Maximum number of agreements per replica")),
[output['result']['max_agmts']]
)
for err in max_agmts_errors:
msg = _('Server "%(srv)s" has %(n)d agreements with servers:')
msg = msg % {'srv': err[0], 'n': len(err[1])}
textui.print_indented(msg)
for replica in err[1]:
textui.print_indented(replica, 2)
return 0

View File

@ -748,30 +748,6 @@ sides.
return result
def interactive_prompt_callback(self, kw):
"""
Also ensure that realm_admin is prompted for if --admin or
--trust-secret is not specified when 'ipa trust-add' is run on the
system.
Also ensure that realm_passwd is prompted for if --password or
--trust-secret is not specified when 'ipa trust-add' is run on the
system.
"""
trust_secret = kw.get('trust_secret')
realm_admin = kw.get('realm_admin')
realm_passwd = kw.get('realm_passwd')
if trust_secret is None:
if realm_admin is None:
kw['realm_admin'] = self.prompt_param(
self.params['realm_admin'])
if realm_passwd is None:
kw['realm_passwd'] = self.Backend.textui.prompt_password(
self.params['realm_passwd'].label, confirm=False)
def validate_options(self, *keys, **options):
trusted_realm_domain = keys[-1]

View File

@ -27,7 +27,6 @@ import six
from ipalib import api
from ipalib import errors
from ipalib import util
from ipalib import Bool, Flag, Str
from .baseuser import (
baseuser,
@ -60,7 +59,6 @@ from . import baseldap
from ipalib.request import context
from ipalib import _, ngettext
from ipalib import output
from ipalib import x509
from ipaplatform.paths import paths
from ipapython.dn import DN
from ipapython.ipautil import ipa_generate_password
@ -605,14 +603,6 @@ class user_del(baseuser_del):
Bool('preserve?',
exclude='cli',
),
Flag('preserve?',
include='cli',
doc=_('Delete a user, keeping the entry available for future use'),
),
Flag('no_preserve?',
include='cli',
doc=_('Delete a user'),
),
)
def _preserve_user(self, pkey, delete_container, **options):
@ -673,20 +663,6 @@ class user_del(baseuser_del):
if restoreAttr:
self._exc_wrapper(pkey, options, ldap.update_entry)(entry_attrs)
def forward(self, *keys, **options):
if self.api.env.context == 'cli':
no_preserve = options.pop('no_preserve', False)
preserve = options.pop('preserve', False)
if no_preserve and preserve:
raise errors.MutuallyExclusiveError(
reason=_("preserve and no-preserve cannot be both set"))
elif no_preserve:
options['preserve'] = False
elif preserve:
options['preserve'] = True
return super(user_del, self).forward(*keys, **options)
def pre_callback(self, ldap, dn, *keys, **options):
dn = self.obj.get_either_dn(*keys, **options)
@ -846,24 +822,6 @@ class user_show(baseuser_show):
self.obj.get_preserved_attribute(entry_attrs, options)
return dn
def forward(self, *keys, **options):
if 'out' in options:
util.check_writable_file(options['out'])
result = super(user_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
x509.write_certificate_list(
result['result']['usercertificate'],
options['out']
)
result['summary'] = (
_('Certificate(s) stored in file \'%(file)s\'')
% dict(file=options['out'])
)
return result
else:
raise errors.NoCertificateError(entry=keys[-1])
else:
return super(user_show, self).forward(*keys, **options)
@register()
class user_undel(LDAPQuery):

View File

@ -17,8 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import print_function
from ipalib.frontend import Command, Object
from ipalib import api, errors
from ipalib import Bytes, Flag, Str, StrEnum
@ -975,22 +973,6 @@ class vaultconfig_show(Retrieve):
),
)
def forward(self, *args, **options):
file = options.get('transport_out')
# don't send these parameters to server
if 'transport_out' in options:
del options['transport_out']
response = super(vaultconfig_show, self).forward(*args, **options)
if file:
with open(file, 'w') as f:
f.write(response['result']['transport_cert'])
return response
def execute(self, *args, **options):
if not self.api.Command.kra_is_enabled()['result']: