mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
external-idp: add XMLRPC tests for External IdP objects and idp indicator
Fixes: https://pagure.io/freeipa/issue/8804 Fixes: https://pagure.io/freeipa/issue/8803 Signed-off-by: Alexander Bokovoy <abokovoy@redhat.com> Signed-off-by: Florence Blanc-Renaud <flo@redhat.com> Reviewed-By: Francisco Trivino <ftrivino@redhat.com> Reviewed-By: Sumit Bose <sbose@redhat.com>
This commit is contained in:
parent
10e18c3dc7
commit
03a905eed9
@ -39,6 +39,8 @@ DATA = {
|
|||||||
('textbox', 'krbauthindmaxticketlife_pkinit', '83000'),
|
('textbox', 'krbauthindmaxticketlife_pkinit', '83000'),
|
||||||
('textbox', 'krbauthindmaxrenewableage_hardened', '604000'),
|
('textbox', 'krbauthindmaxrenewableage_hardened', '604000'),
|
||||||
('textbox', 'krbauthindmaxticketlife_hardened', '84000'),
|
('textbox', 'krbauthindmaxticketlife_hardened', '84000'),
|
||||||
|
('textbox', 'krbauthindmaxrenewableage_idp', '605000'),
|
||||||
|
('textbox', 'krbauthindmaxticketlife_idp', '85000'),
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -54,6 +56,8 @@ DATA2 = {
|
|||||||
('textbox', 'krbauthindmaxticketlife_pkinit', '86400'),
|
('textbox', 'krbauthindmaxticketlife_pkinit', '86400'),
|
||||||
('textbox', 'krbauthindmaxrenewableage_hardened', '604800'),
|
('textbox', 'krbauthindmaxrenewableage_hardened', '604800'),
|
||||||
('textbox', 'krbauthindmaxticketlife_hardened', '86400'),
|
('textbox', 'krbauthindmaxticketlife_hardened', '86400'),
|
||||||
|
('textbox', 'krbauthindmaxrenewableage_idp', '604800'),
|
||||||
|
('textbox', 'krbauthindmaxticketlife_idp', '86400'),
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,7 +90,7 @@ class user_tasks(UI_driver):
|
|||||||
def assert_user_auth_type(self, auth_type, enabled=True):
|
def assert_user_auth_type(self, auth_type, enabled=True):
|
||||||
"""
|
"""
|
||||||
Check if provided auth type is enabled or disabled for the user
|
Check if provided auth type is enabled or disabled for the user
|
||||||
:param auth_type: one of password, radius, otp, pkinit or hardened
|
:param auth_type: one of password, radius, otp, pkinit, hardened or idp
|
||||||
:param enabled: check if enabled if True, check for disabled if False
|
:param enabled: check if enabled if True, check for disabled if False
|
||||||
"""
|
"""
|
||||||
s_checkbox = 'div[name="ipauserauthtype"] input[value="{}"]'.format(
|
s_checkbox = 'div[name="ipauserauthtype"] input[value="{}"]'.format(
|
||||||
@ -101,7 +101,7 @@ class user_tasks(UI_driver):
|
|||||||
def add_user_auth_type(self, auth_type, save=False):
|
def add_user_auth_type(self, auth_type, save=False):
|
||||||
"""
|
"""
|
||||||
Select user auth type
|
Select user auth type
|
||||||
:param auth_type: one of password, radius, otp, pkinit or hardened
|
:param auth_type: one of password, radius, otp, pkinit, hardened or idp
|
||||||
"""
|
"""
|
||||||
s_checkbox = 'div[name="ipauserauthtype"] input[value="{}"]'.format(
|
s_checkbox = 'div[name="ipauserauthtype"] input[value="{}"]'.format(
|
||||||
auth_type)
|
auth_type)
|
||||||
|
@ -239,3 +239,8 @@ certmapconfig = [
|
|||||||
u'nsContainer',
|
u'nsContainer',
|
||||||
u'ipaCertMapConfigObject',
|
u'ipaCertMapConfigObject',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
idp = [
|
||||||
|
'top',
|
||||||
|
'ipaidp',
|
||||||
|
]
|
||||||
|
265
ipatests/test_xmlrpc/test_idp_plugin.py
Normal file
265
ipatests/test_xmlrpc/test_idp_plugin.py
Normal file
@ -0,0 +1,265 @@
|
|||||||
|
#
|
||||||
|
# Copyright (C) 2021 FreeIPA Contributors see COPYING for license
|
||||||
|
#
|
||||||
|
|
||||||
|
"""
|
||||||
|
Test the `ipaserver.plugins.idp` module.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from ipalib import errors
|
||||||
|
from ipatests.test_xmlrpc.xmlrpc_test import (
|
||||||
|
XMLRPC_test, raises_exact)
|
||||||
|
from ipatests.test_xmlrpc.tracker.idp_plugin import IdpTracker
|
||||||
|
|
||||||
|
google_auth = "https://accounts.google.com/o/oauth2/auth"
|
||||||
|
google_devauth = "https://oauth2.googleapis.com/device/code"
|
||||||
|
google_token = "https://oauth2.googleapis.com/token"
|
||||||
|
google_userinfo = "https://openidconnect.googleapis.com/v1/userinfo"
|
||||||
|
google_jwks = "https://www.googleapis.com/oauth2/v3/certs"
|
||||||
|
|
||||||
|
idp_scope = "openid email"
|
||||||
|
idp_sub = "email"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope='class')
|
||||||
|
def idp(request, xmlrpc_setup):
|
||||||
|
tracker = IdpTracker('idp1', ipaidpauthendpoint=google_auth,
|
||||||
|
ipaidpdevauthendpoint=google_devauth,
|
||||||
|
ipaidptokenendpoint=google_token,
|
||||||
|
ipaidpuserinfoendpoint=google_userinfo,
|
||||||
|
ipaidpkeysendpoint=google_jwks,
|
||||||
|
ipaidpclientid="idp1client",
|
||||||
|
ipaidpclientsecret="Secret123",
|
||||||
|
ipaidpscope=idp_scope)
|
||||||
|
return tracker.make_fixture(request)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope='class')
|
||||||
|
def renamedidp(request, xmlrpc_setup):
|
||||||
|
tracker = IdpTracker('idp2', ipaidpauthendpoint=google_auth,
|
||||||
|
ipaidpdevauthendpoint=google_devauth,
|
||||||
|
ipaidptokenendpoint=google_token,
|
||||||
|
ipaidpuserinfoendpoint=google_userinfo,
|
||||||
|
ipaidpkeysendpoint=google_jwks,
|
||||||
|
ipaidpclientid="idp1client",
|
||||||
|
ipaidpclientsecret="Secret123",
|
||||||
|
ipaidpscope=idp_scope)
|
||||||
|
return tracker.make_fixture(request)
|
||||||
|
|
||||||
|
|
||||||
|
class TestNonexistentIdp(XMLRPC_test):
|
||||||
|
def test_retrieve_nonexistent(self, idp):
|
||||||
|
""" Try to retrieve a non-existent idp """
|
||||||
|
idp.ensure_missing()
|
||||||
|
command = idp.make_retrieve_command()
|
||||||
|
with raises_exact(errors.NotFound(
|
||||||
|
reason='%s: Identity Provider server not found' % idp.cn)):
|
||||||
|
command()
|
||||||
|
|
||||||
|
def test_update_nonexistent(self, idp):
|
||||||
|
""" Try to update a non-existent idp """
|
||||||
|
idp.ensure_missing()
|
||||||
|
command = idp.make_update_command(
|
||||||
|
updates=dict(ipaidpclientid='idpclient2'))
|
||||||
|
with raises_exact(errors.NotFound(
|
||||||
|
reason='%s: Identity Provider server not found' % idp.cn)):
|
||||||
|
command()
|
||||||
|
|
||||||
|
def test_delete_nonexistent(self, idp):
|
||||||
|
""" Try to delete a non-existent idp """
|
||||||
|
idp.ensure_missing()
|
||||||
|
command = idp.make_delete_command()
|
||||||
|
with raises_exact(errors.NotFound(
|
||||||
|
reason='%s: Identity Provider server not found' % idp.cn)):
|
||||||
|
command()
|
||||||
|
|
||||||
|
def test_rename_nonexistent(self, idp, renamedidp):
|
||||||
|
""" Try to rename a non-existent idp """
|
||||||
|
idp.ensure_missing()
|
||||||
|
command = idp.make_update_command(
|
||||||
|
updates=dict(setattr='cn=%s' % renamedidp.cn))
|
||||||
|
with raises_exact(errors.NotFound(
|
||||||
|
reason='%s: Identity Provider server not found' % idp.cn)):
|
||||||
|
command()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.tier1
|
||||||
|
class TestIdP(XMLRPC_test):
|
||||||
|
def test_retrieve(self, idp):
|
||||||
|
"""" Create idp and try to retrieve it """
|
||||||
|
idp.ensure_exists()
|
||||||
|
idp.retrieve()
|
||||||
|
|
||||||
|
def test_delete(self, idp):
|
||||||
|
""" Delete idp """
|
||||||
|
idp.ensure_exists()
|
||||||
|
idp.delete()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.tier1
|
||||||
|
class TestFindIdp(XMLRPC_test):
|
||||||
|
def test_find(self, idp):
|
||||||
|
""" Basic check of idp-find """
|
||||||
|
idp.ensure_exists()
|
||||||
|
idp.find()
|
||||||
|
|
||||||
|
def test_find_with_all(self, idp):
|
||||||
|
""" Basic check of idp-find with --all """
|
||||||
|
idp.ensure_exists()
|
||||||
|
idp.find(all=True)
|
||||||
|
|
||||||
|
def test_find_with_pkey_only(self, idp):
|
||||||
|
""" Basic check of idp-find with primary keys only """
|
||||||
|
idp.ensure_exists()
|
||||||
|
command = idp.make_find_command(cn=idp.cn, pkey_only=True)
|
||||||
|
result = command()
|
||||||
|
idp.check_find(result, pkey_only=True)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.tier1
|
||||||
|
class TestUpdateIdp(XMLRPC_test):
|
||||||
|
def test_update(self, idp):
|
||||||
|
""" Basic check of idp-mod """
|
||||||
|
idp.ensure_exists()
|
||||||
|
idp.update(
|
||||||
|
updates=dict(ipaidpclientid='NewClientID')
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_rename(self, idp, renamedidp):
|
||||||
|
""" Rename idp and rename it back """
|
||||||
|
idp.ensure_exists()
|
||||||
|
renamedidp.ensure_missing()
|
||||||
|
oldcn = idp.cn
|
||||||
|
|
||||||
|
idp.update(updates=dict(rename=renamedidp.cn))
|
||||||
|
idp.update(updates=dict(rename=oldcn))
|
||||||
|
|
||||||
|
def test_rename_to_same_value(self, idp):
|
||||||
|
""" Try to rename idp to the same value """
|
||||||
|
idp.ensure_exists()
|
||||||
|
command = idp.make_update_command(
|
||||||
|
updates=dict(setattr=('cn=%s' % idp.cn))
|
||||||
|
)
|
||||||
|
with raises_exact(errors.EmptyModlist()):
|
||||||
|
command()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.tier1
|
||||||
|
class TestCreateIdp(XMLRPC_test):
|
||||||
|
def test_create_idp_with_min_values(self):
|
||||||
|
""" Creation with only mandatory parameters """
|
||||||
|
idp_min = IdpTracker('min_idp', ipaidpauthendpoint=google_auth,
|
||||||
|
ipaidpdevauthendpoint=google_devauth,
|
||||||
|
ipaidptokenendpoint=google_token,
|
||||||
|
ipaidpuserinfoendpoint=google_userinfo,
|
||||||
|
ipaidpkeysendpoint=google_jwks,
|
||||||
|
ipaidpclientid="idp1client")
|
||||||
|
idp_min.track_create()
|
||||||
|
command = idp_min.make_create_command()
|
||||||
|
result = command()
|
||||||
|
idp_min.check_create(result)
|
||||||
|
idp_min.delete()
|
||||||
|
|
||||||
|
def test_create_idp_with_provider(self):
|
||||||
|
""" Creation with --provider parameter """
|
||||||
|
idp_with_provider = IdpTracker(
|
||||||
|
'idp_with_provider', ipaidpprovider='google',
|
||||||
|
ipaidpclientid="idpclient1")
|
||||||
|
idp_with_provider.track_create()
|
||||||
|
# the endpoints are automatically added
|
||||||
|
idp_with_provider.attrs.update(ipaidpauthendpoint=[google_auth])
|
||||||
|
idp_with_provider.attrs.update(ipaidpdevauthendpoint=[google_devauth])
|
||||||
|
idp_with_provider.attrs.update(ipaidptokenendpoint=[google_token])
|
||||||
|
idp_with_provider.attrs.update(ipaidpkeysendpoint=[google_jwks])
|
||||||
|
idp_with_provider.attrs.update(ipaidpuserinfoendpoint=[google_userinfo])
|
||||||
|
idp_with_provider.attrs.update(ipaidpscope=[idp_scope])
|
||||||
|
idp_with_provider.attrs.update(ipaidpsub=[idp_sub])
|
||||||
|
command = idp_with_provider.make_create_command()
|
||||||
|
result = command()
|
||||||
|
idp_with_provider.check_create(result)
|
||||||
|
idp_with_provider.delete()
|
||||||
|
|
||||||
|
def test_create_with_invalid_provider(self):
|
||||||
|
""" Creation with invalid --provider parameter """
|
||||||
|
idp_with_provider = IdpTracker(
|
||||||
|
'idp_with_provider', ipaidpprovider='fake',
|
||||||
|
ipaidpclientid="idpclient1")
|
||||||
|
idp_with_provider.track_create()
|
||||||
|
command = idp_with_provider.make_create_command()
|
||||||
|
with raises_exact(errors.ValidationError(
|
||||||
|
name='provider',
|
||||||
|
error="must be one of 'google', 'github', 'microsoft', "
|
||||||
|
"'okta', 'keycloak'"
|
||||||
|
)):
|
||||||
|
command()
|
||||||
|
|
||||||
|
def test_create_with_provider_and_authendpoint(self):
|
||||||
|
""" Creation with --provider parameter and --auth-uri"""
|
||||||
|
idp_with_provider = IdpTracker(
|
||||||
|
'idp_with_provider', ipaidpprovider='google',
|
||||||
|
ipaidpauthendpoint=google_auth,
|
||||||
|
ipaidpdevauthendpoint=google_devauth,
|
||||||
|
ipaidpclientid="idpclient1")
|
||||||
|
idp_with_provider.track_create()
|
||||||
|
command = idp_with_provider.make_create_command()
|
||||||
|
with raises_exact(errors.MutuallyExclusiveError(
|
||||||
|
reason='cannot specify both individual endpoints and IdP provider'
|
||||||
|
)):
|
||||||
|
command()
|
||||||
|
|
||||||
|
def test_create_with_provider_and_tokenendpoint(self):
|
||||||
|
""" Creation with --provider parameter and --token-uri"""
|
||||||
|
idp_with_provider = IdpTracker(
|
||||||
|
'idp_with_provider', ipaidpprovider='google',
|
||||||
|
ipaidptokenendpoint=google_token,
|
||||||
|
ipaidpdevauthendpoint=google_devauth,
|
||||||
|
ipaidpclientid="idpclient1")
|
||||||
|
idp_with_provider.track_create()
|
||||||
|
command = idp_with_provider.make_create_command()
|
||||||
|
with raises_exact(errors.MutuallyExclusiveError(
|
||||||
|
reason='cannot specify both individual endpoints and IdP provider'
|
||||||
|
)):
|
||||||
|
command()
|
||||||
|
|
||||||
|
def test_create_missing_authendpoint(self):
|
||||||
|
""" Creation with missing --dev-auth-uri and --auth-uri"""
|
||||||
|
idp_with_provider = IdpTracker(
|
||||||
|
'idp_with_provider',
|
||||||
|
ipaidptokenendpoint=google_token,
|
||||||
|
ipaidpclientid="idpclient1")
|
||||||
|
idp_with_provider.track_create()
|
||||||
|
command = idp_with_provider.make_create_command()
|
||||||
|
with raises_exact(errors.RequirementError(
|
||||||
|
name='dev-auth-uri or provider'
|
||||||
|
)):
|
||||||
|
command()
|
||||||
|
|
||||||
|
def test_create_missing_tokenendpoint(self):
|
||||||
|
""" Creation with missing --token-uri"""
|
||||||
|
idp_with_provider = IdpTracker(
|
||||||
|
'idp_with_provider',
|
||||||
|
ipaidpauthendpoint=google_auth,
|
||||||
|
ipaidpdevauthendpoint=google_devauth,
|
||||||
|
ipaidpclientid="idpclient1")
|
||||||
|
idp_with_provider.track_create()
|
||||||
|
command = idp_with_provider.make_create_command()
|
||||||
|
with raises_exact(errors.RequirementError(
|
||||||
|
name='token-uri or provider'
|
||||||
|
)):
|
||||||
|
command()
|
||||||
|
|
||||||
|
def test_create_missing_clientid(self):
|
||||||
|
""" Creation with missing --client-id"""
|
||||||
|
idp_with_provider = IdpTracker(
|
||||||
|
'idp_with_provider',
|
||||||
|
ipaidptokenendpoint=google_token,
|
||||||
|
ipaidpdevauthendpoint=google_devauth,
|
||||||
|
ipaidpauthendpoint=google_auth)
|
||||||
|
idp_with_provider.track_create()
|
||||||
|
command = idp_with_provider.make_create_command()
|
||||||
|
with raises_exact(errors.RequirementError(
|
||||||
|
name='client_id'
|
||||||
|
)):
|
||||||
|
command()
|
@ -40,6 +40,8 @@ parameters = [('krbauthindmaxrenewableage_radius', 'radius_maxrenew'),
|
|||||||
('krbauthindmaxticketlife_otp', 'otp_maxlife'),
|
('krbauthindmaxticketlife_otp', 'otp_maxlife'),
|
||||||
('krbauthindmaxrenewableage_hardened', 'hardened_maxrenew'),
|
('krbauthindmaxrenewableage_hardened', 'hardened_maxrenew'),
|
||||||
('krbauthindmaxticketlife_hardened', 'hardened_maxlife'),
|
('krbauthindmaxticketlife_hardened', 'hardened_maxlife'),
|
||||||
|
('krbauthindmaxrenewableage_idp', 'idp_maxrenew'),
|
||||||
|
('krbauthindmaxticketlife_idp', 'idp_maxlife'),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
@ -284,6 +286,51 @@ class test_krbtpolicy(Declarative):
|
|||||||
),
|
),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
dict(
|
||||||
|
desc='Update maxrenew user ticket policy for '
|
||||||
|
'auth indicator idp',
|
||||||
|
command=('krbtpolicy_mod', [user1],
|
||||||
|
dict(krbauthindmaxrenewableage_idp=3900)),
|
||||||
|
expected=dict(
|
||||||
|
value=user1,
|
||||||
|
summary=None,
|
||||||
|
result=dict(
|
||||||
|
krbmaxticketlife=[u'3600'],
|
||||||
|
krbauthindmaxticketlife_otp=[u'3700'],
|
||||||
|
krbauthindmaxticketlife_pkinit=[u'3800'],
|
||||||
|
krbauthindmaxticketlife_radius=[u'1'],
|
||||||
|
krbauthindmaxticketlife_hardened=[u'2147483647'],
|
||||||
|
krbauthindmaxrenewableage_hardened=[u'2147483647'],
|
||||||
|
krbauthindmaxrenewableage_otp=[u'3700'],
|
||||||
|
krbauthindmaxrenewableage_radius=[u'1'],
|
||||||
|
krbauthindmaxrenewableage_pkinit=[u'3800'],
|
||||||
|
krbauthindmaxrenewableage_idp=[u'3900'],
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
dict(
|
||||||
|
desc='Update maxlife user ticket policy for '
|
||||||
|
'auth indicator idp',
|
||||||
|
command=('krbtpolicy_mod', [user1],
|
||||||
|
dict(krbauthindmaxticketlife_idp=3900)),
|
||||||
|
expected=dict(
|
||||||
|
value=user1,
|
||||||
|
summary=None,
|
||||||
|
result=dict(
|
||||||
|
krbmaxticketlife=[u'3600'],
|
||||||
|
krbauthindmaxticketlife_otp=[u'3700'],
|
||||||
|
krbauthindmaxticketlife_pkinit=[u'3800'],
|
||||||
|
krbauthindmaxticketlife_radius=[u'1'],
|
||||||
|
krbauthindmaxticketlife_hardened=[u'2147483647'],
|
||||||
|
krbauthindmaxrenewableage_hardened=[u'2147483647'],
|
||||||
|
krbauthindmaxrenewableage_otp=[u'3700'],
|
||||||
|
krbauthindmaxrenewableage_radius=[u'1'],
|
||||||
|
krbauthindmaxrenewableage_pkinit=[u'3800'],
|
||||||
|
krbauthindmaxrenewableage_idp=[u'3900'],
|
||||||
|
krbauthindmaxticketlife_idp=[u'3900'],
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
|
||||||
dict(
|
dict(
|
||||||
desc='Try updating other user attribute',
|
desc='Try updating other user attribute',
|
||||||
|
@ -1587,7 +1587,7 @@ class TestAuthenticationIndicators(XMLRPC_test):
|
|||||||
indicators_service.update(
|
indicators_service.update(
|
||||||
updates={
|
updates={
|
||||||
u'krbprincipalauthind': [
|
u'krbprincipalauthind': [
|
||||||
u'otp', u'radius', u'pkinit', u'hardened'
|
u'otp', u'radius', u'pkinit', u'hardened', u'idp'
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -467,7 +467,7 @@ class TestUpdate(XMLRPC_test):
|
|||||||
""" Set ipauserauthtype to all valid types and than back to None """
|
""" Set ipauserauthtype to all valid types and than back to None """
|
||||||
user.ensure_exists()
|
user.ensure_exists()
|
||||||
user.update(dict(ipauserauthtype=[
|
user.update(dict(ipauserauthtype=[
|
||||||
u'password', u'radius', u'otp', u'pkinit', u'hardened'
|
u'password', u'radius', u'otp', u'pkinit', u'hardened', u'idp'
|
||||||
]))
|
]))
|
||||||
user.retrieve()
|
user.retrieve()
|
||||||
|
|
||||||
|
170
ipatests/test_xmlrpc/tracker/idp_plugin.py
Normal file
170
ipatests/test_xmlrpc/tracker/idp_plugin.py
Normal file
@ -0,0 +1,170 @@
|
|||||||
|
#
|
||||||
|
# Copyright (C) 2021 FreeIPA Contributors see COPYING for license
|
||||||
|
#
|
||||||
|
|
||||||
|
from ipalib import api
|
||||||
|
from ipapython.dn import DN
|
||||||
|
from ipatests.test_xmlrpc.tracker.base import Tracker
|
||||||
|
from ipatests.test_xmlrpc import objectclasses
|
||||||
|
from ipatests.util import assert_deepequal
|
||||||
|
|
||||||
|
|
||||||
|
class IdpTracker(Tracker):
|
||||||
|
"""Class for ipd tests"""
|
||||||
|
|
||||||
|
retrieve_keys = {
|
||||||
|
'dn', 'cn', 'ipaidpauthendpoint', 'ipaidpdevauthendpoint',
|
||||||
|
'ipaidpuserinfoendpoint', 'ipaidpkeysendpoint',
|
||||||
|
'ipaidptokenendpoint', 'ipaidpissuerurl',
|
||||||
|
'ipaidpclientid', 'ipaidpscope', 'ipaidpsub'}
|
||||||
|
|
||||||
|
retrieve_all_keys = retrieve_keys | {
|
||||||
|
'objectclass', 'ipaidpclientsecret'
|
||||||
|
}
|
||||||
|
|
||||||
|
create_keys = retrieve_all_keys
|
||||||
|
|
||||||
|
update_keys = retrieve_keys - {'dn'}
|
||||||
|
|
||||||
|
find_keys = retrieve_keys
|
||||||
|
find_all_keys = retrieve_all_keys
|
||||||
|
|
||||||
|
primary_keys = {'cn', 'dn'}
|
||||||
|
|
||||||
|
def __init__(self, cn, **kwargs):
|
||||||
|
super(IdpTracker, self).__init__(default_version=None)
|
||||||
|
self.cn = cn
|
||||||
|
self.dn = DN(('cn', cn), api.env.container_idp, api.env.basedn)
|
||||||
|
self.kwargs = kwargs
|
||||||
|
|
||||||
|
def make_create_command(self):
|
||||||
|
""" Make function that creates an idp using idp-add """
|
||||||
|
return self.make_command('idp_add', self.cn, **self.kwargs)
|
||||||
|
|
||||||
|
def track_create(self):
|
||||||
|
""" Update expected state for idp creation """
|
||||||
|
self.attrs = dict(
|
||||||
|
dn=self.dn,
|
||||||
|
cn=[self.cn],
|
||||||
|
objectclass=objectclasses.idp,
|
||||||
|
)
|
||||||
|
for key, value in self.kwargs.items():
|
||||||
|
if key == 'ipaidpclientsecret':
|
||||||
|
self.attrs[key] = [value.encode('utf-8')]
|
||||||
|
continue
|
||||||
|
if type(value) is not list:
|
||||||
|
self.attrs[key] = [value]
|
||||||
|
else:
|
||||||
|
self.attrs[key] = value
|
||||||
|
self.exists = True
|
||||||
|
|
||||||
|
def check_create(self, result, extra_keys=()):
|
||||||
|
""" Check idp-add command result """
|
||||||
|
expected = self.filter_attrs(self.create_keys | set(extra_keys))
|
||||||
|
assert_deepequal(
|
||||||
|
dict(
|
||||||
|
value=self.cn,
|
||||||
|
summary='Added Identity Provider server "%s"' % self.cn,
|
||||||
|
result=self.filter_attrs(expected),
|
||||||
|
), result)
|
||||||
|
|
||||||
|
def make_delete_command(self):
|
||||||
|
""" Make function that deletes an idp using idp-del """
|
||||||
|
return self.make_command('idp_del', self.cn)
|
||||||
|
|
||||||
|
def check_delete(self, result):
|
||||||
|
""" Check idp-del command result """
|
||||||
|
assert_deepequal(
|
||||||
|
dict(
|
||||||
|
value=[self.cn],
|
||||||
|
summary='Deleted Identity Provider server "%s"' % self.cn,
|
||||||
|
result=dict(failed=[]),
|
||||||
|
), result)
|
||||||
|
|
||||||
|
def make_retrieve_command(self, all=False, raw=False):
|
||||||
|
""" Make function that retrieves an idp using idp-show """
|
||||||
|
return self.make_command('idp_show', self.cn, all=all)
|
||||||
|
|
||||||
|
def check_retrieve(self, result, all=False, raw=False):
|
||||||
|
""" Check idp-show command result """
|
||||||
|
if all:
|
||||||
|
expected = self.filter_attrs(self.retrieve_all_keys)
|
||||||
|
else:
|
||||||
|
expected = self.filter_attrs(self.retrieve_keys)
|
||||||
|
assert_deepequal(dict(
|
||||||
|
value=self.cn,
|
||||||
|
summary=None,
|
||||||
|
result=expected,
|
||||||
|
), result)
|
||||||
|
|
||||||
|
def make_find_command(self, *args, **kwargs):
|
||||||
|
""" Make function that finds idp using idp-find """
|
||||||
|
return self.make_command('idp_find', *args, **kwargs)
|
||||||
|
|
||||||
|
def check_find(self, result, all=False, pkey_only=False, raw=False):
|
||||||
|
""" Check idp-find command result """
|
||||||
|
if all:
|
||||||
|
expected = self.filter_attrs(self.find_all_keys)
|
||||||
|
elif pkey_only:
|
||||||
|
expected = self.filter_attrs(self.primary_keys)
|
||||||
|
else:
|
||||||
|
expected = self.filter_attrs(self.find_keys)
|
||||||
|
|
||||||
|
assert_deepequal(dict(
|
||||||
|
count=1,
|
||||||
|
truncated=False,
|
||||||
|
summary='1 Identity Provider server matched',
|
||||||
|
result=[expected],
|
||||||
|
), result)
|
||||||
|
|
||||||
|
def make_update_command(self, updates):
|
||||||
|
""" Make function that updates an idp using idp_mod """
|
||||||
|
return self.make_command('idp_mod', self.cn, **updates)
|
||||||
|
|
||||||
|
def update(self, updates, expected_updates=None):
|
||||||
|
"""Helper function to update this idp and check the result
|
||||||
|
|
||||||
|
Overriding Tracker method for setting self.attrs correctly;
|
||||||
|
* most attributes stores its value in list
|
||||||
|
* the rest can be overridden by expected_updates
|
||||||
|
* allow deleting parameters if update value is None
|
||||||
|
"""
|
||||||
|
if expected_updates is None:
|
||||||
|
expected_updates = {}
|
||||||
|
|
||||||
|
self.ensure_exists()
|
||||||
|
command = self.make_update_command(updates)
|
||||||
|
result = command()
|
||||||
|
|
||||||
|
for key, value in updates.items():
|
||||||
|
if value is None or value == '':
|
||||||
|
del self.attrs[key]
|
||||||
|
elif key == 'rename':
|
||||||
|
self.attrs['cn'] = [value]
|
||||||
|
else:
|
||||||
|
if type(value) is list:
|
||||||
|
self.attrs[key] = value
|
||||||
|
else:
|
||||||
|
self.attrs[key] = [value]
|
||||||
|
for key, value in expected_updates.items():
|
||||||
|
if value is None or value == '':
|
||||||
|
del self.attrs[key]
|
||||||
|
else:
|
||||||
|
self.attrs[key] = value
|
||||||
|
|
||||||
|
self.check_update(
|
||||||
|
result,
|
||||||
|
extra_keys=set(updates.keys()) | set(expected_updates.keys())
|
||||||
|
)
|
||||||
|
|
||||||
|
if 'rename' in updates:
|
||||||
|
self.cn = self.attrs['cn'][0]
|
||||||
|
|
||||||
|
def check_update(self, result, extra_keys=()):
|
||||||
|
""" Check idp-mod command result """
|
||||||
|
expected = self.filter_attrs(self.update_keys | set(extra_keys))
|
||||||
|
assert_deepequal(dict(
|
||||||
|
value=self.cn,
|
||||||
|
summary='Modified Identity Provider server "%s"' % self.cn,
|
||||||
|
result=expected
|
||||||
|
), result)
|
Loading…
Reference in New Issue
Block a user