2008-11-24 13:51:03 -06:00
|
|
|
# Authors:
|
|
|
|
# Jason Gerard DeRose <jderose@redhat.com>
|
|
|
|
#
|
|
|
|
# Copyright (C) 2008 Red Hat
|
|
|
|
# see file 'COPYING' for use and warranty information
|
|
|
|
#
|
2010-12-09 06:59:11 -06:00
|
|
|
# This program is free software; you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
|
|
# (at your option) any later version.
|
2008-11-24 13:51:03 -06:00
|
|
|
#
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
# GNU General Public License for more details.
|
|
|
|
#
|
|
|
|
# You should have received a copy of the GNU General Public License
|
2010-12-09 06:59:11 -06:00
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
2008-11-24 13:51:03 -06:00
|
|
|
|
|
|
|
"""
|
2009-01-04 19:39:39 -06:00
|
|
|
Test the `ipaserver.rpcserver` module.
|
2008-11-24 13:51:03 -06:00
|
|
|
"""
|
|
|
|
|
2013-01-08 09:11:05 -06:00
|
|
|
import json
|
2015-04-24 07:39:48 -05:00
|
|
|
import pytest
|
2013-01-08 09:11:05 -06:00
|
|
|
|
2015-09-11 06:43:28 -05:00
|
|
|
import six
|
|
|
|
|
2015-12-16 09:06:03 -06:00
|
|
|
from ipatests.util import assert_equal, raises, PluginTester
|
|
|
|
from ipalib import errors
|
2009-01-16 01:00:15 -06:00
|
|
|
from ipaserver import rpcserver
|
2008-11-24 13:51:03 -06:00
|
|
|
|
2015-09-11 06:43:28 -05:00
|
|
|
# On Python 3, bind the name ``unicode`` to ``str`` so that the
# ``unicode(...)`` calls made by the tests in this module resolve on both
# major Python versions.
if six.PY3:
    unicode = str
|
|
|
|
|
2015-04-24 07:39:48 -05:00
|
|
|
# Module-level pytest mark: applies the ``tier0`` marker to every test
# collected from this module.
pytestmark = pytest.mark.tier0
|
2008-11-24 13:51:03 -06:00
|
|
|
|
2010-02-23 11:53:47 -06:00
|
|
|
class StartResponse(object):
    """
    Recording stand-in for the ``start_response`` callable.

    The tests hand an instance to the handler under test and afterwards
    inspect the ``status`` and ``headers`` it was called with.  An instance
    accepts a single call; use `reset()` before reusing it.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Forget any recorded call so the instance can be called again."""
        self.status, self.headers = None, None

    def __call__(self, status, headers):
        """
        Record ``status`` (a ``str``) and ``headers`` (a ``list``).

        Raises ``AssertionError`` if a call was already recorded since the
        last `reset()`, or if either argument has the wrong type.
        """
        # Only one call may be recorded between resets:
        assert self.status is None and self.headers is None
        # The arguments must have the expected types:
        assert isinstance(status, str) and isinstance(headers, list)
        self.status, self.headers = status, headers
|
|
|
|
|
|
|
|
|
|
|
|
def test_not_found():
    """
    Test the `ipaserver.rpcserver.HTTP_Status.not_found` method.

    Checks the returned body, the status line and the headers for a plain
    URL, then repeats the check with a URL containing ``<``, ``>`` and ``&``,
    which is expected to appear HTML-escaped in the rendered body.
    """
    api = 'the api instance'
    f = rpcserver.HTTP_Status(api)
    t = rpcserver._not_found_template
    s = StartResponse()

    # Test with an innocent URL:
    url = '/ipa/foo/stuff'
    assert_equal(
        f.not_found(None, s, url, None),
        [t % dict(url='/ipa/foo/stuff')]
    )
    assert s.status == '404 Not Found'
    assert s.headers == [('Content-Type', 'text/html; charset=utf-8')]

    # Test when URL contains any of '<>&'.  The input carries a literal
    # '&nbsp;' entity plus markup; the expected body is that input with
    # '&', '<' and '>' replaced by their character entities, so no markup
    # from the URL reaches the page unescaped.
    s.reset()
    url = '&nbsp;' + '<script>do_bad_stuff();</script>'
    assert_equal(
        f.not_found(None, s, url, None),
        [t % dict(url='&amp;nbsp;&lt;script&gt;do_bad_stuff();&lt;/script&gt;')]
    )
    assert s.status == '404 Not Found'
    assert s.headers == [('Content-Type', 'text/html; charset=utf-8')]
