mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-01-11 16:51:55 -06:00
c5afee964e
Add the 'constructor' type to IPAOption to allow parsing arbitrary types.

When using this type, supply the 'constructor' attribute with the constructor of the type. The checker for the 'constructor' type attempts to construct the data, returning on success and raising OptionValueError otherwise.

The 'knob' interface remains unchanged but now accepts arbitrary constructors.

This feature subsumes the '_option_callback' mechanism, which has been refactored away. It also subsumes the "dn" type in IPAOption, but that refactor is deferred.

Part of: https://pagure.io/freeipa/issue/6858
Reviewed-By: Florence Blanc-Renaud <frenaud@redhat.com>
357 lines
12 KiB
Python
357 lines
12 KiB
Python
#
|
|
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
|
|
#
|
|
|
|
"""
|
|
Command line support.
|
|
"""
|
|
|
|
import collections
|
|
import enum
|
|
import logging
|
|
import optparse # pylint: disable=deprecated-module
|
|
import signal
|
|
|
|
import six
|
|
|
|
from ipapython import admintool
|
|
from ipapython.ipa_log_manager import standard_logging_setup
|
|
from ipapython.ipautil import CheckedIPAddress, private_ccache
|
|
|
|
from . import core, common
|
|
|
|
# Only the two tool factories are part of the public API of this module.
__all__ = ['install_tool', 'uninstall_tool']

# Python 3 has no separate ``long`` type; alias it so that the type checks
# further down in this module work on both major versions.
if six.PY3:
    long = int

# Knobs typed as ``NoneType`` are exposed as value-less flag options.
NoneType = type(None)

logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _get_usage(configurable_class):
    """
    Build the optparse usage string for *configurable_class*.

    The string starts with ``%prog [options]`` and gains one placeholder per
    positional knob, in the order returned by ``configurable_class.knobs()``.
    A knob that has a ``default`` attribute is shown in square brackets; a
    knob without one is shown bare.

    :param configurable_class: class providing ``knobs()``, an iterable of
        ``(owner_cls, name)`` pairs
    :returns: the usage string
    """
    pieces = ['%prog [options]']

    for owner_cls, name in configurable_class.knobs():
        knob_cls = getattr(owner_cls, name)
        if not knob_cls.is_cli_positional():
            continue

        # Placeholder text: explicit metavar first, then the first CLI name,
        # finally the knob's attribute name.
        if knob_cls.cli_metavar is not None:
            placeholder = knob_cls.cli_metavar
        elif knob_cls.cli_names:
            placeholder = knob_cls.cli_names[0].upper()
        else:
            placeholder = name.replace('_', '-').upper()

        # Only the presence of ``default`` matters here, not its value.
        try:
            knob_cls.default
        except AttributeError:
            template = ' {}'
        else:
            template = ' [{}]'

        pieces.append(template.format(placeholder))

    return ''.join(pieces)
|
|
|
|
|
|
def install_tool(configurable_class, command_name, log_file_name,
                 debug_option=False, verbose=False, console_format=None,
                 use_private_ccache=True, uninstall_log_file_name=None):
    """
    Create an ``InstallTool`` subclass bound to *configurable_class*.

    The arguments are stored as class attributes of the new class. When
    *uninstall_log_file_name* is given, the keyword arguments for a matching
    ``uninstall_tool()`` call are stored as ``uninstall_kwargs``; otherwise
    that attribute is ``None``.

    :returns: the newly created class, named
        ``install_tool(<configurable class name>)``
    """
    uninstall_kwargs = None
    if uninstall_log_file_name is not None:
        # Same settings as the installer, but with its own log file.
        uninstall_kwargs = {
            'configurable_class': configurable_class,
            'command_name': command_name,
            'log_file_name': uninstall_log_file_name,
            'debug_option': debug_option,
            'verbose': verbose,
            'console_format': console_format,
        }

    tool_name = 'install_tool({0})'.format(configurable_class.__name__)
    tool_attrs = {
        'configurable_class': configurable_class,
        'command_name': command_name,
        'log_file_name': log_file_name,
        'usage': _get_usage(configurable_class),
        'debug_option': debug_option,
        'verbose': verbose,
        'console_format': console_format,
        'uninstall_kwargs': uninstall_kwargs,
        'use_private_ccache': use_private_ccache,
    }
    return type(tool_name, (InstallTool,), tool_attrs)
