schema: client-side code cleanup

Move client-side code scattered in global functions into neat classes.

https://fedorahosted.org/freeipa/ticket/4739

Reviewed-By: David Kupka <dkupka@redhat.com>
This commit is contained in:
Jan Cholasta 2016-06-22 13:27:25 +02:00
parent 61987b66ba
commit f7cc15f099

View File

@ -13,8 +13,8 @@ from ipaclient.plugins.rpcclient import rpcclient
from ipalib import parameters, plugable
from ipalib.frontend import Command, Method, Object
from ipalib.output import Output
from ipalib.parameters import Bool, DefaultFrom, Flag, Password, Str
from ipalib.text import ConcatenatedLazyText, _
from ipalib.parameters import DefaultFrom, Flag, Password, Str
from ipalib.text import _
from ipapython.dn import DN
from ipapython.dnsutil import DNSName
@ -48,39 +48,6 @@ _PARAMS = {
class _SchemaCommand(Command):
def __fix_default_from(self, param):
api = self.api
name = unicode(self.name)
param_name = unicode(param.name)
keys = param.default_from.keys
if keys:
def callback(*args):
kw = dict(zip(keys, args))
result = api.Command.command_defaults(
name,
params=[param_name],
kw=kw,
)['result']
return result.get(param_name)
else:
def callback():
result = api.Command.command_defaults(
name,
params=[param_name],
)['result']
return result.get(param_name)
callback.__name__ = '{0}_{1}_default'.format(self.name, param.name)
return param.clone(default_from=DefaultFrom(callback, *keys))
def get_args(self):
for arg in super(_SchemaCommand, self).get_args():
if arg.default_from is not None:
arg = self.__fix_default_from(arg)
yield arg
def get_options(self):
skip = set()
for option in super(_SchemaCommand, self).get_options():
@ -88,12 +55,6 @@ class _SchemaCommand(Command):
continue
if option.name in ('all', 'raw'):
skip.add(option.name)
if option.default_from is not None:
option = self.__fix_default_from(option)
if (isinstance(option, Bool) and
option.autofill and
option.default is False):
option = option.clone_retype(option.name, Flag)
yield option
@ -182,260 +143,247 @@ class _SchemaMethod(Method, _SchemaCommand):
yield output_param
def _nope():
class _SchemaObject(Object):
pass
def _create_param_convert_scalar(cls):
def _convert_scalar(self, value, index=None):
if isinstance(value, unicode):
return value
return super(cls, self)._convert_scalar(value)
class _SchemaPlugin(object):
bases = None
schema_key = None
return _convert_scalar
def _create_param(meta):
type_name = str(meta['type'])
sensitive = meta.get('sensitive', False)
if type_name == 'str' and sensitive:
cls = Password
sensitive = False
else:
try:
cls = _PARAMS[type_name]
except KeyError:
cls = Str
kwargs = {}
default = None
for key, value in meta.items():
if key in ('alwaysask',
'doc',
'label',
'multivalue',
'no_convert',
'option_group',
'required',
'sortorder'):
kwargs[key] = value
elif key in ('cli_metavar',
'cli_name'):
kwargs[key] = str(value)
elif key == 'confirm' and issubclass(cls, parameters.Password):
kwargs[key] = value
elif key == 'default':
default = value
elif key == 'default_from_param':
kwargs['default_from'] = DefaultFrom(_nope,
*(str(k) for k in value))
elif key in ('exclude',
'include'):
kwargs[key] = tuple(str(v) for v in value)
if default is not None:
tmp = cls(str(meta['name']), **dict(kwargs, no_convert=False))
if tmp.multivalue:
default = tuple(tmp._convert_scalar(d) for d in default)
else:
default = tmp._convert_scalar(default[0])
kwargs['default'] = default
if 'default' in kwargs or 'default_from' in kwargs:
kwargs['autofill'] = not kwargs.pop('alwaysask', False)
param = cls(str(meta['name']), **kwargs)
if sensitive:
object.__setattr__(param, 'password', True)
return param
def _create_output(schema):
if schema.get('multivalue', False):
type_type = (tuple, list)
if not schema.get('required', True):
type_type = type_type + (type(None),)
else:
try:
type_type = _TYPES[schema['type']]
except KeyError:
type_type = None
else:
if not schema.get('required', True):
type_type = (type_type, type(None))
kwargs = {}
kwargs['type'] = type_type
if 'doc' in schema:
kwargs['doc'] = schema['doc']
if schema.get('no_display', False):
kwargs['flags'] = ('no_display',)
return Output(str(schema['name']), **kwargs)
def _create_command(schema):
command = {}
command['name'] = str(schema['name'])
if 'doc' in schema:
command['doc'] = ConcatenatedLazyText(schema['doc'])
if 'topic_topic' in schema:
command['topic'] = str(schema['topic_topic'])
else:
command['topic'] = None
if 'obj_class' in schema:
command['obj_name'] = str(schema['obj_class'])
if 'attr_name' in schema:
command['attr_name'] = str(schema['attr_name'])
if 'exclude' in schema and u'cli' in schema['exclude']:
command['NO_CLI'] = True
command['takes_args'] = tuple(
_create_param(s) for s in schema['params']
if s.get('positional', s.get('required', True)))
command['takes_options'] = tuple(
_create_param(s) for s in schema['params']
if not s.get('positional', s.get('required', True)))
command['has_output'] = tuple(
_create_output(m) for m in schema['output'])
return command
def _create_class(schema):
cls = {}
cls['name'] = str(schema['name'])
if 'doc' in schema:
cls['doc'] = ConcatenatedLazyText(schema['doc'])
if 'topic_topic' in schema:
cls['topic'] = str(schema['topic_topic'])
else:
cls['topic'] = None
cls['takes_params'] = tuple(_create_param(s) for s in schema['params'])
return cls
class _LazySchemaPlugin(object):
def __init__(self, base, schema):
self.__base = base
self.__schema = schema
def __init__(self, name):
self.name = name
self.__class = None
self.__module__ = None
@property
def name(self):
return str(self.__schema['name'])
def _create_default_from(self, api, name, keys):
cmd_name = self.name
@property
def bases(self):
if self.__base is Command:
if 'obj_class' in self.__schema:
return (_SchemaMethod,)
else:
return (_SchemaCommand,)
def get_default(*args):
kw = dict(zip(keys, args))
result = api.Command.command_defaults(
unicode(cmd_name),
params=[unicode(name)],
kw=kw,
)['result']
return result.get(name)
if keys:
def callback(*args):
return get_default(*args)
else:
return (self.__base,)
def callback():
return get_default()
callback.__name__ = '{0}_{1}_default'.format(cmd_name, name)
return DefaultFrom(callback, *keys)
def _create_param(self, api, schema):
name = str(schema['name'])
type_name = str(schema['type'])
sensitive = schema.get('sensitive', False)
if type_name == 'str' and sensitive:
cls = Password
sensitive = False
elif (type_name == 'bool' and
'default' in schema and
schema['default'] == [u'False']):
cls = Flag
del schema['default']
else:
try:
cls = _PARAMS[type_name]
except KeyError:
cls = Str
kwargs = {}
default = None
for key, value in schema.items():
if key in ('alwaysask',
'doc',
'label',
'multivalue',
'no_convert',
'option_group',
'required'):
kwargs[key] = value
elif key in ('cli_metavar',
'cli_name'):
kwargs[key] = str(value)
elif key == 'confirm' and issubclass(cls, Password):
kwargs[key] = value
elif key == 'default':
default = value
elif key == 'default_from_param':
keys = tuple(str(k) for k in value)
kwargs['default_from'] = (
self._create_default_from(api, name, keys))
elif key in ('exclude',
'include'):
kwargs[key] = tuple(str(v) for v in value)
if default is not None:
tmp = cls(name, **dict(kwargs, no_convert=False))
if tmp.multivalue:
default = tuple(tmp._convert_scalar(d) for d in default)
else:
default = tmp._convert_scalar(default[0])
kwargs['default'] = default
if 'default' in kwargs or 'default_from' in kwargs:
kwargs['autofill'] = not kwargs.pop('alwaysask', False)
param = cls(name, **kwargs)
if sensitive:
object.__setattr__(param, 'password', True)
return param
def _create_class(self, api, schema):
class_dict = {}
class_dict['name'] = self.name
if 'doc' in schema:
class_dict['doc'] = schema['doc']
if 'topic_topic' in schema:
class_dict['topic'] = str(schema['topic_topic'])
else:
class_dict['topic'] = None
class_dict['takes_params'] = tuple(self._create_param(api, s)
for s in schema.get('params', []))
return self.name, self.bases, class_dict
def __call__(self, api):
if self.__class is None:
if self.__base is Command:
metaobject = _create_command(self.__schema)
else:
metaobject = _create_class(self.__schema)
metaobject = type(self.name, self.bases, metaobject)
metaobject.__module__ = self.__module__
self.__class = metaobject
schema = api._schema[self.schema_key][self.name]
name, bases, class_dict = self._create_class(api, schema)
self.__class = type(name, bases, class_dict)
return self.__class(api)
def _create_commands(schema):
return [_LazySchemaPlugin(Command, s) for s in schema]
class _SchemaCommandPlugin(_SchemaPlugin):
bases = (_SchemaCommand,)
schema_key = 'commands'
def _create_output(self, api, schema):
if schema.get('multivalue', False):
type_type = (tuple, list)
if not schema.get('required', True):
type_type = type_type + (type(None),)
else:
try:
type_type = _TYPES[schema['type']]
except KeyError:
type_type = None
else:
if not schema.get('required', True):
type_type = (type_type, type(None))
kwargs = {}
kwargs['type'] = type_type
if 'doc' in schema:
kwargs['doc'] = schema['doc']
if schema.get('no_display', False):
kwargs['flags'] = ('no_display',)
return Output(str(schema['name']), **kwargs)
def _create_class(self, api, schema):
name, bases, class_dict = (
super(_SchemaCommandPlugin, self)._create_class(api, schema))
if 'obj_class' in schema or 'attr_name' in schema:
bases = (_SchemaMethod,)
if 'obj_class' in schema:
class_dict['obj_name'] = str(schema['obj_class'])
if 'attr_name' in schema:
class_dict['attr_name'] = str(schema['attr_name'])
if 'exclude' in schema and u'cli' in schema['exclude']:
class_dict['NO_CLI'] = True
args = set(str(s['name']) for s in schema['params']
if s.get('positional', s.get('required', True)))
class_dict['takes_args'] = tuple(
p for p in class_dict['takes_params'] if p.name in args)
class_dict['takes_options'] = tuple(
p for p in class_dict['takes_params'] if p.name not in args)
del class_dict['takes_params']
class_dict['has_output'] = tuple(
self._create_output(api, s) for s in schema['output'])
return name, bases, class_dict
def _create_classes(schema):
return [_LazySchemaPlugin(Object, s) for s in schema]
def _create_topic(schema):
topic = {}
topic['name'] = str(schema['name'])
if 'doc' in schema:
topic['doc'] = ConcatenatedLazyText(schema['doc'])
if 'topic_topic' in schema:
topic['topic'] = str(schema['topic_topic'])
else:
topic['topic'] = None
return topic
def _create_topics(schema):
return [_create_topic(s) for s in schema]
class _SchemaObjectPlugin(_SchemaPlugin):
bases = (_SchemaObject,)
schema_key = 'classes'
def get_package(api):
package_name = '{}${}'.format(__name__, id(api))
package_dir = '{}${}'.format(os.path.splitext(__file__)[0], id(api))
try:
schema = api._schema
except AttributeError:
client = rpcclient(api)
client.finalize()
client.connect(verbose=False)
try:
schema = client.forward(u'schema', version=u'2.170')['result']
finally:
client.disconnect()
for key in ('commands', 'classes', 'topics'):
schema[key] = {str(s.pop('name')): s for s in schema[key]}
object.__setattr__(api, '_schema', schema)
fingerprint = str(schema['fingerprint'])
package_name = '{}${}'.format(__name__, fingerprint)
package_dir = '{}${}'.format(os.path.splitext(__file__)[0], fingerprint)
try:
return sys.modules[package_name]
except KeyError:
pass
client = rpcclient(api)
client.finalize()
client.connect(verbose=False)
try:
schema = client.forward(u'schema', version=u'2.170')['result']
finally:
client.disconnect()
commands = _create_commands(schema['commands'])
classes = _create_classes(schema['classes'])
topics = _create_topics(schema['topics'])
package = types.ModuleType(package_name)
package.__file__ = os.path.join(package_dir, '__init__.py')
package.modules = []
package.modules = ['plugins']
sys.modules[package_name] = package
module_name = '.'.join((package_name, 'commands'))
module_name = '.'.join((package_name, 'plugins'))
module = types.ModuleType(module_name)
module.__file__ = os.path.join(package_dir, 'commands.py')
module.__file__ = os.path.join(package_dir, 'plugins.py')
module.register = plugable.Registry()
package.modules.append('commands')
for key, plugin_cls in (('commands', _SchemaCommandPlugin),
('classes', _SchemaObjectPlugin)):
for name in schema[key]:
plugin = plugin_cls(name)
plugin = module.register()(plugin)
setattr(module, name, plugin)
sys.modules[module_name] = module
for command in commands:
command.__module__ = module_name
command = module.register()(command)
setattr(module, command.name, command)
for cls in classes:
cls.__module__ = module_name
cls = module.register()(cls)
setattr(module, cls.name, command)
for topic in topics:
name = topic.pop('name')
for name, topic in six.iteritems(schema['topics']):
module_name = '.'.join((package_name, name))
try:
module = sys.modules[module_name]
except KeyError:
module = sys.modules[module_name] = types.ModuleType(module_name)
module.__file__ = os.path.join(package_dir, '{}.py'.format(name))
module.__dict__.update(topic)
try:
module.__doc__ = module.doc
except AttributeError:
pass
module.__doc__ = topic.get('doc')
if 'topic_topic' in topic:
module.topic = str(topic['topic_topic'])
else:
module.topic = None
return package