|
|
|
|
|
|
|
|
|
|
|
|
def test_bad_request():
    """
    Test the `ipaserver.rpcserver.HTTP_Status.bad_request` method.
    """
    status_app = rpcserver.HTTP_Status('the api instance')
    template = rpcserver._bad_request_template
    recorder = StartResponse()

    # The body is the bad-request template rendered with the message:
    body = status_app.bad_request(None, recorder, 'illegal request')
    expected_body = [template % {'message': 'illegal request'}]
    assert_equal(body, expected_body)

    # The status line and headers were reported through the recorder:
    assert recorder.status == '400 Bad Request'
    assert recorder.headers == [('Content-Type', 'text/html; charset=utf-8')]
|
|
|
|
|
|
|
|
|
|
|
|
def test_internal_error():
    """
    Test the `ipaserver.rpcserver.HTTP_Status.internal_error` method.
    """
    status_app = rpcserver.HTTP_Status('the api instance')
    template = rpcserver._internal_error_template
    recorder = StartResponse()

    # The body is the internal-error template rendered with the message:
    body = status_app.internal_error(None, recorder, 'request failed')
    expected_body = [template % {'message': 'request failed'}]
    assert_equal(body, expected_body)

    # The status line and headers were reported through the recorder:
    assert recorder.status == '500 Internal Server Error'
    assert recorder.headers == [('Content-Type', 'text/html; charset=utf-8')]
|
|
|
|
|
|
|
|
|
2012-03-09 10:48:32 -06:00
|
|
|
def test_unauthorized_error():
    """
    Test the `ipaserver.rpcserver.HTTP_Status.unauthorized` method.
    """
    status_app = rpcserver.HTTP_Status('the api instance')
    template = rpcserver._unauthorized_template
    recorder = StartResponse()

    # The body is the unauthorized template rendered with the message:
    body = status_app.unauthorized(
        None, recorder, 'unauthorized', 'password-expired'
    )
    assert_equal(body, [template % {'message': 'unauthorized'}])

    # Besides the content type, the rejection reason passed in is
    # expected back as an extra response header:
    assert recorder.status == '401 Unauthorized'
    expected_headers = [
        ('Content-Type', 'text/html; charset=utf-8'),
        ('X-IPA-Rejection-Reason', 'password-expired'),
    ]
    assert recorder.headers == expected_headers
|
2010-02-23 11:53:47 -06:00
|
|
|
|
|
|
|
|
2008-11-25 12:54:51 -06:00
|
|
|
def test_params_2_args_options():
    """
    Test the `ipaserver.rpcserver.params_2_args_options` function.
    """
    split = rpcserver.params_2_args_options
    positional = ('Hello', u'world!')
    keyword = {'one': 1, 'two': u'Two', 'three': 'Three'}

    # Empty params give empty args and empty options:
    assert split(()) == ((), {})

    # Params holding only args give those args and empty options:
    assert split([positional]) == (positional, {})

    # Params holding args and options give both back:
    assert split([positional, keyword]) == (positional, keyword)