|
|
|
|
|
|
def uninstall_tool(configurable_class, command_name, log_file_name,
                   debug_option=False, verbose=False, console_format=None):
    """
    Create an ``UninstallTool`` subclass bound to *configurable_class*.

    The arguments are stored as class attributes of the new class, together
    with a usage string computed from the knobs of *configurable_class*.

    :returns: the newly created class, named
        ``uninstall_tool(<configurable class name>)``
    """
    tool_name = 'uninstall_tool({0})'.format(configurable_class.__name__)
    tool_attrs = {
        'configurable_class': configurable_class,
        'command_name': command_name,
        'log_file_name': log_file_name,
        'usage': _get_usage(configurable_class),
        'debug_option': debug_option,
        'verbose': verbose,
        'console_format': console_format,
    }
    return type(tool_name, (UninstallTool,), tool_attrs)
|
|
|
|
|
|
class ConfigureTool(admintool.AdminTool):
    """
    Admin tool that exposes the knobs of a configurable class on the
    command line.

    Subclasses must provide ``_transform``, which maps
    ``configurable_class`` to the class whose knobs become options and
    positional arguments and which is instantiated and run by ``run()``.
    """

    # Class whose knobs are exposed; filled in by the tool factories.
    configurable_class = None
    # Forwarded to ``AdminTool.add_options`` as ``debug_option``.
    debug_option = False
    # Forwarded to ``standard_logging_setup`` as ``verbose``.
    verbose = False
    # Forwarded to ``standard_logging_setup`` as ``console_format``.
    console_format = None
    # When true, ``run()`` executes inside ``private_ccache()``.
    use_private_ccache = True

    @staticmethod
    def _transform(configurable_class):
        """Return the class to configure; must be overridden."""
        raise NotImplementedError

    @classmethod
    def add_options(cls, parser, positional=False):
        """
        Register one option per knob of the transformed class on *parser*.

        :param parser: optparse parser to populate
        :param positional: when false, register the non-positional knobs;
            when true, register only the positional knobs, each under the
            synthetic option string ``--<knob-name>`` (``__init__`` uses
            this to parse positional arguments)
        """
        transformed_cls = cls._transform(cls.configurable_class)

        if issubclass(transformed_cls, common.Interactive):
            parser.add_option(
                '-U', '--unattended',
                dest='unattended',
                default=False,
                action='store_true',
                help="unattended (un)installation never prompts the user",
            )

        groups = collections.OrderedDict()
        # if no group is defined, add the option to the parser top level
        groups[None] = parser

        for owner_cls, name in transformed_cls.knobs():
            knob_cls = getattr(owner_cls, name)
            if knob_cls.is_cli_positional() is not positional:
                continue

            # Create the option group lazily, the first time a knob of
            # that group is seen.
            group_cls = knob_cls.group()
            try:
                opt_group = groups[group_cls]
            except KeyError:
                opt_group = groups[group_cls] = optparse.OptionGroup(
                    parser, "{0} options".format(group_cls.description))
                parser.add_option_group(opt_group)

            knob_type = knob_cls.type
            is_list = issubclass(knob_type, list)
            if is_list:
                try:
                    # typing.List[X].__parameters__ == (X,)
                    knob_scalar_type = knob_type.__parameters__[0]
                except AttributeError:
                    # list type without a parameter: treat items as strings
                    knob_scalar_type = str
            else:
                knob_scalar_type = knob_type

            # Map the scalar type to an option type.
            kwargs = dict()
            if knob_scalar_type is NoneType:
                # value-less flag
                kwargs['type'] = None
                kwargs['const'] = True
                kwargs['default'] = False
            elif knob_scalar_type is str:
                kwargs['type'] = 'string'
            elif knob_scalar_type is int:
                kwargs['type'] = 'int'
            elif knob_scalar_type is long:
                kwargs['type'] = 'long'
            elif knob_scalar_type is CheckedIPAddress:
                kwargs['type'] = 'ip'
            elif issubclass(knob_scalar_type, enum.Enum):
                kwargs['type'] = 'choice'
                kwargs['choices'] = [i.value for i in knob_scalar_type]
                kwargs['metavar'] = "{{{0}}}".format(
                    ",".join(kwargs['choices']))
            else:
                # anything else is built by calling the type itself
                kwargs['type'] = 'constructor'
                kwargs['constructor'] = knob_scalar_type
            kwargs['dest'] = name

            # Lists accumulate values, scalars store a single one; flags
            # use the ``*_const`` variant of the action.
            if is_list:
                if kwargs['type'] is None:
                    kwargs['action'] = 'append_const'
                else:
                    kwargs['action'] = 'append'
            else:
                if kwargs['type'] is None:
                    kwargs['action'] = 'store_const'
                else:
                    kwargs['action'] = 'store'
            if knob_cls.sensitive:
                kwargs['sensitive'] = True
            if knob_cls.cli_metavar:
                kwargs['metavar'] = knob_cls.cli_metavar

            # Pairs of (hide from help?, option names to register).
            if not positional:
                cli_info = (
                    (knob_cls.deprecated, knob_cls.cli_names),
                    (True, knob_cls.cli_deprecated_names),
                )
            else:
                cli_info = (
                    (knob_cls.deprecated, (None,)),
                )
            for hidden, cli_names in cli_info:
                opt_strs = []
                for cli_name in cli_names:
                    if cli_name is None:
                        # None stands for the name derived from the knob
                        cli_name = '--{}'.format(name.replace('_', '-'))
                    opt_strs.append(cli_name)
                if not opt_strs:
                    continue

                if not hidden:
                    help_text = knob_cls.description
                else:
                    help_text = optparse.SUPPRESS_HELP

                opt_group.add_option(
                    *opt_strs,
                    help=help_text,
                    **kwargs
                )

        super(ConfigureTool, cls).add_options(parser,
                                              debug_option=cls.debug_option)

    def __init__(self, options, args):
        """
        Store the parsed options and consume positional arguments.

        Leading items of ``self.args`` are removed, one per positional
        knob, and processed into ``self.options`` as if they had been given
        as options.
        """
        super(ConfigureTool, self).__init__(options, args)

        self.transformed_cls = self._transform(self.configurable_class)
        self.positional_arguments = []

        for owner_cls, name in self.transformed_cls.knobs():
            knob_cls = getattr(owner_cls, name)
            if knob_cls.is_cli_positional():
                self.positional_arguments.append(name)

        # fake option parser to parse positional arguments
        # (because optparse does not support positional argument parsing)
        fake_option_parser = optparse.OptionParser()
        self.add_options(fake_option_parser, True)

        # add_options() registers knobs without a group directly on the
        # parser, so the parser's own option list must be searched as well.
        # It comes first so that options in groups win on a name clash.
        containers = [fake_option_parser]
        containers.extend(fake_option_parser.option_groups)
        fake_option_map = {option.dest: option
                           for container in containers
                           for option in container.option_list}

        for index, name in enumerate(self.positional_arguments):
            try:
                value = self.args.pop(0)
            except IndexError:
                # fewer arguments than positional knobs: leave the rest unset
                break

            fake_option = fake_option_map[name]
            fake_option.process('argument {}'.format(index + 1),
                                value,
                                self.options,
                                self.option_parser)

    def validate_options(self, needs_root=True):
        """
        Run the base class validation and reject surplus arguments.

        Anything still left in ``self.args`` after ``__init__`` consumed
        the positional arguments is reported as an error.
        """
        super(ConfigureTool, self).validate_options(needs_root=needs_root)

        if self.args:
            self.option_parser.error("Too many arguments provided")

    def _setup_logging(self, log_file_mode='w', no_file=False):
        """
        Configure logging through ``standard_logging_setup``.

        The log file is ``None`` when *no_file* is true, otherwise
        ``self.options.log_file`` if set, otherwise ``self.log_file_name``.
        *log_file_mode* is accepted but not used by this implementation.
        """
        if no_file:
            log_file_name = None
        elif self.options.log_file:
            log_file_name = self.options.log_file
        else:
            log_file_name = self.log_file_name
        standard_logging_setup(
            log_file_name,
            verbose=self.verbose,
            debug=self.options.verbose,
            console_format=self.console_format)
        if log_file_name:
            logger.debug('Logging to %s', log_file_name)
        elif not no_file:
            logger.debug('Not logging to a file')

    def run(self):
        """
        Instantiate the transformed class from the options and run it.

        Only options whose value is not ``None`` are passed to the
        constructor. ``core.KnobValueError`` and ``RuntimeError`` raised by
        the constructor are reported through ``self.option_parser.error()``.
        """
        kwargs = {}

        transformed_cls = self._transform(self.configurable_class)
        knob_classes = {n: getattr(c, n) for c, n in transformed_cls.knobs()}
        for name in knob_classes:
            value = getattr(self.options, name, None)
            if value is not None:
                kwargs[name] = value

        if (issubclass(self.configurable_class, common.Interactive) and
                not self.options.unattended):
            kwargs['interactive'] = True

        # NOTE(review): the handlers below rely on option_parser.error()
        # not returning (optparse's implementation exits); otherwise
        # ``cfgr`` would be unbound further down.
        try:
            cfgr = transformed_cls(**kwargs)
        except core.KnobValueError as e:
            knob_cls = knob_classes[e.name]
            try:
                index = self.positional_arguments.index(e.name)
            except ValueError:
                # Not positional: name the option. Guard against an empty
                # ``cli_names`` tuple, as the other users of it do.
                cli_names = knob_cls.cli_names
                first_name = cli_names[0] if cli_names else None
                cli_name = first_name or e.name.replace('_', '-')
                desc = "option {0}".format(cli_name)
            else:
                desc = "argument {0}".format(index + 1)
            self.option_parser.error("{0}: {1}".format(desc, e))
        except RuntimeError as e:
            self.option_parser.error(str(e))

        # Turn SIGTERM into KeyboardInterrupt.
        signal.signal(signal.SIGTERM, self.__signal_handler)

        if self.use_private_ccache:
            with private_ccache():
                super(ConfigureTool, self).run()
                cfgr.run()
        else:
            super(ConfigureTool, self).run()
            cfgr.run()

    @staticmethod
    def __signal_handler(signum, frame):
        """Signal handler that raises ``KeyboardInterrupt``."""
        raise KeyboardInterrupt
