Implemented more elegant way for entire plugin module to be conditionally skipped; updated cert.py and ra.py modules to use this

This commit is contained in:
Jason Gerard DeRose 2009-02-12 02:10:12 -07:00 committed by Rob Crittenden
parent e0fe732318
commit 4ab133c3cb
7 changed files with 140 additions and 82 deletions

View File

@ -878,6 +878,7 @@ from frontend import Object, Method, Property
from crud import Create, Retrieve, Update, Delete, Search from crud import Create, Retrieve, Update, Delete, Search
from parameters import DefaultFrom, Bool, Flag, Int, Float, Bytes, Str, Password from parameters import DefaultFrom, Bool, Flag, Int, Float, Bytes, Str, Password
from parameters import BytesEnum, StrEnum from parameters import BytesEnum, StrEnum
from errors2 import SkipPluginModule
try: try:
import uuid import uuid

View File

@ -64,7 +64,7 @@ FORMAT_FILE = '\t'.join([
'%(process)d', '%(process)d',
'%(threadName)s', '%(threadName)s',
'%(levelname)s', '%(levelname)s',
'%(message)r', # Using %r for repr() so message is a single line '%(message)s',
]) ])

View File

@ -208,6 +208,21 @@ class PluginMissingOverrideError(PrivateError):
format = '%(base)s.%(name)s not registered, cannot override with %(plugin)r' format = '%(base)s.%(name)s not registered, cannot override with %(plugin)r'
class SkipPluginModule(PrivateError):
"""
Raised to abort the loading of a plugin module.
"""
format = '%(reason)s'
class PluginsPackageError(PrivateError):
"""
Raised when ``package.plugins`` is a module instead of a sub-package.
"""
format = '%(name)s must be sub-package, not module: %(file)r'
############################################################################## ##############################################################################
# Public errors: # Public errors:

View File

@ -646,9 +646,43 @@ class API(DictProxy):
self.__do_if_not_done('bootstrap') self.__do_if_not_done('bootstrap')
if self.env.mode in ('dummy', 'unit_test'): if self.env.mode in ('dummy', 'unit_test'):
return return
util.import_plugins_subpackage('ipalib') self.import_plugins('ipalib')
if self.env.in_server: if self.env.in_server:
util.import_plugins_subpackage('ipaserver') self.import_plugins('ipaserver')
# FIXME: This method has no unit test
def import_plugins(self, package):
"""
Import modules in ``plugins`` sub-package of ``package``.
"""
subpackage = '%s.plugins' % package
try:
parent = __import__(package)
plugins = __import__(subpackage).plugins
except ImportError, e:
self.log.error(
'cannot import plugins sub-package %s: %s', subpackage, e
)
raise e
parent_dir = path.dirname(path.abspath(parent.__file__))
plugins_dir = path.dirname(path.abspath(plugins.__file__))
if parent_dir == plugins_dir:
raise errors2.PluginsPackageError(
name=subpackage, file=plugins.__file__
)
self.log.debug('importing all plugin modules in %r...', plugins_dir)
for (name, pyfile) in util.find_modules_in_dir(plugins_dir):
fullname = '%s.%s' % (subpackage, name)
self.log.debug('importing plugin module %r', pyfile)
try:
__import__(fullname)
except errors2.SkipPluginModule, e:
self.log.info(
'skipping plugin module %s: %s', fullname, e.reason
)
except StandardError, e:
self.log.error('could not load plugin module %r', pyfile)
raise e
def finalize(self): def finalize(self):
""" """

View File

