mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-01-12 17:21:55 -06:00
a64aba36a4
Commands inherited from Local can't be executed remotely, so exclude them from API schema. https://fedorahosted.org/freeipa/ticket/4739 Reviewed-By: David Kupka <dkupka@redhat.com>
668 lines
18 KiB
Python
668 lines
18 KiB
Python
#
|
|
# Copyright (C) 2016 FreeIPA Contributors see COPYING for license
|
|
#
|
|
|
|
import importlib
|
|
import itertools
|
|
import sys
|
|
|
|
import six
|
|
|
|
from ipalib import errors
|
|
from ipalib.crud import PKQuery, Retrieve, Search
|
|
from ipalib.frontend import Command, Local, Method, Object
|
|
from ipalib.output import Entry, ListOfEntries, ListOfPrimaryKeys, PrimaryKey
|
|
from ipalib.parameters import Bool, Dict, Flag, Int, Str
|
|
from ipalib.plugable import Registry
|
|
from ipalib.text import _
|
|
from ipapython.version import API_VERSION
|
|
|
|
__doc__ = _("""
|
|
API Schema
|
|
""") + _("""
|
|
Provides API introspection capabilities.
|
|
""") + _("""
|
|
EXAMPLES:
|
|
""") + _("""
|
|
Show user-find details:
|
|
ipa command-show user-find
|
|
""") + _("""
|
|
Find user-find parameters:
|
|
ipa param-find user-find
|
|
""")
|
|
|
|
if six.PY3:
|
|
unicode = str
|
|
|
|
register = Registry()
|
|
|
|
|
|
class BaseMetaObject(Object):
|
|
takes_params = (
|
|
Str(
|
|
'name',
|
|
label=_("Name"),
|
|
primary_key=True,
|
|
normalizer=lambda name: name.replace(u'-', u'_'),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'doc?',
|
|
label=_("Documentation"),
|
|
flags={'no_search'},
|
|
),
|
|
)
|
|
|
|
def _get_obj(self, obj, **kwargs):
|
|
raise NotImplementedError()
|
|
|
|
def _retrieve(self, *args, **kwargs):
|
|
raise NotImplementedError()
|
|
|
|
def retrieve(self, *args, **kwargs):
|
|
obj = self._retrieve(*args, **kwargs)
|
|
obj = self._get_obj(obj, **kwargs)
|
|
return obj
|
|
|
|
def _search(self, *args, **kwargs):
|
|
raise NotImplementedError()
|
|
|
|
def _split_search_args(self, criteria=None):
|
|
return [], criteria
|
|
|
|
def search(self, *args, **kwargs):
|
|
args, criteria = self._split_search_args(*args)
|
|
|
|
result = self._search(*args, **kwargs)
|
|
result = (self._get_obj(r, **kwargs) for r in result)
|
|
|
|
if criteria:
|
|
criteria = criteria.lower()
|
|
result = (r for r in result
|
|
if (criteria in r['name'].lower() or
|
|
criteria in r.get('doc', u'').lower()))
|
|
|
|
if not kwargs.get('all', False) and kwargs.get('pkey_only', False):
|
|
result = ({'name': r['name']} for r in result)
|
|
|
|
return result
|
|
|
|
|
|
class BaseMetaRetrieve(Retrieve):
|
|
def execute(self, *args, **options):
|
|
obj = self.obj.retrieve(*args, **options)
|
|
return dict(result=obj, value=args[-1])
|
|
|
|
|
|
class BaseMetaSearch(Search):
|
|
def get_options(self):
|
|
for option in super(BaseMetaSearch, self).get_options():
|
|
yield option
|
|
|
|
yield Flag(
|
|
'pkey_only?',
|
|
label=_("Primary key only"),
|
|
doc=_("Results should contain primary key attribute only "
|
|
"(\"%s\")") % 'name',
|
|
)
|
|
|
|
def execute(self, criteria=None, **options):
|
|
result = list(self.obj.search(criteria, **options))
|
|
return dict(result=result, count=len(result), truncated=False)
|
|
|
|
|
|
class MetaObject(BaseMetaObject):
|
|
takes_params = BaseMetaObject.takes_params + (
|
|
Str(
|
|
'topic_topic?',
|
|
label=_("Help topic"),
|
|
flags={'no_search'},
|
|
),
|
|
)
|
|
|
|
|
|
class MetaRetrieve(BaseMetaRetrieve):
|
|
pass
|
|
|
|
|
|
class MetaSearch(BaseMetaSearch):
|
|
pass
|
|
|
|
|
|
@register()
|
|
class command(MetaObject):
|
|
takes_params = BaseMetaObject.takes_params + (
|
|
Str(
|
|
'args_param*',
|
|
label=_("Arguments"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'options_param*',
|
|
label=_("Options"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'output_params_param*',
|
|
label=_("Output parameters"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'no_cli?',
|
|
label=_("Exclude from CLI"),
|
|
flags={'no_search'},
|
|
),
|
|
)
|
|
|
|
def _get_obj(self, command, **kwargs):
|
|
obj = dict()
|
|
obj['name'] = unicode(command.name)
|
|
|
|
if command.doc:
|
|
obj['doc'] = unicode(command.doc)
|
|
|
|
if command.topic:
|
|
try:
|
|
topic = self.api.Object.topic.retrieve(unicode(command.topic))
|
|
except errors.NotFound:
|
|
pass
|
|
else:
|
|
obj['topic_topic'] = topic['name']
|
|
|
|
if command.NO_CLI:
|
|
obj['no_cli'] = True
|
|
|
|
if len(command.args):
|
|
obj['args_param'] = tuple(unicode(n) for n in command.args)
|
|
|
|
if len(command.options):
|
|
obj['options_param'] = tuple(
|
|
unicode(n) for n in command.options if n != 'version')
|
|
|
|
if len(command.output_params):
|
|
obj['output_params_param'] = tuple(
|
|
unicode(n) for n in command.output_params
|
|
if n not in command.params)
|
|
|
|
return obj
|
|
|
|
def _retrieve(self, name, **kwargs):
|
|
try:
|
|
command = self.api.Command[name]
|
|
if not isinstance(command, Local):
|
|
return command
|
|
except KeyError:
|
|
pass
|
|
|
|
raise errors.NotFound(
|
|
reason=_("%(pkey)s: %(oname)s not found") % {
|
|
'pkey': name, 'oname': self.name,
|
|
}
|
|
)
|
|
|
|
def _search(self, **kwargs):
|
|
for command in self.api.Command():
|
|
if not isinstance(command, Local):
|
|
yield command
|
|
|
|
|
|
@register()
|
|
class command_show(MetaRetrieve):
|
|
__doc__ = _("Display information about a command.")
|
|
|
|
|
|
@register()
|
|
class command_find(MetaSearch):
|
|
__doc__ = _("Search for commands.")
|
|
|
|
|
|
@register()
|
|
class command_defaults(PKQuery):
|
|
NO_CLI = True
|
|
|
|
takes_options = (
|
|
Str('params*'),
|
|
Dict('kw?'),
|
|
)
|
|
|
|
def execute(self, name, **options):
|
|
command = self.api.Command[name]
|
|
|
|
params = options.get('params', [])
|
|
kw = options.get('kw', {})
|
|
|
|
result = command.get_default(params, **kw)
|
|
|
|
return dict(result=result)
|
|
|
|
|
|
@register()
|
|
class topic_(MetaObject):
|
|
name = 'topic'
|
|
|
|
def __init__(self, api):
|
|
super(topic_, self).__init__(api)
|
|
self.__topics = None
|
|
|
|
def __get_topics(self):
|
|
if self.__topics is None:
|
|
topics = {}
|
|
object.__setattr__(self, '_topic___topics', topics)
|
|
|
|
for command in self.api.Command():
|
|
topic_value = command.topic
|
|
if topic_value is None:
|
|
continue
|
|
topic_name = unicode(topic_value)
|
|
|
|
while topic_name not in topics:
|
|
topic = topics[topic_name] = {'name': topic_name}
|
|
|
|
for package in self.api.packages:
|
|
module_name = '.'.join((package.__name__, topic_name))
|
|
try:
|
|
module = sys.modules[module_name]
|
|
except KeyError:
|
|
try:
|
|
module = importlib.import_module(module_name)
|
|
except ImportError:
|
|
continue
|
|
|
|
if module.__doc__ is not None:
|
|
topic['doc'] = unicode(module.__doc__).strip()
|
|
|
|
try:
|
|
topic_value = module.topic
|
|
except AttributeError:
|
|
continue
|
|
if topic_value is not None:
|
|
topic_name = unicode(topic_value)
|
|
topic['topic_topic'] = topic_name
|
|
else:
|
|
topic.pop('topic_topic', None)
|
|
|
|
return self.__topics
|
|
|
|
def _get_obj(self, topic, **kwargs):
|
|
return topic
|
|
|
|
def _retrieve(self, name, **kwargs):
|
|
try:
|
|
return self.__get_topics()[name]
|
|
except KeyError:
|
|
raise errors.NotFound(
|
|
reason=_("%(pkey)s: %(oname)s not found") % {
|
|
'pkey': name, 'oname': self.name,
|
|
}
|
|
)
|
|
|
|
def _search(self, **kwargs):
|
|
return self.__get_topics().values()
|
|
|
|
|
|
@register()
|
|
class topic_show(MetaRetrieve):
|
|
__doc__ = _("Display information about a help topic.")
|
|
|
|
|
|
@register()
|
|
class topic_find(MetaSearch):
|
|
__doc__ = _("Search for help topics.")
|
|
|
|
|
|
class BaseParam(BaseMetaObject):
|
|
takes_params = BaseMetaObject.takes_params + (
|
|
Str(
|
|
'type?',
|
|
label=_("Type"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'required?',
|
|
label=_("Required"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'multivalue?',
|
|
label=_("Multi-value"),
|
|
flags={'no_search'},
|
|
),
|
|
)
|
|
|
|
def _split_search_args(self, commandname, criteria=None):
|
|
return [commandname], criteria
|
|
|
|
|
|
class BaseParamMethod(Method):
|
|
def get_args(self):
|
|
parent = self.api.Object.command
|
|
parent_key = parent.primary_key
|
|
yield parent_key.clone_rename(
|
|
parent.name + parent_key.name,
|
|
cli_name=parent.name,
|
|
label=parent_key.label,
|
|
required=True,
|
|
query=True,
|
|
)
|
|
|
|
for arg in super(BaseParamMethod, self).get_args():
|
|
yield arg
|
|
|
|
|
|
class BaseParamRetrieve(BaseParamMethod, BaseMetaRetrieve):
|
|
pass
|
|
|
|
|
|
class BaseParamSearch(BaseParamMethod, BaseMetaSearch):
|
|
pass
|
|
|
|
|
|
@register()
|
|
class param(BaseParam):
|
|
takes_params = BaseParam.takes_params + (
|
|
Bool(
|
|
'alwaysask?',
|
|
label=_("Always ask"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'autofill?',
|
|
label=_("Autofill"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'cli_metavar?',
|
|
label=_("CLI metavar"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'cli_name?',
|
|
label=_("CLI name"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'confirm',
|
|
label=_("Confirm (password)"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'default*',
|
|
label=_("Default"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'default_from_param*',
|
|
label=_("Default from"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'deprecated_cli_aliases*',
|
|
label=_("Deprecated CLI aliases"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'exclude*',
|
|
label=_("Exclude from"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'hint?',
|
|
label=_("Hint"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'include*',
|
|
label=_("Include in"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'label?',
|
|
label=_("Label"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'no_convert?',
|
|
label=_("Convert on server"),
|
|
flags={'no_search'},
|
|
),
|
|
Str(
|
|
'option_group?',
|
|
label=_("Option group"),
|
|
flags={'no_search'},
|
|
),
|
|
Int(
|
|
'sortorder?',
|
|
label=_("Sort order"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'dnsrecord_extra?',
|
|
label=_("Extra field (DNS record)"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'dnsrecord_part?',
|
|
label=_("Part (DNS record)"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'no_option?',
|
|
label=_("No option"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'suppress_empty?',
|
|
label=_("Suppress empty"),
|
|
flags={'no_search'},
|
|
),
|
|
Bool(
|
|
'sensitive?',
|
|
label=_("Sensitive"),
|
|
flags={'no_search'},
|
|
),
|
|
)
|
|
|
|
def _get_obj(self, param, **kwargs):
|
|
obj = dict()
|
|
obj['name'] = unicode(param.name)
|
|
|
|
if param.type is unicode:
|
|
obj['type'] = u'str'
|
|
elif param.type is bytes:
|
|
obj['type'] = u'bytes'
|
|
elif param.type is not None:
|
|
obj['type'] = unicode(param.type.__name__)
|
|
|
|
if not param.required:
|
|
obj['required'] = False
|
|
if param.multivalue:
|
|
obj['multivalue'] = True
|
|
if param.password:
|
|
obj['sensitive'] = True
|
|
|
|
for key, value in param._Param__clonekw.items():
|
|
if key in ('alwaysask',
|
|
'autofill',
|
|
'confirm',
|
|
'sortorder'):
|
|
obj[key] = value
|
|
elif key in ('cli_metavar',
|
|
'cli_name',
|
|
'doc',
|
|
'hint',
|
|
'label',
|
|
'option_group'):
|
|
obj[key] = unicode(value)
|
|
elif key == 'default':
|
|
if param.multivalue:
|
|
obj[key] = [unicode(v) for v in value]
|
|
else:
|
|
obj[key] = [unicode(value)]
|
|
elif key == 'default_from':
|
|
obj['default_from_param'] = list(unicode(k)
|
|
for k in value.keys)
|
|
elif key in ('deprecated_cli_aliases',
|
|
'exclude',
|
|
'include'):
|
|
obj[key] = list(unicode(v) for v in value)
|
|
elif key in ('exponential',
|
|
'normalizer',
|
|
'only_absolute',
|
|
'precision'):
|
|
obj['no_convert'] = True
|
|
|
|
for flag in (param.flags or []):
|
|
if flag in ('dnsrecord_extra',
|
|
'dnsrecord_part',
|
|
'no_option',
|
|
'suppress_empty'):
|
|
obj[flag] = True
|
|
|
|
return obj
|
|
|
|
def _retrieve(self, commandname, name, **kwargs):
|
|
command = self.api.Command[commandname]
|
|
|
|
if name != 'version':
|
|
try:
|
|
return command.params[name]
|
|
except KeyError:
|
|
try:
|
|
return command.output_params[name]
|
|
except KeyError:
|
|
pass
|
|
|
|
raise errors.NotFound(
|
|
reason=_("%(pkey)s: %(oname)s not found") % {
|
|
'pkey': name, 'oname': self.name,
|
|
}
|
|
)
|
|
|
|
def _search(self, commandname, **kwargs):
|
|
command = self.api.Command[commandname]
|
|
|
|
result = itertools.chain(
|
|
(p for p in command.params() if p.name != 'version'),
|
|
(p for p in command.output_params()
|
|
if p.name not in command.params))
|
|
|
|
return result
|
|
|
|
|
|
@register()
|
|
class param_show(BaseParamRetrieve):
|
|
__doc__ = _("Display information about a command parameter.")
|
|
|
|
|
|
@register()
|
|
class param_find(BaseParamSearch):
|
|
__doc__ = _("Search command parameters.")
|
|
|
|
|
|
@register()
|
|
class output(BaseParam):
|
|
takes_params = BaseParam.takes_params + (
|
|
Bool(
|
|
'no_display?',
|
|
label=_("Do not display"),
|
|
flags={'no_search'},
|
|
),
|
|
)
|
|
|
|
def _get_obj(self, command_output, **kwargs):
|
|
command, output = command_output
|
|
required = True
|
|
multivalue = False
|
|
|
|
if isinstance(output, (Entry, ListOfEntries)):
|
|
type_type = dict
|
|
multivalue = isinstance(output, ListOfEntries)
|
|
elif isinstance(output, (PrimaryKey, ListOfPrimaryKeys)):
|
|
if getattr(command, 'obj', None) and command.obj.primary_key:
|
|
type_type = command.obj.primary_key.type
|
|
else:
|
|
type_type = type(None)
|
|
multivalue = isinstance(output, ListOfPrimaryKeys)
|
|
elif isinstance(output.type, tuple):
|
|
if tuple in output.type or list in output.type:
|
|
type_type = None
|
|
multivalue = True
|
|
else:
|
|
type_type = output.type[0]
|
|
required = type(None) not in output.type
|
|
else:
|
|
type_type = output.type
|
|
|
|
obj = dict()
|
|
obj['name'] = unicode(output.name)
|
|
|
|
if type_type is unicode:
|
|
obj['type'] = u'str'
|
|
elif type_type is bytes:
|
|
obj['type'] = u'bytes'
|
|
elif type_type is not None:
|
|
obj['type'] = unicode(type_type.__name__)
|
|
|
|
if not required:
|
|
obj['required'] = False
|
|
|
|
if multivalue:
|
|
obj['multivalue'] = True
|
|
|
|
if 'doc' in output.__dict__:
|
|
obj['doc'] = unicode(output.doc)
|
|
|
|
if 'flags' in output.__dict__:
|
|
if 'no_display' in output.flags:
|
|
obj['no_display'] = True
|
|
|
|
return obj
|
|
|
|
def _retrieve(self, commandname, name, **kwargs):
|
|
command = self.api.Command[commandname]
|
|
try:
|
|
return (command, command.output[name])
|
|
except KeyError:
|
|
raise errors.NotFound(
|
|
reason=_("%(pkey)s: %(oname)s not found") % {
|
|
'pkey': name, 'oname': self.name,
|
|
}
|
|
)
|
|
|
|
def _search(self, commandname, **kwargs):
|
|
command = self.api.Command[commandname]
|
|
return ((command, output) for output in command.output())
|
|
|
|
|
|
@register()
|
|
class output_show(BaseParamRetrieve):
|
|
__doc__ = _("Display information about a command output.")
|
|
|
|
|
|
@register()
|
|
class output_find(BaseParamSearch):
|
|
__doc__ = _("Search for command outputs.")
|
|
|
|
|
|
@register()
|
|
class schema(Command):
|
|
NO_CLI = True
|
|
|
|
def execute(self, *args, **kwargs):
|
|
commands = list(self.api.Object.command.search(**kwargs))
|
|
for command in commands:
|
|
name = command['name']
|
|
command['params'] = list(
|
|
self.api.Object.param.search(name, **kwargs))
|
|
command['output'] = list(
|
|
self.api.Object.output.search(name, **kwargs))
|
|
|
|
topics = list(self.api.Object.topic.search(**kwargs))
|
|
|
|
schema = dict()
|
|
schema['version'] = API_VERSION
|
|
schema['commands'] = commands
|
|
schema['topics'] = topics
|
|
|
|
return dict(result=schema)
|