Implement XML introspection

https://fedorahosted.org/freeipa/ticket/2937
This commit is contained in:
Petr Viktorin
2014-01-14 13:41:19 +01:00
parent 6a2b70946f
commit 6bdc75ea24
2 changed files with 140 additions and 9 deletions

View File

@@ -31,7 +31,7 @@ import urlparse
import time import time
import json import json
from ipalib import plugable, capabilities from ipalib import plugable, capabilities, errors
from ipalib.backend import Executioner from ipalib.backend import Executioner
from ipalib.errors import (PublicError, InternalError, CommandError, JSONError, from ipalib.errors import (PublicError, InternalError, CommandError, JSONError,
CCacheError, RefererError, InvalidSessionPassword, NotFound, ACIError, CCacheError, RefererError, InvalidSessionPassword, NotFound, ACIError,
@@ -290,6 +290,8 @@ class WSGIExecutioner(Executioner):
content_type = None content_type = None
key = '' key = ''
_system_commands = {}
def set_api(self, api): def set_api(self, api):
super(WSGIExecutioner, self).set_api(api) super(WSGIExecutioner, self).set_api(api)
if 'wsgi_dispatch' in self.api.Backend: if 'wsgi_dispatch' in self.api.Backend:
@@ -331,9 +333,12 @@ class WSGIExecutioner(Executioner):
(name, args, options, _id) = self.unmarshal(data) (name, args, options, _id) = self.unmarshal(data)
else: else:
(name, args, options, _id) = self.simple_unmarshal(environ) (name, args, options, _id) = self.simple_unmarshal(environ)
if name not in self.Command: if name in self._system_commands:
result = self._system_commands[name](self, *args, **options)
elif name not in self.Command:
raise CommandError(name=name) raise CommandError(name=name)
result = self.Command[name](*args, **options) else:
result = self.Command[name](*args, **options)
except PublicError, e: except PublicError, e:
error = e error = e
except StandardError, e: except StandardError, e:
@@ -650,16 +655,56 @@ class xmlserver(KerberosWSGIExecutioner):
key = '/xml' key = '/xml'
def listMethods(self, *params): def listMethods(self, *params):
return tuple(name.decode('UTF-8') for name in self.Command) """list methods for XML-RPC introspection"""
if params:
raise errors.ZeroArgumentError(name='system.listMethods')
return (tuple(unicode(name) for name in self.Command) +
tuple(unicode(name) for name in self._system_commands))
def _get_method_name(self, name, *params):
"""Get a method name for XML-RPC introspection commands"""
if not params:
raise errors.RequirementError(name='method name')
elif len(params) > 1:
raise errors.MaxArgumentError(name=name, count=1)
[method_name] = params
return method_name
def methodSignature(self, *params): def methodSignature(self, *params):
return u'methodSignature not implemented' """get method signature for XML-RPC introspection"""
method_name = self._get_method_name('system.methodSignature', *params)
if method_name in self._system_commands:
# TODO
# for now let's not go out of our way to document standard XML-RPC
return u'undef'
elif method_name in self.Command:
# All IPA commands return a dict (struct),
# and take a params, options - list and dict (array, struct)
return [[u'struct', u'array', u'struct']]
else:
raise errors.CommandError(name=method_name)
def methodHelp(self, *params): def methodHelp(self, *params):
return u'methodHelp not implemented' """get method docstring for XML-RPC introspection"""
method_name = self._get_method_name('system.methodHelp', *params)
if method_name in self._system_commands:
return u''
elif method_name in self.Command:
return unicode(self.Command[method_name].__doc__ or '')
else:
raise errors.CommandError(name=method_name)
_system_commands = {
'system.listMethods': listMethods,
'system.methodSignature': methodSignature,
'system.methodHelp': methodHelp,
}
def unmarshal(self, data): def unmarshal(self, data):
(params, name) = xml_loads(data) (params, name) = xml_loads(data)
if name in self._system_commands:
# For XML-RPC introspection, return params directly
return (name, params, {}, None)
(args, options) = params_2_args_options(params) (args, options) = params_2_args_options(params)
if 'version' not in options: if 'version' not in options:
# Keep backwards compatibility with client containing # Keep backwards compatibility with client containing

View File

@@ -21,13 +21,14 @@
Test the `ipalib.rpc` module. Test the `ipalib.rpc` module.
""" """
import threading from xmlrpclib import Binary, Fault, dumps, loads
from xmlrpclib import Binary, Fault, dumps, loads, ServerProxy
import nose
from ipatests.util import raises, assert_equal, PluginTester, DummyClass from ipatests.util import raises, assert_equal, PluginTester, DummyClass
from ipatests.data import binary_bytes, utf8_bytes, unicode_str from ipatests.data import binary_bytes, utf8_bytes, unicode_str
from ipalib.frontend import Command from ipalib.frontend import Command
from ipalib.request import context, Connection from ipalib.request import context, Connection
from ipalib import rpc, errors from ipalib import rpc, errors, api, request
std_compound = (binary_bytes, utf8_bytes, unicode_str) std_compound = (binary_bytes, utf8_bytes, unicode_str)
@@ -242,3 +243,88 @@ class test_xmlclient(PluginTester):
assert_equal(e.error, u'no such error') assert_equal(e.error, u'no such error')
assert context.xmlclient.conn._calledall() is True assert context.xmlclient.conn._calledall() is True
class test_xml_introspection(object):
@classmethod
def setUpClass(self):
try:
api.Backend.xmlclient.connect(fallback=False)
except (errors.NetworkError, IOError):
raise nose.SkipTest('%r: Server not available: %r' %
(__name__, api.env.xmlrpc_uri))
@classmethod
def tearDownClass(self):
request.destroy_context()
def test_list_methods(self):
result = api.Backend.xmlclient.conn.system.listMethods()
assert len(result)
assert 'ping' in result
assert 'user_add' in result
assert 'system.listMethods' in result
assert 'system.methodSignature' in result
assert 'system.methodHelp' in result
def test_list_methods_many_params(self):
try:
result = api.Backend.xmlclient.conn.system.listMethods('foo')
except Fault, f:
print f
assert f.faultCode == 3003
assert f.faultString == (
"command 'system.listMethods' takes no arguments")
else:
raise AssertionError('did not raise')
def test_ping_signature(self):
result = api.Backend.xmlclient.conn.system.methodSignature('ping')
assert result == [['struct', 'array', 'struct']]
def test_ping_help(self):
result = api.Backend.xmlclient.conn.system.methodHelp('ping')
assert result == 'Ping a remote server.'
def test_signature_no_params(self):
try:
result = api.Backend.xmlclient.conn.system.methodSignature()
except Fault, f:
print f
assert f.faultCode == 3007
assert f.faultString == "'method name' is required"
else:
raise AssertionError('did not raise')
def test_signature_many_params(self):
try:
result = api.Backend.xmlclient.conn.system.methodSignature('a', 'b')
except Fault, f:
print f
assert f.faultCode == 3004
assert f.faultString == (
"command 'system.methodSignature' takes at most 1 argument")
else:
raise AssertionError('did not raise')
def test_help_no_params(self):
try:
result = api.Backend.xmlclient.conn.system.methodHelp()
except Fault, f:
print f
assert f.faultCode == 3007
assert f.faultString == "'method name' is required"
else:
raise AssertionError('did not raise')
def test_help_many_params(self):
try:
result = api.Backend.xmlclient.conn.system.methodHelp('a', 'b')
except Fault, f:
print f
assert f.faultCode == 3004
assert f.faultString == (
"command 'system.methodHelp' takes at most 1 argument")
else:
raise AssertionError('did not raise')