##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2022, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################

import config as app_config
from pgadmin.utils.route import BaseTestGenerator
from regression.python_test_utils import test_utils as utils
from pgadmin.authenticate.registry import AuthSourceRegistry
from unittest.mock import patch, MagicMock
from werkzeug.datastructures import Headers
from pgadmin.utils.constants import LDAP, INTERNAL, KERBEROS


class KerberosLoginMockTestCase(BaseTestGenerator):
    """
    This class checks Spnego/Kerberos login functionality
    by mocking HTTP negotiate authentication.
    """

    # Each scenario sets ``flag``, which ``runTest`` uses to pick the
    # test method to execute.
    scenarios = [
        ('Spnego/Kerberos Authentication: Test Unauthorized', dict(
            auth_source=[KERBEROS],
            auto_create_user=True,
            flag=1
        )),
        ('Spnego/Kerberos Authentication: Test Authorized', dict(
            auth_source=[KERBEROS],
            auto_create_user=True,
            flag=2
        )),
        ('Spnego/Kerberos Update Ticket', dict(
            auth_source=[KERBEROS],
            auto_create_user=True,
            flag=3
        ))
    ]

    @classmethod
    def setUpClass(cls):
        """
        We need to logout the test client as we are testing
        spnego/kerberos login scenarios.
        """
        cls.tester.logout()

    def setUp(self):
        """Switch the authentication configuration to Kerberos."""
        app_config.AUTHENTICATION_SOURCES = self.auth_source
        # NOTE(review): this is set on ``self.app`` while tearDownClass
        # resets the same-named attribute on ``app_config`` - confirm
        # which object is the intended target.
        self.app.PGADMIN_EXTERNAL_AUTH_SOURCE = KERBEROS

    def _skip_if_desktop_mode(self):
        """Skip the running scenario when SERVER_MODE is False."""
        if app_config.SERVER_MODE is False:
            self.skipTest(
                "Can not run Kerberos Authentication in the Desktop mode."
            )

    def runTest(self):
        """This function checks spnego/kerberos login functionality."""
        if self.flag == 1:
            self.test_unauthorized()
        elif self.flag == 2:
            self._skip_if_desktop_mode()
            self.test_authorized()
        elif self.flag == 3:
            self._skip_if_desktop_mode()
            self.test_update_ticket()

    def test_unauthorized(self):
        """
        Ensure that when the client sends the first request,
        the Negotiate request is sent.
        """
        res = self.tester.login(None, None, True)
        self.assertEqual(res.status_code, 401)
        self.assertEqual(res.headers.get('www-authenticate'), 'Negotiate')

    def test_authorized(self):
        """
        Ensure that when the client sends a correct authorization token,
        they receive a 200 OK response and the user principal is
        extracted and passed on to the routed method.
        """
        del_crads = self.mock_negotiate_start()
        res = self.tester.login(None,
                                None,
                                True,
                                headers={'Authorization': 'Negotiate CTOKEN'}
                                )
        self.assertEqual(res.status_code, 200)
        respdata = 'Gravatar image for %s' % del_crads.initiator_name
        self.assertIn(respdata, res.data.decode('utf8'))

    def mock_negotiate_start(self):
        """
        Replace ``negotiate_start`` on the registered Kerberos auth source
        with a mock that returns ``[True, <credentials>]``.

        The patch is undone automatically when the current test finishes,
        so the mock does not leak into tests that run afterwards.

        :return: the stub credentials object handed back by the mock; its
            ``initiator_name`` attribute is 'user@PGADMIN.ORG'.
        """
        class DelegatedCredentials:
            def __init__(self):
                self.initiator_name = 'user@PGADMIN.ORG'

        del_crads = DelegatedCredentials()

        patcher = patch.object(
            AuthSourceRegistry._registry[KERBEROS],
            'negotiate_start',
            MagicMock(return_value=[True, del_crads])
        )
        patcher.start()
        # Restore the real negotiate_start once this test is over.
        self.addCleanup(patcher.stop)
        return del_crads

    def test_update_ticket(self):
        """
        Check '/kerberos/update_ticket': a request without a token gets a
        401 with a Negotiate challenge, a request with a token gets 200.
        """
        # Response header should include the Negotiate header in the first
        # call
        response = self.tester.get('/kerberos/update_ticket')
        self.assertEqual(response.status_code, 401)
        self.assertEqual(response.headers.get('www-authenticate'),
                         'Negotiate')

        # When we send the Kerberos Ticket, it should return success
        self.mock_negotiate_start()

        krb_token = Headers({})
        krb_token['Authorization'] = 'Negotiate CTOKEN'

        response = self.tester.get('/kerberos/update_ticket',
                                   headers=krb_token)
        self.assertEqual(response.status_code, 200)
        self.tester.logout()

    def tearDown(self):
        """No per-test teardown is performed here."""
        pass

    @classmethod
    def tearDownClass(cls):
        """
        We need to again login the test client as soon as test scenarios
        finishes.
        """
        cls.tester.logout()
        app_config.AUTHENTICATION_SOURCES = [INTERNAL]
        app_config.PGADMIN_EXTERNAL_AUTH_SOURCE = INTERNAL
        utils.login_tester_account(cls.tester)