|
2008-11-25 12:54:51 -06:00
|
|
|
|
|
|
|
|
2010-02-23 11:53:47 -06:00
|
|
|
class test_session(object):
    """
    Test the `ipaserver.rpcserver.wsgi_dispatch` class.
    """

    klass = rpcserver.wsgi_dispatch

    def test_route(self):
        """
        Test the `ipaserver.rpcserver.wsgi_dispatch.route` method.
        """
        def make_app(label):
            # Build a stub app that reports its own label together with
            # the SCRIPT_NAME and PATH_INFO it was called with.
            def app(environ, start_response):
                return (
                    label,
                    [environ[key] for key in ('SCRIPT_NAME', 'PATH_INFO')],
                )
            return app

        dispatcher = self.klass('the api instance')
        dispatcher.mount(make_app('from 1'), '/foo/stuff')
        dispatcher.mount(make_app('from 2'), '/bar')

        # Each mounted path must reach its own app, with the environ
        # values passed through:
        cases = (
            ('/foo/stuff', 'from 1'),
            ('/bar', 'from 2'),
        )
        for path_info, label in cases:
            environ = {'SCRIPT_NAME': '/ipa', 'PATH_INFO': path_info}
            assert dispatcher.route(environ, None) == (
                label, ['/ipa', path_info]
            )

    def test_mount(self):
        """
        Test the `ipaserver.rpcserver.wsgi_dispatch.mount` method.
        """
        def first_app(environ, start_response):
            return None

        def second_app(environ, start_response):
            return None

        # Test that mount works:
        dispatcher = self.klass('the api instance')
        dispatcher.mount(first_app, 'foo')
        assert dispatcher['foo'] is first_app
        assert list(dispatcher) == ['foo']

        # Test that an Exception is raised when trying to override a mount:
        error = raises(Exception, dispatcher.mount, second_app, 'foo')
        expected = '%s.mount(): cannot replace %r with %r at %r' % (
            'wsgi_dispatch', first_app, second_app, 'foo'
        )
        assert str(error) == expected

        # Test mounting a second app:
        dispatcher.mount(second_app, 'bar')
        assert dispatcher['bar'] is second_app
        assert list(dispatcher) == ['bar', 'foo']
|
|
|
|
|
|
|
|
|
2009-01-16 02:47:03 -06:00
|
|
|
class test_xmlserver(PluginTester):
    """
    Test the `ipaserver.rpcserver.xmlserver` plugin.
    """

    _plugin = rpcserver.xmlserver

    def test_marshaled_dispatch(self):
        """
        Test the `ipaserver.rpcserver.xmlserver.marshaled_dispatch` method.
        """
        # FIXME: this only builds the plugin instance in server context;
        # marshaled_dispatch itself is never called here.
        base_name = 'Backend'
        self.instance(base_name, in_server=True)
|
2009-10-13 12:28:00 -05:00
|
|
|
|
|
|
|
|
|
|
|
class test_jsonserver(PluginTester):
    """
    Test the `ipaserver.rpcserver.jsonserver` plugin.
    """

    _plugin = rpcserver.jsonserver

    def test_unmarshal(self):
        """
        Test the `ipaserver.rpcserver.jsonserver.unmarshal` method.
        """
        server, _api, _home = self.instance('Backend', in_server=True)

        def rejected(payload):
            # Unmarshal *payload*, requiring a JSONError, and hand back the
            # ``error`` attribute carried by that exception.
            return raises(errors.JSONError, server.unmarshal, payload).error

        def encode(**fields):
            # Serialize a request dict; every request here uses id 18.
            return json.dumps(dict(fields, id=18))

        # Test with invalid JSON-data (the decoder's message differs
        # between Python 2 and Python 3):
        error = rejected('this wont work')
        if six.PY2:
            assert unicode(error) == 'No JSON object could be decoded'
        else:
            assert str(error).startswith('Expecting value: ')

        # Test with non-dict type:
        error = rejected(json.dumps([1, 2, 3]))
        assert unicode(error) == 'Request must be a dict'

        # Test with missing method:
        params = [[1, 2], {'three': 3, 'four': 4}]
        error = rejected(encode(params=params))
        assert unicode(error) == 'Request is missing "method"'

        # Test with missing params:
        error = rejected(encode(method='echo'))
        assert unicode(error) == 'Request is missing "params"'

        # Test with non-list params:
        for bad_params in ('hello', {'args': (), 'options': {}}):
            error = rejected(encode(method='echo', params=bad_params))
            assert unicode(error) == 'params must be a list'

        # Test with other than 2 params:
        for bad_params in ([], [()], [None, {}, ()]):
            error = rejected(encode(method='echo', params=bad_params))
            assert unicode(error) == 'params must contain [args, options]'

        # Test when args is not a list:
        error = rejected(encode(method='echo', params=['args', {}]))
        assert unicode(error) == 'params[0] (aka args) must be a list'

        # Test when options is not a dict:
        error = rejected(
            encode(method='echo', params=[('hello', 'world'), 'options'])
        )
        assert unicode(error) == 'params[1] (aka options) must be a dict'

        # Test with valid values:
        args = (u'jdoe',)
        options = {'givenname': u'John', 'sn': 'Doe'}
        payload = json.dumps(
            {'method': u'user_add', 'params': (args, options), 'id': 18}
        )
        assert server.unmarshal(payload) == (u'user_add', args, options, 18)
|