mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-01-28 09:06:44 -06:00
f554078291
Use only object params and params defined in has_output_params as output params. This removes unnecessary duplication of params defined both in object plugins and as command arguments. This requires all command output params to be properly defined in either the object plugins or the command's has_output_params. Fix the plugins where this wasn't true. https://fedorahosted.org/freeipa/ticket/4739 Reviewed-By: David Kupka <dkupka@redhat.com>
667 lines
18 KiB
Python
667 lines
18 KiB
Python
#
|
|
# Copyright (C) 2016 FreeIPA Contributors see COPYING for license
|
|
#
|
|
|
|
import importlib
|
|
import itertools
|
|
import sys
|
|
|
|
import six
|
|
|
|
from ipalib import errors
|
|
from ipalib.crud import PKQuery, Retrieve, Search
|
|
from ipalib.frontend import Command, Local, Method, Object
|
|
from ipalib.output import Entry, ListOfEntries, ListOfPrimaryKeys, PrimaryKey
|
|
from ipalib.parameters import Bool, Dict, Flag, Int, Str
|
|
from ipalib.plugable import Registry
|
|
from ipalib.text import _
|
|
from ipapython.version import API_VERSION
|
|
|
|
__doc__ = _("""
|
|
API Schema
|
|
""") + _("""
|
|
Provides API introspection capabilities.
|
|
""") + _("""
|
|
EXAMPLES:
|
|
""") + _("""
|
|
Show user-find details:
|
|
ipa command-show user-find
|
|
""") + _("""
|
|
Find user-find parameters:
|
|
ipa param-find user-find
|
|
""")
|
|
|
|
if six.PY3:
|
|
unicode = str
|
|
|
|
register = Registry()
|
|
|
|
|
|
class BaseMetaObject(Object):
|
|
takes_params = (
|
|
Str(
|
|
'name',
|
|
label=_("Name"),
|
|
primary_key=True,
|
|
normalizer=lambda name: name.replace(u'-', u'_'),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'doc?',
|
|
label=_("Documentation"),
|
|
flags={'no_search'},
|
|
),
|
|
)
|
|
|
|
def _get_obj(self, obj, **kwargs):
|
|
raise NotImplementedError()
|
|
|
|
def _retrieve(self, *args, **kwargs):
|
|
raise NotImplementedError()
|
|
|
|
def retrieve(self, *args, **kwargs):
|
|
obj = self._retrieve(*args, **kwargs)
|
|
obj = self._get_obj(obj, **kwargs)
|
|
return obj
|
|
|
|
def _search(self, *args, **kwargs):
|
|
raise NotImplementedError()
|
|
|
|
def _split_search_args(self, criteria=None):
|
|
return [], criteria
|
|
|
|
def search(self, *args, **kwargs):
|
|
args, criteria = self._split_search_args(*args)
|
|
|
|
result = self._search(*args, **kwargs)
|
|
result = (self._get_obj(r, **kwargs) for r in result)
|
|
|
|
if criteria:
|
|
criteria = criteria.lower()
|
|
result = (r for r in result
|
|
if (criteria in r['name'].lower() or
|
|
criteria in r.get('doc', u'').lower()))
|
|
|
|
if not kwargs.get('all', False) and kwargs.get('pkey_only', False):
|
|
result = ({'name': r['name']} for r in result)
|
|
|
|
return result
|
|
|
|
|
|
class BaseMetaRetrieve(Retrieve):
|
|
def execute(self, *args, **options):
|
|
obj = self.obj.retrieve(*args, **options)
|
|
return dict(result=obj, value=args[-1])
|
|
|
|
|
|
class BaseMetaSearch(Search):
|
|
def get_options(self):
|
|
for option in super(BaseMetaSearch, self).get_options():
|
|
yield option
|
|
|
|
yield Flag(
|
|
'pkey_only?',
|
|
label=_("Primary key only"),
|
|
doc=_("Results should contain primary key attribute only "
|
|
"(\"%s\")") % 'name',
|
|
)
|
|
|
|
def execute(self, criteria=None, **options):
|
|
result = list(self.obj.search(criteria, **options))
|
|
return dict(result=result, count=len(result), truncated=False)
|
|
|
|
|
|
class MetaObject(BaseMetaObject):
|
|
takes_params = BaseMetaObject.takes_params + (
|
|
Str(
|
|
'topic_topic?',
|
|
label=_("Help topic"),
|
|
flags={'no_search'},
|
|
),
|
|
)
|
|
|
|
|
|
class MetaRetrieve(BaseMetaRetrieve):
|
|
pass
|
|
|
|
|
|
class MetaSearch(BaseMetaSearch):
|
|
pass
|
|
|
|
|
|
@register()
|
|
class command(MetaObject):
|
|
takes_params = BaseMetaObject.takes_params + (
|
|
Str(
|
|
'args_param*',
|
|
label=_("Arguments"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'options_param*',
|
|
label=_("Options"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'output_params_param*',
|
|
label=_("Output parameters"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'no_cli?',
|
|
label=_("Exclude from CLI"),
|
|
flags={'no_search'},
|
|
),
|
|
)
|
|
|
|
def _get_obj(self, command, **kwargs):
|
|
obj = dict()
|
|
obj['name'] = unicode(command.name)
|
|
|
|
if command.doc:
|
|
obj['doc'] = unicode(command.doc)
|
|
|
|
if command.topic:
|
|
try:
|
|
topic = self.api.Object.topic.retrieve(unicode(command.topic))
|
|
except errors.NotFound:
|
|
pass
|
|
else:
|
|
obj['topic_topic'] = topic['name']
|
|
|
|
if command.NO_CLI:
|
|
obj['no_cli'] = True
|
|
|
|
if len(command.args):
|
|
obj['args_param'] = tuple(unicode(n) for n in command.args)
|
|
|
|
if len(command.options):
|
|
obj['options_param'] = tuple(
|
|
unicode(n) for n in command.options if n != 'version')
|
|
|
|
if len(command.output_params):
|
|
obj['output_params_param'] = tuple(
|
|
unicode(n) for n in command.output_params)
|
|
|
|
return obj
|
|
|
|
def _retrieve(self, name, **kwargs):
|
|
try:
|
|
command = self.api.Command[name]
|
|
if not isinstance(command, Local):
|
|
return command
|
|
except KeyError:
|
|
pass
|
|
|
|
raise errors.NotFound(
|
|
reason=_("%(pkey)s: %(oname)s not found") % {
|
|
'pkey': name, 'oname': self.name,
|
|
}
|
|
)
|
|
|
|
def _search(self, **kwargs):
|
|
for command in self.api.Command():
|
|
if not isinstance(command, Local):
|
|
yield command
|
|
|
|
|
|
@register()
|
|
class command_show(MetaRetrieve):
|
|
__doc__ = _("Display information about a command.")
|
|
|
|
|
|
@register()
|
|
class command_find(MetaSearch):
|
|
__doc__ = _("Search for commands.")
|
|
|
|
|
|
@register()
|
|
class command_defaults(PKQuery):
|
|
NO_CLI = True
|
|
|
|
takes_options = (
|
|
Str('params*'),
|
|
Dict('kw?'),
|
|
)
|
|
|
|
def execute(self, name, **options):
|
|
command = self.api.Command[name]
|
|
|
|
params = options.get('params', [])
|
|
kw = options.get('kw', {})
|
|
|
|
result = command.get_default(params, **kw)
|
|
|
|
return dict(result=result)
|
|
|
|
|
|
@register()
|
|
class topic_(MetaObject):
|
|
name = 'topic'
|
|
|
|
def __init__(self, api):
|
|
super(topic_, self).__init__(api)
|
|
self.__topics = None
|
|
|
|
def __get_topics(self):
|
|
if self.__topics is None:
|
|
topics = {}
|
|
object.__setattr__(self, '_topic___topics', topics)
|
|
|
|
for command in self.api.Command():
|
|
topic_value = command.topic
|
|
if topic_value is None:
|
|
continue
|
|
topic_name = unicode(topic_value)
|
|
|
|
while topic_name not in topics:
|
|
topic = topics[topic_name] = {'name': topic_name}
|
|
|
|
for package in self.api.packages:
|
|
module_name = '.'.join((package.__name__, topic_name))
|
|
try:
|
|
module = sys.modules[module_name]
|
|
except KeyError:
|
|
try:
|
|
module = importlib.import_module(module_name)
|
|
except ImportError:
|
|
continue
|
|
|
|
if module.__doc__ is not None:
|
|
topic['doc'] = unicode(module.__doc__).strip()
|
|
|
|
try:
|
|
topic_value = module.topic
|
|
except AttributeError:
|
|
continue
|
|
if topic_value is not None:
|
|
topic_name = unicode(topic_value)
|
|
topic['topic_topic'] = topic_name
|
|
else:
|
|
topic.pop('topic_topic', None)
|
|
|
|
return self.__topics
|
|
|
|
def _get_obj(self, topic, **kwargs):
|
|
return topic
|
|
|
|
def _retrieve(self, name, **kwargs):
|
|
try:
|
|
return self.__get_topics()[name]
|
|
except KeyError:
|
|
raise errors.NotFound(
|
|
reason=_("%(pkey)s: %(oname)s not found") % {
|
|
'pkey': name, 'oname': self.name,
|
|
}
|
|
)
|
|
|
|
def _search(self, **kwargs):
|
|
return self.__get_topics().values()
|
|
|
|
|
|
@register()
|
|
class topic_show(MetaRetrieve):
|
|
__doc__ = _("Display information about a help topic.")
|
|
|
|
|
|
@register()
|
|
class topic_find(MetaSearch):
|
|
__doc__ = _("Search for help topics.")
|
|
|
|
|
|
class BaseParam(BaseMetaObject):
|
|
takes_params = BaseMetaObject.takes_params + (
|
|
Str(
|
|
'type?',
|
|
label=_("Type"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'required?',
|
|
label=_("Required"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'multivalue?',
|
|
label=_("Multi-value"),
|
|
flags={'no_search'},
|
|
),
|
|
)
|
|
|
|
def _split_search_args(self, commandname, criteria=None):
|
|
return [commandname], criteria
|
|
|
|
|
|
class BaseParamMethod(Method):
|
|
def get_args(self):
|
|
parent = self.api.Object.command
|
|
parent_key = parent.primary_key
|
|
yield parent_key.clone_rename(
|
|
parent.name + parent_key.name,
|
|
cli_name=parent.name,
|
|
label=parent_key.label,
|
|
required=True,
|
|
query=True,
|
|
)
|
|
|
|
for arg in super(BaseParamMethod, self).get_args():
|
|
yield arg
|
|
|
|
|
|
class BaseParamRetrieve(BaseParamMethod, BaseMetaRetrieve):
|
|
pass
|
|
|
|
|
|
class BaseParamSearch(BaseParamMethod, BaseMetaSearch):
|
|
pass
|
|
|
|
|
|
@register()
|
|
class param(BaseParam):
|
|
takes_params = BaseParam.takes_params + (
|
|
Bool(
|
|
'alwaysask?',
|
|
label=_("Always ask"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'autofill?',
|
|
label=_("Autofill"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'cli_metavar?',
|
|
label=_("CLI metavar"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'cli_name?',
|
|
label=_("CLI name"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'confirm',
|
|
label=_("Confirm (password)"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'default*',
|
|
label=_("Default"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'default_from_param*',
|
|
label=_("Default from"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'deprecated_cli_aliases*',
|
|
label=_("Deprecated CLI aliases"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'exclude*',
|
|
label=_("Exclude from"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'hint?',
|
|
label=_("Hint"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'include*',
|
|
label=_("Include in"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'label?',
|
|
label=_("Label"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'no_convert?',
|
|
label=_("Convert on server"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'option_group?',
|
|
label=_("Option group"),
|
|
flags={'no_search'},
|
|
),
|
|
Int(
|
|
'sortorder?',
|
|
label=_("Sort order"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'dnsrecord_extra?',
|
|
label=_("Extra field (DNS record)"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'dnsrecord_part?',
|
|
label=_("Part (DNS record)"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'no_option?',
|
|
label=_("No option"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'suppress_empty?',
|
|
label=_("Suppress empty"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'sensitive?',
|
|
label=_("Sensitive"),
|
|
flags={'no_search'},
|
|
),
|
|
)
|
|
|
|
def _get_obj(self, param, **kwargs):
|
|
obj = dict()
|
|
obj['name'] = unicode(param.name)
|
|
|
|
if param.type is unicode:
|
|
obj['type'] = u'str'
|
|
elif param.type is bytes:
|
|
obj['type'] = u'bytes'
|
|
elif param.type is not None:
|
|
obj['type'] = unicode(param.type.__name__)
|
|
|
|
if not param.required:
|
|
obj['required'] = False
|
|
if param.multivalue:
|
|
obj['multivalue'] = True
|
|
if param.password:
|
|
obj['sensitive'] = True
|
|
|
|
for key, value in param._Param__clonekw.items():
|
|
if key in ('alwaysask',
|
|
'autofill',
|
|
'confirm',
|
|
'sortorder'):
|
|
obj[key] = value
|
|
elif key in ('cli_metavar',
|
|
'cli_name',
|
|
'doc',
|
|
'hint',
|
|
'label',
|
|
'option_group'):
|
|
obj[key] = unicode(value)
|
|
elif key == 'default':
|
|
if param.multivalue:
|
|
obj[key] = [unicode(v) for v in value]
|
|
else:
|
|
obj[key] = [unicode(value)]
|
|
elif key == 'default_from':
|
|
obj['default_from_param'] = list(unicode(k)
|
|
for k in value.keys)
|
|
elif key in ('deprecated_cli_aliases',
|
|
'exclude',
|
|
'include'):
|
|
obj[key] = list(unicode(v) for v in value)
|
|
elif key in ('exponential',
|
|
'normalizer',
|
|
'only_absolute',
|
|
'precision'):
|
|
obj['no_convert'] = True
|
|
|
|
for flag in (param.flags or []):
|
|
if flag in ('dnsrecord_extra',
|
|
'dnsrecord_part',
|
|
'no_option',
|
|
'suppress_empty'):
|
|
obj[flag] = True
|
|
|
|
return obj
|
|
|
|
def _retrieve(self, commandname, name, **kwargs):
|
|
command = self.api.Command[commandname]
|
|
|
|
if name != 'version':
|
|
try:
|
|
return command.params[name]
|
|
except KeyError:
|
|
try:
|
|
return command.output_params[name]
|
|
except KeyError:
|
|
pass
|
|
|
|
raise errors.NotFound(
|
|
reason=_("%(pkey)s: %(oname)s not found") % {
|
|
'pkey': name, 'oname': self.name,
|
|
}
|
|
)
|
|
|
|
def _search(self, commandname, **kwargs):
|
|
command = self.api.Command[commandname]
|
|
|
|
result = itertools.chain(
|
|
(p for p in command.params() if p.name != 'version'),
|
|
(p for p in command.output_params()
|
|
if p.name not in command.params))
|
|
|
|
return result
|
|
|
|
|
|
@register()
|
|
class param_show(BaseParamRetrieve):
|
|
__doc__ = _("Display information about a command parameter.")
|
|
|
|
|
|
@register()
|
|
class param_find(BaseParamSearch):
|
|
__doc__ = _("Search command parameters.")
|
|
|
|
|
|
@register()
|
|
class output(BaseParam):
|
|
takes_params = BaseParam.takes_params + (
|
|
Bool(
|
|
'no_display?',
|
|
label=_("Do not display"),
|
|
flags={'no_search'},
|
|
),
|
|
)
|
|
|
|
def _get_obj(self, command_output, **kwargs):
|
|
command, output = command_output
|
|
required = True
|
|
multivalue = False
|
|
|
|
if isinstance(output, (Entry, ListOfEntries)):
|
|
type_type = dict
|
|
multivalue = isinstance(output, ListOfEntries)
|
|
elif isinstance(output, (PrimaryKey, ListOfPrimaryKeys)):
|
|
if getattr(command, 'obj', None) and command.obj.primary_key:
|
|
type_type = command.obj.primary_key.type
|
|
else:
|
|
type_type = type(None)
|
|
multivalue = isinstance(output, ListOfPrimaryKeys)
|
|
elif isinstance(output.type, tuple):
|
|
if tuple in output.type or list in output.type:
|
|
type_type = None
|
|
multivalue = True
|
|
else:
|
|
type_type = output.type[0]
|
|
required = type(None) not in output.type
|
|
else:
|
|
type_type = output.type
|
|
|
|
obj = dict()
|
|
obj['name'] = unicode(output.name)
|
|
|
|
if type_type is unicode:
|
|
obj['type'] = u'str'
|
|
elif type_type is bytes:
|
|
obj['type'] = u'bytes'
|
|
elif type_type is not None:
|
|
obj['type'] = unicode(type_type.__name__)
|
|
|
|
if not required:
|
|
obj['required'] = False
|
|
|
|
if multivalue:
|
|
obj['multivalue'] = True
|
|
|
|
if 'doc' in output.__dict__:
|
|
obj['doc'] = unicode(output.doc)
|
|
|
|
if 'flags' in output.__dict__:
|
|
if 'no_display' in output.flags:
|
|
obj['no_display'] = True
|
|
|
|
return obj
|
|
|
|
def _retrieve(self, commandname, name, **kwargs):
|
|
command = self.api.Command[commandname]
|
|
try:
|
|
return (command, command.output[name])
|
|
except KeyError:
|
|
raise errors.NotFound(
|
|
reason=_("%(pkey)s: %(oname)s not found") % {
|
|
'pkey': name, 'oname': self.name,
|
|
}
|
|
)
|
|
|
|
def _search(self, commandname, **kwargs):
|
|
command = self.api.Command[commandname]
|
|
return ((command, output) for output in command.output())
|
|
|
|
|
|
@register()
|
|
class output_show(BaseParamRetrieve):
|
|
__doc__ = _("Display information about a command output.")
|
|
|
|
|
|
@register()
|
|
class output_find(BaseParamSearch):
|
|
__doc__ = _("Search for command outputs.")
|
|
|
|
|
|
@register()
|
|
class schema(Command):
|
|
NO_CLI = True
|
|
|
|
def execute(self, *args, **kwargs):
|
|
commands = list(self.api.Object.command.search(**kwargs))
|
|
for command in commands:
|
|
name = command['name']
|
|
command['params'] = list(
|
|
self.api.Object.param.search(name, **kwargs))
|
|
command['output'] = list(
|
|
self.api.Object.output.search(name, **kwargs))
|
|
|
|
topics = list(self.api.Object.topic.search(**kwargs))
|
|
|
|
schema = dict()
|
|
schema['version'] = API_VERSION
|
|
schema['commands'] = commands
|
|
schema['topics'] = topics
|
|
|
|
return dict(result=schema)
|