|
|
|
|
|
|
class InstallTool(ConfigureTool):
    """
    Configure tool for installation, optionally offering ``--uninstall``.

    The configurable class is transformed with ``common.installer``.
    """

    # Keyword arguments for ``uninstall_tool()``; ``None`` means the tool
    # does not offer the --uninstall option.
    uninstall_kwargs = None

    _transform = staticmethod(common.installer)

    @classmethod
    def add_options(cls, parser, positional=False):
        """
        Add the inherited options, plus ``--uninstall`` when
        ``uninstall_kwargs`` is set.
        """
        super(InstallTool, cls).add_options(parser, positional)

        if cls.uninstall_kwargs is None:
            return

        uninstall_help = (
            "uninstall an existing installation. The uninstall can "
            "be run with --unattended option"
        )
        parser.add_option(
            '--uninstall',
            dest='uninstall',
            default=False,
            action='store_true',
            help=uninstall_help,
        )

    @classmethod
    def get_command_class(cls, options, args):
        """
        Return the class that should handle this invocation.

        When ``uninstall_kwargs`` is set and ``--uninstall`` was given, a
        class built by ``uninstall_tool()`` is returned, sharing this
        class's ``option_parser``. Otherwise the decision is left to the
        base class.
        """
        if cls.uninstall_kwargs is None or not options.uninstall:
            return super(InstallTool, cls).get_command_class(options, args)

        tool_cls = uninstall_tool(**cls.uninstall_kwargs)
        tool_cls.option_parser = cls.option_parser
        return tool_cls
|
|
|
|
|
|
class UninstallTool(ConfigureTool):
    """
    Configure tool for uninstallation.

    The configurable class is transformed with ``common.uninstaller``.
    """

    _transform = staticmethod(common.uninstaller)
|