mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Make an ipa-tests package
Rename the 'tests' directory to 'ipa-tests', and create an ipa-tests RPM containing the test suite Part of the work for: https://fedorahosted.org/freeipa/ticket/3654
This commit is contained in:
committed by
Martin Kosek
parent
6d66e826c1
commit
c60142efda
22
ipatests/test_ipaserver/__init__.py
Normal file
22
ipatests/test_ipaserver/__init__.py
Normal file
@@ -0,0 +1,22 @@
|
||||
# Authors:
|
||||
# Jason Gerard DeRose <jderose@redhat.com>
|
||||
#
|
||||
# Copyright (C) 2008 Red Hat
|
||||
# see file 'COPYING' for use and warranty information
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
Sub-package containing unit tests for `ipaserver` package.
|
||||
"""
|
||||
52
ipatests/test_ipaserver/httptest.py
Normal file
52
ipatests/test_ipaserver/httptest.py
Normal file
@@ -0,0 +1,52 @@
|
||||
# Authors:
|
||||
# Martin Kosek <mkosek@redhat.com>
|
||||
#
|
||||
# Copyright (C) 2012 Red Hat
|
||||
# see file 'COPYING' for use and warranty information
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
"""
|
||||
Base class for HTTP request tests
|
||||
"""
|
||||
|
||||
import urllib
|
||||
import httplib
|
||||
|
||||
from ipalib import api
|
||||
|
||||
class Unauthorized_HTTP_test(object):
|
||||
"""
|
||||
Base class for simple HTTP request tests executed against URI
|
||||
with no required authorization
|
||||
"""
|
||||
app_uri = ''
|
||||
host = api.env.host
|
||||
content_type = 'application/x-www-form-urlencoded'
|
||||
|
||||
def send_request(self, method='POST', params=None):
|
||||
"""
|
||||
Send a request to HTTP server
|
||||
|
||||
:param key When not None, overrides default app_uri
|
||||
"""
|
||||
if params is not None:
|
||||
params = urllib.urlencode(params, True)
|
||||
url = 'https://' + self.host + self.app_uri
|
||||
|
||||
headers = {'Content-Type' : self.content_type,
|
||||
'Referer' : url}
|
||||
|
||||
conn = httplib.HTTPSConnection(self.host)
|
||||
conn.request(method, self.app_uri, params, headers)
|
||||
return conn.getresponse()
|
||||
59
ipatests/test_ipaserver/install/test_adtrustinstance.py
Executable file
59
ipatests/test_ipaserver/install/test_adtrustinstance.py
Executable file
@@ -0,0 +1,59 @@
|
||||
# Authors:
|
||||
# Sumit Bose <sbose@redhat.com>
|
||||
#
|
||||
# Copyright (C) 2011 Red Hat
|
||||
# see file 'COPYING' for use and warranty information
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
"""
|
||||
Test `adtrustinstance`
|
||||
"""
|
||||
|
||||
import os
|
||||
import nose
|
||||
|
||||
from ipaserver.install import adtrustinstance
|
||||
|
||||
class test_adtrustinstance:
|
||||
"""
|
||||
Test `adtrustinstance`.
|
||||
"""
|
||||
|
||||
def test_make_netbios_name(self):
|
||||
s = adtrustinstance.make_netbios_name("ABCDEF")
|
||||
assert s == 'ABCDEF' and isinstance(s, str)
|
||||
s = adtrustinstance.make_netbios_name(U"ABCDEF")
|
||||
assert s == 'ABCDEF' and isinstance(s, unicode)
|
||||
s = adtrustinstance.make_netbios_name("abcdef")
|
||||
assert s == 'ABCDEF'
|
||||
s = adtrustinstance.make_netbios_name("abc.def")
|
||||
assert s == 'ABC'
|
||||
s = adtrustinstance.make_netbios_name("abcdefghijklmnopqr.def")
|
||||
assert s == 'ABCDEFGHIJKLMNO'
|
||||
s = adtrustinstance.make_netbios_name("A!$%B&/()C=?+*D")
|
||||
assert s == 'ABCD'
|
||||
s = adtrustinstance.make_netbios_name("!$%&/()=?+*")
|
||||
assert not s
|
||||
|
||||
def test_check_netbios_name(self):
|
||||
assert adtrustinstance.check_netbios_name("ABCDEF")
|
||||
assert not adtrustinstance.check_netbios_name("abcdef")
|
||||
assert adtrustinstance.check_netbios_name("ABCDE12345ABCDE")
|
||||
assert not adtrustinstance.check_netbios_name("ABCDE12345ABCDE1")
|
||||
assert not adtrustinstance.check_netbios_name("")
|
||||
|
||||
assert adtrustinstance.check_netbios_name(U"ABCDEF")
|
||||
assert not adtrustinstance.check_netbios_name(U"abcdef")
|
||||
assert adtrustinstance.check_netbios_name(U"ABCDE12345ABCDE")
|
||||
assert not adtrustinstance.check_netbios_name(U"ABCDE12345ABCDE1")
|
||||
107
ipatests/test_ipaserver/test_changepw.py
Normal file
107
ipatests/test_ipaserver/test_changepw.py
Normal file
@@ -0,0 +1,107 @@
|
||||
# Authors:
|
||||
# Martin Kosek <mkosek@redhat.com>
|
||||
#
|
||||
# Copyright (C) 2012 Red Hat
|
||||
# see file 'COPYING' for use and warranty information
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import nose
|
||||
|
||||
from httptest import Unauthorized_HTTP_test
|
||||
from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test
|
||||
from ipatests.util import assert_equal, assert_not_equal
|
||||
from ipalib import api, errors
|
||||
from ipapython.dn import DN
|
||||
import ldap
|
||||
|
||||
testuser = u'tuser'
|
||||
old_password = u'old_password'
|
||||
new_password = u'new_password'
|
||||
|
||||
class test_changepw(XMLRPC_test, Unauthorized_HTTP_test):
|
||||
app_uri = '/ipa/session/change_password'
|
||||
|
||||
def setUp(self):
|
||||
super(test_changepw, self).setUp()
|
||||
try:
|
||||
api.Command['user_add'](uid=testuser, givenname=u'Test', sn=u'User')
|
||||
api.Command['passwd'](testuser, password=u'old_password')
|
||||
except errors.ExecutionError, e:
|
||||
raise nose.SkipTest(
|
||||
'Cannot set up test user: %s' % e
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
try:
|
||||
api.Command['user_del']([testuser])
|
||||
except errors.NotFound:
|
||||
pass
|
||||
super(test_changepw, self).tearDown()
|
||||
|
||||
def _changepw(self, user, old_password, new_password):
|
||||
return self.send_request(params={'user': str(user),
|
||||
'old_password' : str(old_password),
|
||||
'new_password' : str(new_password)},
|
||||
)
|
||||
|
||||
def _checkpw(self, user, password):
|
||||
dn = str(DN(('uid', user), api.env.container_user, api.env.basedn))
|
||||
conn = ldap.initialize(api.env.ldap_uri)
|
||||
try:
|
||||
conn.simple_bind_s(dn, password)
|
||||
finally:
|
||||
conn.unbind_s()
|
||||
|
||||
def test_bad_options(self):
|
||||
for params in (None, # no params
|
||||
{'user': 'foo'}, # missing options
|
||||
{'user': 'foo',
|
||||
'old_password' : 'old'}, # missing option
|
||||
{'user': 'foo',
|
||||
'old_password' : 'old',
|
||||
'new_password' : ''}, # empty option
|
||||
):
|
||||
response = self.send_request(params=params)
|
||||
assert_equal(response.status, 400)
|
||||
assert_equal(response.reason, 'Bad Request')
|
||||
|
||||
def test_invalid_auth(self):
|
||||
response = self._changepw(testuser, 'wrongpassword', 'new_password')
|
||||
|
||||
assert_equal(response.status, 200)
|
||||
assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'invalid-password')
|
||||
|
||||
# make sure that password is NOT changed
|
||||
self._checkpw(testuser, old_password)
|
||||
|
||||
def test_pwpolicy_error(self):
|
||||
response = self._changepw(testuser, old_password, '1')
|
||||
|
||||
assert_equal(response.status, 200)
|
||||
assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'policy-error')
|
||||
assert_equal(response.getheader('X-IPA-Pwchange-Policy-Error'),
|
||||
'Constraint violation: Password is too short')
|
||||
|
||||
# make sure that password is NOT changed
|
||||
self._checkpw(testuser, old_password)
|
||||
|
||||
def test_pwpolicy_success(self):
|
||||
response = self._changepw(testuser, old_password, new_password)
|
||||
|
||||
assert_equal(response.status, 200)
|
||||
assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'ok')
|
||||
|
||||
# make sure that password IS changed
|
||||
self._checkpw(testuser, new_password)
|
||||
259
ipatests/test_ipaserver/test_ldap.py
Normal file
259
ipatests/test_ipaserver/test_ldap.py
Normal file
@@ -0,0 +1,259 @@
|
||||
# Authors:
|
||||
# Rob Crittenden <rcritten@redhat.com>
|
||||
#
|
||||
# Copyright (C) 2010 Red Hat
|
||||
# see file 'COPYING' for use and warranty information
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
# Test some simple LDAP requests using the ldap2 backend
|
||||
|
||||
# This fetches a certificate from a host principal so we can ensure that the
|
||||
# schema is working properly. We know this because the schema will tell the
|
||||
# encoder not to utf-8 encode binary attributes.
|
||||
|
||||
# The DM password needs to be set in ~/.ipa/.dmpw
|
||||
|
||||
import os
|
||||
|
||||
import nose
|
||||
from nose.tools import assert_raises # pylint: disable=E0611
|
||||
import nss.nss as nss
|
||||
|
||||
from ipaserver.plugins.ldap2 import ldap2
|
||||
from ipalib.plugins.service import service, service_show
|
||||
from ipalib.plugins.host import host
|
||||
from ipalib import api, x509, create_api, errors
|
||||
from ipapython import ipautil
|
||||
from ipapython.dn import DN
|
||||
|
||||
class test_ldap(object):
|
||||
"""
|
||||
Test various LDAP client bind methods.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.conn = None
|
||||
self.ldapuri = 'ldap://%s' % ipautil.format_netloc(api.env.host)
|
||||
self.ccache = '/tmp/krb5cc_%d' % os.getuid()
|
||||
nss.nss_init_nodb()
|
||||
self.dn = DN(('krbprincipalname','ldap/%s@%s' % (api.env.host, api.env.realm)),
|
||||
('cn','services'),('cn','accounts'),api.env.basedn)
|
||||
|
||||
def tearDown(self):
|
||||
if self.conn and self.conn.isconnected():
|
||||
self.conn.disconnect()
|
||||
|
||||
def test_anonymous(self):
|
||||
"""
|
||||
Test an anonymous LDAP bind using ldap2
|
||||
"""
|
||||
self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri)
|
||||
self.conn.connect()
|
||||
(dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate'])
|
||||
cert = entry_attrs.get('usercertificate')
|
||||
cert = cert[0]
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
assert serial is not None
|
||||
|
||||
def test_GSSAPI(self):
|
||||
"""
|
||||
Test a GSSAPI LDAP bind using ldap2
|
||||
"""
|
||||
if not ipautil.file_exists(self.ccache):
|
||||
raise nose.SkipTest('Missing ccache %s' % self.ccache)
|
||||
self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri)
|
||||
self.conn.connect(ccache='FILE:%s' % self.ccache)
|
||||
(dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate'])
|
||||
cert = entry_attrs.get('usercertificate')
|
||||
cert = cert[0]
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
assert serial is not None
|
||||
|
||||
def test_simple(self):
|
||||
"""
|
||||
Test a simple LDAP bind using ldap2
|
||||
"""
|
||||
pwfile = api.env.dot_ipa + os.sep + ".dmpw"
|
||||
if ipautil.file_exists(pwfile):
|
||||
fp = open(pwfile, "r")
|
||||
dm_password = fp.read().rstrip()
|
||||
fp.close()
|
||||
else:
|
||||
raise nose.SkipTest("No directory manager password in %s" % pwfile)
|
||||
self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri)
|
||||
self.conn.connect(bind_dn=DN(('cn', 'directory manager')), bind_pw=dm_password)
|
||||
(dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate'])
|
||||
cert = entry_attrs.get('usercertificate')
|
||||
cert = cert[0]
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
assert serial is not None
|
||||
|
||||
def test_Backend(self):
|
||||
"""
|
||||
Test using the ldap2 Backend directly (ala ipa-server-install)
|
||||
"""
|
||||
|
||||
# Create our own api because the one generated for the tests is
|
||||
# a client-only api. Then we register in the commands and objects
|
||||
# we need for the test.
|
||||
myapi = create_api(mode=None)
|
||||
myapi.bootstrap(context='cli', in_server=True, in_tree=True)
|
||||
myapi.register(ldap2)
|
||||
myapi.register(host)
|
||||
myapi.register(service)
|
||||
myapi.register(service_show)
|
||||
myapi.finalize()
|
||||
|
||||
pwfile = api.env.dot_ipa + os.sep + ".dmpw"
|
||||
if ipautil.file_exists(pwfile):
|
||||
fp = open(pwfile, "r")
|
||||
dm_password = fp.read().rstrip()
|
||||
fp.close()
|
||||
else:
|
||||
raise nose.SkipTest("No directory manager password in %s" % pwfile)
|
||||
myapi.Backend.ldap2.connect(bind_dn=DN(('cn', 'Directory Manager')), bind_pw=dm_password)
|
||||
|
||||
result = myapi.Command['service_show']('ldap/%s@%s' % (api.env.host, api.env.realm,))
|
||||
entry_attrs = result['result']
|
||||
cert = entry_attrs.get('usercertificate')
|
||||
cert = cert[0]
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
assert serial is not None
|
||||
|
||||
def test_autobind(self):
|
||||
"""
|
||||
Test an autobind LDAP bind using ldap2
|
||||
"""
|
||||
ldapuri = 'ldapi://%%2fvar%%2frun%%2fslapd-%s.socket' % api.env.realm.replace('.','-')
|
||||
self.conn = ldap2(shared_instance=False, ldap_uri=ldapuri)
|
||||
try:
|
||||
self.conn.connect(autobind=True)
|
||||
except errors.ACIError:
|
||||
raise nose.SkipTest("Only executed as root")
|
||||
(dn, entry_attrs) = self.conn.get_entry(self.dn, ['usercertificate'])
|
||||
cert = entry_attrs.get('usercertificate')
|
||||
cert = cert[0]
|
||||
serial = unicode(x509.get_serial_number(cert, x509.DER))
|
||||
assert serial is not None
|
||||
|
||||
|
||||
class test_LDAPEntry(object):
|
||||
"""
|
||||
Test the LDAPEntry class
|
||||
"""
|
||||
cn1 = [u'test1']
|
||||
cn2 = [u'test2']
|
||||
dn1 = DN(('cn', cn1[0]))
|
||||
dn2 = DN(('cn', cn2[0]))
|
||||
|
||||
def setUp(self):
|
||||
self.ldapuri = 'ldap://%s' % ipautil.format_netloc(api.env.host)
|
||||
self.conn = ldap2(shared_instance=False, ldap_uri=self.ldapuri)
|
||||
self.conn.connect()
|
||||
|
||||
self.entry = self.conn.make_entry(self.dn1, cn=self.cn1)
|
||||
|
||||
def tearDown(self):
|
||||
if self.conn and self.conn.isconnected():
|
||||
self.conn.disconnect()
|
||||
|
||||
def test_entry(self):
|
||||
e = self.entry
|
||||
assert e.dn is self.dn1
|
||||
assert u'cn' in e
|
||||
assert u'cn' in e.keys()
|
||||
assert 'CN' in e
|
||||
assert 'CN' not in e.keys()
|
||||
assert 'commonName' in e
|
||||
assert 'commonName' not in e.keys()
|
||||
assert e['CN'] is self.cn1
|
||||
assert e['CN'] is e[u'cn']
|
||||
|
||||
e.dn = self.dn2
|
||||
assert e.dn is self.dn2
|
||||
|
||||
def test_set_attr(self):
|
||||
e = self.entry
|
||||
e['commonName'] = self.cn2
|
||||
assert u'cn' in e
|
||||
assert u'cn' not in e.keys()
|
||||
assert 'CN' in e
|
||||
assert 'CN' not in e.keys()
|
||||
assert 'commonName' in e
|
||||
assert 'commonName' in e.keys()
|
||||
assert e['CN'] is self.cn2
|
||||
assert e['CN'] is e[u'cn']
|
||||
|
||||
def test_del_attr(self):
|
||||
e = self.entry
|
||||
del e['CN']
|
||||
assert 'CN' not in e
|
||||
assert 'CN' not in e.keys()
|
||||
assert u'cn' not in e
|
||||
assert u'cn' not in e.keys()
|
||||
assert 'commonName' not in e
|
||||
assert 'commonName' not in e.keys()
|
||||
|
||||
def test_popitem(self):
|
||||
e = self.entry
|
||||
assert e.popitem() == ('cn', self.cn1)
|
||||
e.keys() == []
|
||||
|
||||
def test_setdefault(self):
|
||||
e = self.entry
|
||||
assert e.setdefault('cn', self.cn2) == self.cn1
|
||||
assert e['cn'] == self.cn1
|
||||
assert e.setdefault('xyz', self.cn2) == self.cn2
|
||||
assert e['xyz'] == self.cn2
|
||||
|
||||
def test_update(self):
|
||||
e = self.entry
|
||||
e.update({'cn': self.cn2}, xyz=self.cn2)
|
||||
assert e['cn'] == self.cn2
|
||||
assert e['xyz'] == self.cn2
|
||||
|
||||
def test_pop(self):
|
||||
e = self.entry
|
||||
assert e.pop('cn') == self.cn1
|
||||
assert 'cn' not in e
|
||||
assert e.pop('cn', 'default') is 'default'
|
||||
with assert_raises(KeyError):
|
||||
e.pop('cn')
|
||||
|
||||
def test_clear(self):
|
||||
e = self.entry
|
||||
e.clear()
|
||||
assert not e
|
||||
assert 'cn' not in e
|
||||
|
||||
def test_has_key(self):
|
||||
e = self.entry
|
||||
assert not e.has_key('xyz')
|
||||
assert e.has_key('cn')
|
||||
assert e.has_key('COMMONNAME')
|
||||
|
||||
def test_get(self):
|
||||
e = self.entry
|
||||
assert e.get('cn') == self.cn1
|
||||
assert e.get('commonname') == self.cn1
|
||||
assert e.get('COMMONNAME', 'default') == self.cn1
|
||||
assert e.get('bad key', 'default') == 'default'
|
||||
|
||||
def test_single_value(self):
|
||||
e = self.entry
|
||||
assert e.single_value('cn') == self.cn1[0]
|
||||
assert e.single_value('commonname') == self.cn1[0]
|
||||
assert e.single_value('COMMONNAME', 'default') == self.cn1[0]
|
||||
assert e.single_value('bad key', 'default') == 'default'
|
||||
247
ipatests/test_ipaserver/test_rpcserver.py
Normal file
247
ipatests/test_ipaserver/test_rpcserver.py
Normal file
@@ -0,0 +1,247 @@
|
||||
# Authors:
|
||||
# Jason Gerard DeRose <jderose@redhat.com>
|
||||
#
|
||||
# Copyright (C) 2008 Red Hat
|
||||
# see file 'COPYING' for use and warranty information
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
Test the `ipaserver.rpc` module.
|
||||
"""
|
||||
|
||||
import json
|
||||
|
||||
from ipatests.util import create_test_api, assert_equal, raises, PluginTester
|
||||
from ipatests.data import unicode_str
|
||||
from ipalib import errors, Command
|
||||
from ipaserver import rpcserver
|
||||
|
||||
|
||||
class StartResponse(object):
|
||||
def __init__(self):
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
self.status = None
|
||||
self.headers = None
|
||||
|
||||
def __call__(self, status, headers):
|
||||
assert self.status is None
|
||||
assert self.headers is None
|
||||
assert isinstance(status, str)
|
||||
assert isinstance(headers, list)
|
||||
self.status = status
|
||||
self.headers = headers
|
||||
|
||||
|
||||
def test_not_found():
|
||||
f = rpcserver.HTTP_Status()
|
||||
t = rpcserver._not_found_template
|
||||
s = StartResponse()
|
||||
|
||||
# Test with an innocent URL:
|
||||
url = '/ipa/foo/stuff'
|
||||
assert_equal(
|
||||
f.not_found(None, s, url, None),
|
||||
[t % dict(url='/ipa/foo/stuff')]
|
||||
)
|
||||
assert s.status == '404 Not Found'
|
||||
assert s.headers == [('Content-Type', 'text/html; charset=utf-8')]
|
||||
|
||||
# Test when URL contains any of '<>&'
|
||||
s.reset()
|
||||
url =' ' + '<script>do_bad_stuff();</script>'
|
||||
assert_equal(
|
||||
f.not_found(None, s, url, None),
|
||||
[t % dict(url='&nbsp;<script>do_bad_stuff();</script>')]
|
||||
)
|
||||
assert s.status == '404 Not Found'
|
||||
assert s.headers == [('Content-Type', 'text/html; charset=utf-8')]
|
||||
|
||||
|
||||
def test_bad_request():
|
||||
f = rpcserver.HTTP_Status()
|
||||
t = rpcserver._bad_request_template
|
||||
s = StartResponse()
|
||||
|
||||
assert_equal(
|
||||
f.bad_request(None, s, 'illegal request'),
|
||||
[t % dict(message='illegal request')]
|
||||
)
|
||||
assert s.status == '400 Bad Request'
|
||||
assert s.headers == [('Content-Type', 'text/html; charset=utf-8')]
|
||||
|
||||
|
||||
def test_internal_error():
|
||||
f = rpcserver.HTTP_Status()
|
||||
t = rpcserver._internal_error_template
|
||||
s = StartResponse()
|
||||
|
||||
assert_equal(
|
||||
f.internal_error(None, s, 'request failed'),
|
||||
[t % dict(message='request failed')]
|
||||
)
|
||||
assert s.status == '500 Internal Server Error'
|
||||
assert s.headers == [('Content-Type', 'text/html; charset=utf-8')]
|
||||
|
||||
|
||||
def test_unauthorized_error():
|
||||
f = rpcserver.HTTP_Status()
|
||||
t = rpcserver._unauthorized_template
|
||||
s = StartResponse()
|
||||
|
||||
assert_equal(
|
||||
f.unauthorized(None, s, 'unauthorized', 'password-expired'),
|
||||
[t % dict(message='unauthorized')]
|
||||
)
|
||||
assert s.status == '401 Unauthorized'
|
||||
assert s.headers == [('Content-Type', 'text/html; charset=utf-8'),
|
||||
('X-IPA-Rejection-Reason', 'password-expired')]
|
||||
|
||||
|
||||
def test_params_2_args_options():
|
||||
"""
|
||||
Test the `ipaserver.rpcserver.params_2_args_options` function.
|
||||
"""
|
||||
f = rpcserver.params_2_args_options
|
||||
args = ('Hello', u'world!')
|
||||
options = dict(one=1, two=u'Two', three='Three')
|
||||
assert f(tuple()) == (tuple(), dict())
|
||||
assert f([args]) == (args, dict())
|
||||
assert f([args, options]) == (args, options)
|
||||
|
||||
|
||||
class test_session(object):
|
||||
klass = rpcserver.wsgi_dispatch
|
||||
|
||||
def test_route(self):
|
||||
def app1(environ, start_response):
|
||||
return (
|
||||
'from 1',
|
||||
[environ[k] for k in ('SCRIPT_NAME', 'PATH_INFO')]
|
||||
)
|
||||
|
||||
def app2(environ, start_response):
|
||||
return (
|
||||
'from 2',
|
||||
[environ[k] for k in ('SCRIPT_NAME', 'PATH_INFO')]
|
||||
)
|
||||
|
||||
inst = self.klass()
|
||||
inst.mount(app1, '/foo/stuff')
|
||||
inst.mount(app2, '/bar')
|
||||
|
||||
d = dict(SCRIPT_NAME='/ipa', PATH_INFO='/foo/stuff')
|
||||
assert inst.route(d, None) == ('from 1', ['/ipa', '/foo/stuff'])
|
||||
|
||||
d = dict(SCRIPT_NAME='/ipa', PATH_INFO='/bar')
|
||||
assert inst.route(d, None) == ('from 2', ['/ipa', '/bar'])
|
||||
|
||||
def test_mount(self):
|
||||
def app1(environ, start_response):
|
||||
pass
|
||||
|
||||
def app2(environ, start_response):
|
||||
pass
|
||||
|
||||
# Test that mount works:
|
||||
inst = self.klass()
|
||||
inst.mount(app1, 'foo')
|
||||
assert inst['foo'] is app1
|
||||
assert list(inst) == ['foo']
|
||||
|
||||
# Test that StandardError is raise if trying override a mount:
|
||||
e = raises(StandardError, inst.mount, app2, 'foo')
|
||||
assert str(e) == '%s.mount(): cannot replace %r with %r at %r' % (
|
||||
'wsgi_dispatch', app1, app2, 'foo'
|
||||
)
|
||||
|
||||
# Test mounting a second app:
|
||||
inst.mount(app2, 'bar')
|
||||
assert inst['bar'] is app2
|
||||
assert list(inst) == ['bar', 'foo']
|
||||
|
||||
|
||||
class test_xmlserver(PluginTester):
|
||||
"""
|
||||
Test the `ipaserver.rpcserver.xmlserver` plugin.
|
||||
"""
|
||||
|
||||
_plugin = rpcserver.xmlserver
|
||||
|
||||
def test_marshaled_dispatch(self): # FIXME
|
||||
(o, api, home) = self.instance('Backend', in_server=True)
|
||||
|
||||
|
||||
class test_jsonserver(PluginTester):
|
||||
"""
|
||||
Test the `ipaserver.rpcserver.jsonserver` plugin.
|
||||
"""
|
||||
|
||||
_plugin = rpcserver.jsonserver
|
||||
|
||||
def test_unmarshal(self):
|
||||
"""
|
||||
Test the `ipaserver.rpcserver.jsonserver.unmarshal` method.
|
||||
"""
|
||||
(o, api, home) = self.instance('Backend', in_server=True)
|
||||
|
||||
# Test with invalid JSON-data:
|
||||
e = raises(errors.JSONError, o.unmarshal, 'this wont work')
|
||||
assert isinstance(e.error, ValueError)
|
||||
assert unicode(e.error) == 'No JSON object could be decoded'
|
||||
|
||||
# Test with non-dict type:
|
||||
e = raises(errors.JSONError, o.unmarshal, json.dumps([1, 2, 3]))
|
||||
assert unicode(e.error) == 'Request must be a dict'
|
||||
|
||||
params = [[1, 2], dict(three=3, four=4)]
|
||||
# Test with missing method:
|
||||
d = dict(params=params, id=18)
|
||||
e = raises(errors.JSONError, o.unmarshal, json.dumps(d))
|
||||
assert unicode(e.error) == 'Request is missing "method"'
|
||||
|
||||
# Test with missing params:
|
||||
d = dict(method='echo', id=18)
|
||||
e = raises(errors.JSONError, o.unmarshal, json.dumps(d))
|
||||
assert unicode(e.error) == 'Request is missing "params"'
|
||||
|
||||
# Test with non-list params:
|
||||
for p in ('hello', dict(args=tuple(), options=dict())):
|
||||
d = dict(method='echo', id=18, params=p)
|
||||
e = raises(errors.JSONError, o.unmarshal, json.dumps(d))
|
||||
assert unicode(e.error) == 'params must be a list'
|
||||
|
||||
# Test with other than 2 params:
|
||||
for p in ([], [tuple()], [None, dict(), tuple()]):
|
||||
d = dict(method='echo', id=18, params=p)
|
||||
e = raises(errors.JSONError, o.unmarshal, json.dumps(d))
|
||||
assert unicode(e.error) == 'params must contain [args, options]'
|
||||
|
||||
# Test when args is not a list:
|
||||
d = dict(method='echo', id=18, params=['args', dict()])
|
||||
e = raises(errors.JSONError, o.unmarshal, json.dumps(d))
|
||||
assert unicode(e.error) == 'params[0] (aka args) must be a list'
|
||||
|
||||
# Test when options is not a dict:
|
||||
d = dict(method='echo', id=18, params=[('hello', 'world'), 'options'])
|
||||
e = raises(errors.JSONError, o.unmarshal, json.dumps(d))
|
||||
assert unicode(e.error) == 'params[1] (aka options) must be a dict'
|
||||
|
||||
# Test with valid values:
|
||||
args = [u'jdoe']
|
||||
options = dict(givenname=u'John', sn='Doe')
|
||||
d = dict(method=u'user_add', params=[args, options], id=18)
|
||||
assert o.unmarshal(json.dumps(d)) == (u'user_add', args, options, 18)
|
||||
Reference in New Issue
Block a user