@ -22,110 +22,114 @@
Command plugins for IPA-RA certificate operations. Command plugins for IPA-RA certificate operations.
""" """
from ipalib import api from ipalib import api, SkipPluginModule
if api.env.enable_ra: if api.env.enable_ra is not True:
from ipalib import Command, Str, Int raise SkipPluginModule(reason='env.enable_ra=%r' % (api.env.enable_ra,))
class cert_request(Command): from ipalib import Command, Str, Int
"""
Submit a certificate singing request.
"""
takes_args = ('csr',) assert False
takes_options = ( class cert_request(Command):
Str('request_type', default=u'pkcs10', autofill=True), """
) Submit a certificate singing request.
"""
def execute(self, csr, **options): takes_args = ('csr',)
return self.Backend.ra.request_certificate(csr, **options)
def output_for_cli(self, textui, result, *args, **options): takes_options = (
if isinstance(result, dict) and len(result) > 0: Str('request_type', default=u'pkcs10', autofill=True),
textui.print_entry(result, 0) )
else:
textui.print_plain('Failed to submit a certificate request.')
api.register(cert_request) def execute(self, csr, **options):
return self.Backend.ra.request_certificate(csr, **options)
def output_for_cli(self, textui, result, *args, **options):
if isinstance(result, dict) and len(result) > 0:
textui.print_entry(result, 0)
else:
textui.print_plain('Failed to submit a certificate request.')
api.register(cert_request)
class cert_status(Command): class cert_status(Command):
""" """
Check status of a certificate signing request. Check status of a certificate signing request.
""" """
takes_args = ['request_id'] takes_args = ['request_id']
def execute(self, request_id, **options): def execute(self, request_id, **options):
return self.Backend.ra.check_request_status(request_id) return self.Backend.ra.check_request_status(request_id)
def output_for_cli(self, textui, result, *args, **options): def output_for_cli(self, textui, result, *args, **options):
if isinstance(result, dict) and len(result) > 0: if isinstance(result, dict) and len(result) > 0:
textui.print_entry(result, 0) textui.print_entry(result, 0)
else: else:
textui.print_plain('Failed to retrieve a request status.') textui.print_plain('Failed to retrieve a request status.')
api.register(cert_status) api.register(cert_status)
class cert_get(Command): class cert_get(Command):
""" """
Retrieve an existing certificate. Retrieve an existing certificate.
""" """
takes_args = ['serial_number'] takes_args = ['serial_number']
def execute(self, serial_number): def execute(self, serial_number):
return self.Backend.ra.get_certificate(serial_number) return self.Backend.ra.get_certificate(serial_number)
def output_for_cli(self, textui, result, *args, **options): def output_for_cli(self, textui, result, *args, **options):
if isinstance(result, dict) and len(result) > 0: if isinstance(result, dict) and len(result) > 0:
textui.print_entry(result, 0) textui.print_entry(result, 0)
else: else:
textui.print_plain('Failed to obtain a certificate.') textui.print_plain('Failed to obtain a certificate.')
api.register(cert_get) api.register(cert_get)
class cert_revoke(Command): class cert_revoke(Command):
""" """
Revoke a certificate. Revoke a certificate.
""" """
takes_args = ['serial_number'] takes_args = ['serial_number']
# FIXME: The default is 0. Is this really an Int param? # FIXME: The default is 0. Is this really an Int param?
takes_options = [Int('revocation_reason?', default=0)] takes_options = [Int('revocation_reason?', default=0)]
def execute(self, serial_number, **options): def execute(self, serial_number, **options):
return self.Backend.ra.revoke_certificate(serial_number, **options) return self.Backend.ra.revoke_certificate(serial_number, **options)
def output_for_cli(self, textui, result, *args, **options): def output_for_cli(self, textui, result, *args, **options):
if isinstance(result, dict) and len(result) > 0: if isinstance(result, dict) and len(result) > 0:
textui.print_entry(result, 0) textui.print_entry(result, 0)
else: else:
textui.print_plain('Failed to revoke a certificate.') textui.print_plain('Failed to revoke a certificate.')
api.register(cert_revoke) api.register(cert_revoke)
class cert_remove_hold(Command): class cert_remove_hold(Command):
""" """
Take a revoked certificate off hold. Take a revoked certificate off hold.
""" """
takes_args = ['serial_number'] takes_args = ['serial_number']
def execute(self, serial_number, **options): def execute(self, serial_number, **options):
return self.Backend.ra.take_certificate_off_hold(serial_number) return self.Backend.ra.take_certificate_off_hold(serial_number)
def output_for_cli(self, textui, result, *args, **options): def output_for_cli(self, textui, result, *args, **options):
if isinstance(result, dict) and len(result) > 0: if isinstance(result, dict) and len(result) > 0:
textui.print_entry(result, 0) textui.print_entry(result, 0)
else: else:
textui.print_plain('Failed to take a revoked certificate off hold.') textui.print_plain('Failed to take a revoked certificate off hold.')
api.register(cert_remove_hold) api.register(cert_remove_hold)

View File

@ -54,13 +54,13 @@ def find_modules_in_dir(src_dir):
for name in sorted(os.listdir(src_dir)): for name in sorted(os.listdir(src_dir)):
if not name.endswith(suffix): if not name.endswith(suffix):
continue continue
py_file = path.join(src_dir, name) pyfile = path.join(src_dir, name)
if path.islink(py_file) or not path.isfile(py_file): if path.islink(pyfile) or not path.isfile(pyfile):
continue continue
module = name[:-len(suffix)] module = name[:-len(suffix)]
if module == '__init__': if module == '__init__':
continue continue
yield module yield (module, pyfile)
# FIXME: This function has no unit test # FIXME: This function has no unit test

View File

@ -31,6 +31,11 @@ certificates via the following methods:
* `ra.take_certificate_off_hold()` - take a certificate off hold. * `ra.take_certificate_off_hold()` - take a certificate off hold.
""" """
from ipalib import api, SkipPluginModule
if api.env.enable_ra is not True:
raise SkipPluginModule(reason='env.enable_ra=%r' % (api.env.enable_ra,))
import os, stat, subprocess import os, stat, subprocess
import array import array
import errno import errno
@ -40,7 +45,7 @@ from urllib import urlencode, quote
from socket import gethostname from socket import gethostname
import socket import socket
from ipalib import api, Backend from ipalib import Backend
from ipalib.errors2 import NetworkError from ipalib.errors2 import NetworkError
from ipaserver import servercore from ipaserver import servercore
from ipaserver import ipaldap from ipaserver import ipaldap
@ -418,5 +423,4 @@ class ra(Backend):
# self.debug("IPA-RA: stderr: '%s'" % stderr) # self.debug("IPA-RA: stderr: '%s'" % stderr)
return (p.returncode, stdout, stderr) return (p.returncode, stdout, stderr)
if api.env.enable_ra: api.register(ra)
api.register(ra)