Finished reworked cli.CLI class into cli.cli plugin

This commit is contained in:
Jason Gerard DeRose 2009-01-28 13:05:26 -07:00 committed by Rob Crittenden
parent db0168f7af
commit 231f0bd65a
14 changed files with 77 additions and 545 deletions

View File

@ -701,7 +701,7 @@ plugin (or plugins) is imported. For example:
1
>>> api.bootstrap(in_server=True) # We want to execute, not forward
>>> len(api.env)
36
38
`Env._bootstrap()`, which is called by `API.bootstrap()`, will create several
run-time variables that connot be overriden in configuration files or through

View File

@ -413,9 +413,9 @@ class help(frontend.Command):
name = from_cli(key)
if name not in self.Command:
raise HelpError(topic=key)
cmd = self.application[key]
cmd = self.Command[name]
print 'Purpose: %s' % cmd.doc
self.application.build_parser(cmd).print_help()
self.Backend.cli.build_parser(cmd).print_help()
def print_commands(self):
mcl = self.get_mcl()
@ -503,331 +503,23 @@ cli_application_commands = (
class Collector(object):
def __init__(self, **extra):
def __init__(self):
object.__setattr__(self, '_Collector__options', {})
object.__setattr__(self, '_Collector__extra', frozenset(extra))
for (key, value) in extra.iteritems():
object.__setattr__(self, key, value)
def __setattr__(self, name, value):
if name not in self.__extra:
if name in self.__options:
v = self.__options[name]
if type(v) is tuple:
value = v + (value,)
else:
value = (v, value)
self.__options[name] = value
if name in self.__options:
v = self.__options[name]
if type(v) is tuple:
value = v + (value,)
else:
value = (v, value)
self.__options[name] = value
object.__setattr__(self, name, value)
def __todict__(self):
return dict(self.__options)
class CLI(object):
"""
All logic for dispatching over command line interface.
"""
__d = None
__mcl = None
def __init__(self, api, argv):
self.api = api
self.argv = tuple(argv)
self.__done = set()
def run(self):
"""
Call `CLI.run_real` in a try/except.
"""
self.bootstrap()
try:
self.run_real()
except KeyboardInterrupt:
print ''
self.api.log.info('operation aborted')
sys.exit()
except PublicError, e:
self.api.log.error(e.strerror)
sys.exit(e.errno)
except Exception, e:
self.api.log.exception('%s: %s', e.__class__.__name__, str(e))
e = InternalError()
self.api.log.error(e.strerror)
sys.exit(e.errno)
def run_real(self):
"""
Parse ``argv`` and potentially run a command.
This method requires several initialization steps to be completed
first, all of which all automatically called with a single call to
`CLI.finalize()`. The initialization steps are broken into separate
methods simply to make it easy to write unit tests.
The initialization involves these steps:
1. `CLI.parse_globals` parses the global options, which get stored
in ``CLI.options``, and stores the remaining args in
``CLI.cmd_argv``.
2. `CLI.bootstrap` initializes the environment information in
``CLI.api.env``.
3. `CLI.load_plugins` registers all plugins, including the
CLI-specific plugins.
4. `CLI.finalize` instantiates all plugins and performs the
remaining initialization needed to use the `plugable.API`
instance.
"""
self.__doing('run_real')
self.finalize()
if self.api.env.mode == 'unit_test':
return
if len(self.cmd_argv) < 1:
self.api.Command.help()
return
key = self.cmd_argv[0]
if key not in self:
raise CommandError(name=key)
self.run_cmd(self[key])
def finalize(self):
"""
Fully initialize ``CLI.api`` `plugable.API` instance.
This method first calls `CLI.load_plugins` to perform some dependant
initialization steps, after which `plugable.API.finalize` is called.
Finally, the CLI-specific commands are passed a reference to this
`CLI` instance by calling `frontend.Application.set_application`.
"""
self.__doing('finalize')
self.load_plugins()
self.api.finalize()
for a in self.api.Application():
a.set_application(self)
assert self.__d is None
self.__d = dict(
(c.name.replace('_', '-'), c) for c in self.api.Command()
)
self.textui = self.api.Backend.textui
if self.api.env.in_server is False and 'xmlclient' in self.api.Backend:
self.api.Backend.xmlclient.connect()
def load_plugins(self):
"""
Load all standard plugins plus the CLI-specific plugins.
This method first calls `CLI.bootstrap` to preform some dependant
initialization steps, after which `plugable.API.load_plugins` is
called.
Finally, all the CLI-specific plugins are registered.
"""
self.__doing('load_plugins')
if 'bootstrap' not in self.__done:
self.bootstrap()
self.api.load_plugins()
for klass in cli_application_commands:
self.api.register(klass)
self.api.register(textui)
def bootstrap(self):
"""
Initialize the ``CLI.api.env`` environment variables.
This method first calls `CLI.parse_globals` to perform some dependant
initialization steps. Then, using environment variables that may have
been passed in the global options, the ``overrides`` are constructed
and `plugable.API.bootstrap` is called.
"""
self.__doing('bootstrap')
self.parse_globals()
self.api.bootstrap_with_global_options(self.options, context='cli')
def parse_globals(self):
"""
Parse out the global options.
This method parses the global options out of the ``CLI.argv`` instance
attribute, after which two new instance attributes are available:
1. ``CLI.options`` - an ``optparse.Values`` instance containing
the global options.
2. ``CLI.cmd_argv`` - a tuple containing the remainder of
``CLI.argv`` after the global options have been consumed.
The common global options are added using the
`util.add_global_options` function.
"""
self.__doing('parse_globals')
parser = optparse.OptionParser()
parser.disable_interspersed_args()
parser.add_option('-a', dest='prompt_all', action='store_true',
help='Prompt for all missing options interactively')
parser.add_option('-n', dest='interactive', action='store_false',
help='Don\'t prompt for any options interactively')
parser.set_defaults(
prompt_all=False,
interactive=True,
)
util.add_global_options(parser)
(options, args) = parser.parse_args(list(self.argv))
self.options = options
self.cmd_argv = tuple(args)
def __doing(self, name):
if name in self.__done:
raise StandardError(
'%s.%s() already called' % (self.__class__.__name__, name)
)
self.__done.add(name)
def run_cmd(self, cmd):
kw = self.parse(cmd)
if self.options.interactive:
self.prompt_interactively(cmd, kw)
self.prompt_for_passwords(cmd, kw)
result = cmd(**kw)
if callable(cmd.output_for_cli):
for param in cmd.params():
if param.password and param.name in kw:
del kw[param.name]
(args, options) = cmd.params_2_args_options(**kw)
cmd.output_for_cli(self.api.Backend.textui, result, *args, **options)
def prompt_for_passwords(self, cmd, kw):
for param in cmd.params():
if not param.password:
continue
if kw.get(param.name, False) is True or param.name in cmd.args:
kw[param.name] = self.textui.prompt_password(
param.cli_name
)
else:
kw.pop(param.name, None)
return kw
def prompt_interactively(self, cmd, kw):
"""
Interactively prompt for missing or invalid values.
By default this method will only prompt for *required* Param that
have a missing or invalid value. However, if
``CLI.options.prompt_all`` is True, this method will prompt for any
params that have a missing or required values, even if the param is
optional.
"""
for param in cmd.params():
if param.password or param.autofill:
continue
elif param.name not in kw:
if not param.required and not self.options.prompt_all:
continue
default = param.get_default(**kw)
error = None
while True:
if error is not None:
print '>>> %s: %s' % (param.cli_name, error)
raw = self.textui.prompt(param.cli_name, default)
try:
value = param(raw, **kw)
if value is not None:
kw[param.name] = value
break
except errors.ValidationError, e:
error = e.error
return kw
def parse(self, cmd):
parser = self.build_parser(cmd)
(kwc, args) = parser.parse_args(
list(self.cmd_argv[1:]), KWCollector()
)
options = kwc.__todict__()
kw = cmd.args_options_2_params(*args, **options)
return dict(self.parse_iter(cmd, kw))
def parse_iter(self, cmd, kw):
"""
Decode param values if appropriate.
"""
for (key, value) in kw.iteritems():
param = cmd.params[key]
if isinstance(param, Bytes):
yield (key, value)
else:
yield (key, self.textui.decode(value))
def build_parser(self, cmd):
parser = optparse.OptionParser(
usage=self.get_usage(cmd),
)
for option in cmd.options():
kw = dict(
dest=option.name,
help=option.doc,
)
if option.password:
kw['action'] = 'store_true'
elif option.type is bool:
if option.default is True:
kw['action'] = 'store_false'
else:
kw['action'] = 'store_true'
else:
kw['metavar'] = metavar=option.__class__.__name__.upper()
o = optparse.make_option('--%s' % to_cli(option.cli_name), **kw)
parser.add_option(o)
return parser
def get_usage(self, cmd):
return ' '.join(self.get_usage_iter(cmd))
def get_usage_iter(self, cmd):
yield 'Usage: %%prog [global-options] %s' % to_cli(cmd.name)
for arg in cmd.args():
if arg.password:
continue
name = to_cli(arg.cli_name).upper()
if arg.multivalue:
name = '%s...' % name
if arg.required:
yield name
else:
yield '[%s]' % name
def __get_mcl(self):
"""
Returns the Max Command Length.
"""
if self.__mcl is None:
if self.__d is None:
return None
self.__mcl = max(len(k) for k in self.__d)
return self.__mcl
mcl = property(__get_mcl)
def isdone(self, name):
"""
Return True in method named ``name`` has already been called.
"""
return name in self.__done
def __contains__(self, key):
assert self.__d is not None, 'you must call finalize() first'
return key in self.__d
def __getitem__(self, key):
assert self.__d is not None, 'you must call finalize() first'
return self.__d[key]
class cli(backend.Executioner):
"""
Backend plugin for executing from command line interface.
@ -839,40 +531,17 @@ class cli(backend.Executioner):
return
(key, argv) = (argv[0], argv[1:])
cmd = self.get_command(key)
(kw, collector) = self.parse(cmd, argv)
if collector._interactive:
self.prompt_interactively(cmd, kw, collector)
kw = self.parse(cmd, argv)
if self.env.interactive:
self.prompt_interactively(cmd, kw)
self.create_context()
def prompt_interactively(self, cmd, kw, collector):
"""
Interactively prompt for missing or invalid values.
By default this method will only prompt for *required* Param that
have a missing or invalid value. However, if
``CLI.options.prompt_all`` is True, this method will prompt for any
params that have a missing or required values, even if the param is
optional.
"""
for param in cmd.params():
if param.password or param.autofill:
continue
elif param.name not in kw:
if not param.required and not collector._prompt_all:
continue
default = param.get_default(**kw)
error = None
while True:
if error is not None:
print '>>> %s: %s' % (param.cli_name, error)
raw = self.Backend.textui.prompt(param.cli_name, default)
try:
value = param(raw, **kw)
if value is not None:
kw[param.name] = value
break
except errors.ValidationError, e:
error = e.error
result = cmd(**kw)
if callable(cmd.output_for_cli):
for param in cmd.params():
if param.password and param.name in kw:
del kw[param.name]
(args, options) = cmd.params_2_args_options(**kw)
cmd.output_for_cli(self.api.Backend.textui, result, *args, **options)
def get_command(self, key):
name = from_cli(key)
@ -882,14 +551,13 @@ class cli(backend.Executioner):
def parse(self, cmd, argv):
parser = self.build_parser(cmd)
(collector, args) = parser.parse_args(argv,
Collector(_prompt_all=False, interactive=True)
)
(collector, args) = parser.parse_args(argv, Collector())
options = collector.__todict__()
kw = cmd.args_options_2_params(*args, **options)
return (dict(self.parse_iter(cmd, kw)), collector)
return dict(self.parse_iter(cmd, kw))
# FIXME: Move decoding to Command, use same regardless of request source
# FIXME: Probably move decoding to Command, use same method regardless of
# request source:
def parse_iter(self, cmd, kw):
"""
Decode param values if appropriate.
@ -905,17 +573,12 @@ class cli(backend.Executioner):
parser = optparse.OptionParser(
usage=' '.join(self.usage_iter(cmd))
)
if len(cmd.params) > 0:
parser.add_option('-a', dest='_prompt_all', action='store_true',
help='Prompt for all values interactively')
parser.add_option('-n', dest='_interactive', action='store_false',
help="Don\'t prompt for any values interactively")
for option in cmd.options():
kw = dict(
dest=option.name,
help=option.doc,
)
if option.password:
if option.password and self.env.interactive:
kw['action'] = 'store_true'
elif option.type is bool:
if option.default is True:
@ -941,31 +604,64 @@ class cli(backend.Executioner):
else:
yield '[%s]' % name
def prompt_interactively(self, cmd, kw):
"""
Interactively prompt for missing or invalid values.
By default this method will only prompt for *required* Param that
have a missing or invalid value. However, if
``self.env.prompt_all`` is ``True``, this method will prompt for any
params that have a missing values, even if the param is optional.
"""
for param in cmd.params():
if param.password:
if kw.get(param.name, False) is True or param.name in cmd.args:
kw[param.name] = \
self.Backend.textui.prompt_password(param.cli_name)
elif param.autofill or param.name in kw:
continue
elif param.required or self.env.prompt_all:
default = param.get_default(**kw)
error = None
while True:
if error is not None:
print '>>> %s: %s' % (param.cli_name, error)
raw = self.Backend.textui.prompt(param.cli_name, default)
try:
value = param(raw, **kw)
if value is not None:
kw[param.name] = value
break
except errors.ValidationError, e:
error = e.error
cli_plugins = (
cli,
textui,
console,
help,
)
def run(api):
error = None
try:
argv = api.bootstrap_with_global_options(context='cli')
for klass in cli_plugins:
api.register(klass)
api.load_plugins()
api.finalize()
api.Backend.cli.run(sys.argv[1:])
sys.exit()
api.Backend.cli.run(argv)
except KeyboardInterrupt:
print ''
api.log.info('operation aborted')
sys.exit()
except PublicError, e:
error = e
except Exception, e:
api.log.exception('%s: %s', e.__class__.__name__, str(e))
error = InternalError()
api.log.error(error.strerror)
sys.exit(error.errno)
if error is not None:
assert isinstance(error, PublicError)
api.log.error(error.strerror)
sys.exit(error.errno)

View File

@ -104,6 +104,10 @@ DEFAULT_CONFIG = (
('ca_port', 9180),
('ca_ssl_port', 9443),
# Special CLI:
('prompt_all', False),
('interactive', True),
# ********************************************************
# The remaining keys are never set from the values here!
# ********************************************************

View File

@ -576,7 +576,7 @@ class API(DictProxy):
handler.setLevel(logging.INFO)
log.addHandler(handler)
def add_global_options(self, parser=None, context=None):
def build_global_parser(self, parser=None, context=None):
"""
Add global options to an optparse.OptionParser instance.
"""
@ -597,15 +597,16 @@ class API(DictProxy):
)
if context == 'cli':
parser.add_option('-a', '--prompt-all', action='store_true',
help='Prompt for all values interactively'
help='Prompt for ALL values (even if optional)'
)
parser.add_option('-n', '--no-prompt', action='store_false',
help="Don\'t prompt for values interactively"
dest='interactive',
help='Prompt for NO values (even if required)'
)
return parser
def bootstrap_with_global_options(self, parser=None, context=None):
parser = self.add_global_options(parser, context)
parser = self.build_global_parser(parser, context)
(options, args) = parser.parse_args()
overrides = {}
if options.env is not None:
@ -619,7 +620,7 @@ class API(DictProxy):
# --Jason, 2008-10-31
pass
overrides[str(key.strip())] = value.strip()
for key in ('conf', 'debug', 'verbose'):
for key in ('conf', 'debug', 'verbose', 'prompt_all', 'interactive'):
value = getattr(options, key, None)
if value is not None:
overrides[key] = value

View File

@ -30,12 +30,13 @@ class passwd(Command):
'Edit existing password policy.'
takes_args = (
Str('principal',
Password('password'),
Str('principal?',
cli_name='user',
primary_key=True,
autofill=True,
default_from=util.get_current_principal,
),
Password('password'),
)
def execute(self, principal, password):

View File

@ -114,164 +114,3 @@ from_default_conf = set in default.conf
# Make sure cli.conf is loaded first:
from_cli_conf = overridden in default.conf
"""
class test_CLI(ClassChecker):
"""
Test the `ipalib.cli.CLI` class.
"""
_cls = cli.CLI
def new(self, argv=tuple()):
(api, home) = get_api()
o = self.cls(api, argv)
assert o.api is api
return (o, api, home)
def check_cascade(self, *names):
(o, api, home) = self.new()
method = getattr(o, names[0])
for name in names:
assert o.isdone(name) is False
method()
for name in names:
assert o.isdone(name) is True
e = raises(StandardError, method)
assert str(e) == 'CLI.%s() already called' % names[0]
def test_init(self):
"""
Test the `ipalib.cli.CLI.__init__` method.
"""
argv = ['-v', 'user-add', '--first=Jonh', '--last=Doe']
(o, api, home) = self.new(argv)
assert o.api is api
assert o.argv == tuple(argv)
def test_run_real(self):
"""
Test the `ipalib.cli.CLI.run_real` method.
"""
self.check_cascade(
'run_real',
'finalize',
'load_plugins',
'bootstrap',
'parse_globals'
)
def test_finalize(self):
"""
Test the `ipalib.cli.CLI.finalize` method.
"""
self.check_cascade(
'finalize',
'load_plugins',
'bootstrap',
'parse_globals'
)
(o, api, home) = self.new()
assert api.isdone('finalize') is False
assert 'Command' not in api
o.finalize()
assert api.isdone('finalize') is True
assert list(api.Command) == \
sorted(k.__name__ for k in cli.cli_application_commands)
def test_load_plugins(self):
"""
Test the `ipalib.cli.CLI.load_plugins` method.
"""
self.check_cascade(
'load_plugins',
'bootstrap',
'parse_globals'
)
(o, api, home) = self.new()
assert api.isdone('load_plugins') is False
o.load_plugins()
assert api.isdone('load_plugins') is True
def test_bootstrap(self):
"""
Test the `ipalib.cli.CLI.bootstrap` method.
"""
self.check_cascade(
'bootstrap',
'parse_globals'
)
# Test with empty argv
(o, api, home) = self.new()
keys = tuple(api.env)
assert api.isdone('bootstrap') is False
o.bootstrap()
assert api.isdone('bootstrap') is True
e = raises(StandardError, o.bootstrap)
assert str(e) == 'CLI.bootstrap() already called'
assert api.env.verbose is False
assert api.env.context == 'cli'
keys = tuple(api.env)
added = (
'my_key',
'from_default_conf',
'from_cli_conf'
)
for key in added:
assert key not in api.env
assert key not in keys
# Test with a populated argv
argv = ['-e', 'my_key=my_val,whatever=Hello']
(o, api, home) = self.new(argv)
home.write(config_default, '.ipa', 'default.conf')
home.write(config_cli, '.ipa', 'cli.conf')
o.bootstrap()
assert api.env.my_key == 'my_val,whatever=Hello'
assert api.env.from_default_conf == 'set in default.conf'
assert api.env.from_cli_conf == 'set in cli.conf'
assert list(api.env) == sorted(keys + added)
def test_parse_globals(self):
"""
Test the `ipalib.cli.CLI.parse_globals` method.
"""
# Test with empty argv:
(o, api, home) = self.new()
assert not hasattr(o, 'options')
assert not hasattr(o, 'cmd_argv')
assert o.isdone('parse_globals') is False
o.parse_globals()
assert o.isdone('parse_globals') is True
assert o.options.prompt_all is False
assert o.options.interactive is True
assert o.options.verbose is None
assert o.options.conf is None
assert o.options.env is None
assert o.cmd_argv == tuple()
e = raises(StandardError, o.parse_globals)
assert str(e) == 'CLI.parse_globals() already called'
# Test with a populated argv:
argv = ('-a', '-n', '-v', '-c', '/my/config.conf', '-e', 'my_key=my_val')
cmd_argv = ('user-add', '--first', 'John', '--last', 'Doe')
(o, api, home) = self.new(argv + cmd_argv)
assert not hasattr(o, 'options')
assert not hasattr(o, 'cmd_argv')
assert o.isdone('parse_globals') is False
o.parse_globals()
assert o.isdone('parse_globals') is True
assert o.options.prompt_all is True
assert o.options.interactive is False
assert o.options.verbose is True
assert o.options.conf == '/my/config.conf'
assert o.options.env == ['my_key=my_val']
assert o.cmd_argv == cmd_argv
e = raises(StandardError, o.parse_globals)
assert str(e) == 'CLI.parse_globals() already called'
# Test with multiple -e args:
argv = ('-e', 'key1=val1', '-e', 'key2=val2')
(o, api, home) = self.new(argv)
o.parse_globals()
assert o.options.env == ['key1=val1', 'key2=val2']

View File

@ -25,7 +25,6 @@ import sys
from xmlrpc_test import XMLRPC_test
from ipalib import api
from ipalib import errors
from ipalib.cli import CLI
try:
api.finalize()
@ -240,4 +239,3 @@ class test_Indirect(XMLRPC_test):
pass
else:
assert False

View File

@ -25,7 +25,6 @@ import sys
from xmlrpc_test import XMLRPC_test
from ipalib import api
from ipalib import errors
from ipalib.cli import CLI
try:
api.finalize()

View File

@ -25,7 +25,6 @@ import sys
from xmlrpc_test import XMLRPC_test
from ipalib import api
from ipalib import errors
from ipalib.cli import CLI
try:
api.finalize()

View File

@ -25,7 +25,6 @@ import sys
from xmlrpc_test import XMLRPC_test
from ipalib import api
from ipalib import errors
from ipalib.cli import CLI
try:
api.finalize()

View File

@ -25,7 +25,6 @@ import sys
from xmlrpc_test import XMLRPC_test
from ipalib import api
from ipalib import errors
from ipalib.cli import CLI
try:
api.finalize()

View File

@ -25,7 +25,6 @@ import sys
from xmlrpc_test import XMLRPC_test
from ipalib import api
from ipalib import errors
from ipalib.cli import CLI
try:
api.finalize()

View File

@ -25,7 +25,6 @@ import sys
from xmlrpc_test import XMLRPC_test
from ipalib import api
from ipalib import errors
from ipalib.cli import CLI
try:
api.finalize()

View File

@ -26,7 +26,6 @@ import socket
import nose
from ipalib import api
from ipalib import errors
from ipalib.cli import CLI
try:
api.finalize()