Add support for additional ID token claim checks for OAuth 2 authentication. #6736

This commit is contained in:
Everton Seiei Arakaki
2023-09-05 07:58:18 +02:00
committed by GitHub
parent e5c249e81c
commit 02eaf787e9
4 changed files with 136 additions and 0 deletions

View File

@@ -12,6 +12,10 @@ installed in Server mode. You can copy these settings from *config.py* file
and modify the values for the following parameters:
.. _AzureAD: https://learn.microsoft.com/en-us/security/zero-trust/develop/configure-tokens-group-claims-app-roles
.. _GitLab: https://docs.gitlab.com/ee/integration/openid_connect_provider.html#shared-information
.. csv-table::
:header: "**Parameter**", "**Description**"
:class: longtable
@@ -39,6 +43,9 @@ and modify the values for the following parameters:
"OAUTH2_AUTO_CREATE_USER", "Set the value to *True* if you want to automatically
create a pgAdmin user corresponding to a successfully authenticated Oauth2 user.
Please note that password is not stored in the pgAdmin database."
"OAUTH2_ADDITIONAL_CLAIMS", "If a dictionary is provided, pgAdmin will check for a matching key and value on the user profile.
In case the profile does not have any match with the provided config, the user will receive an authorization error.
Useful for checking AzureAD_ *wids* or *groups*, GitLab_ *owner*, *maintainer* and *reporter* claims."
Redirect URL
============

View File

@@ -793,6 +793,30 @@ OAUTH2_CONFIG = [
'OAUTH2_ICON': None,
# UI button colour, ex: #0000ff
'OAUTH2_BUTTON_COLOR': None,
# The additional claims to check on user ID Token. This is useful to
# provide additional authorization checks before allowing access.
# Example for GitLab: allowing all maintainers teams, and a specific
# developers group to access pgadmin:
# 'OAUTH2_ADDITIONAL_CLAIMS': {
# 'https://gitlab.org/claims/groups/maintainer': [
# 'kuberheads/applications',
# 'kuberheads/dba',
# 'kuberheads/support'
# ],
# 'https://gitlab.org/claims/groups/developer': [
# 'kuberheads/applications/team01'
# ],
# }
# Example for AzureAD:
# 'OAUTH2_ADDITIONAL_CLAIMS': {
# 'groups': ["0760b6cf-170e-4a14-91b3-4b78e0739963"],
# 'wids': ["cf1c38e5-3621-4004-a7cb-879624dced7c"],
# }
# Example for any key value string check:
# 'OAUTH2_ADDITIONAL_CLAIMS': {
# 'group': "0760b6cf-170e-4a14-91b3-4b78e0739963",
# }
'OAUTH2_ADDITIONAL_CLAIMS': None,
}
]

View File

@@ -151,6 +151,26 @@ class OAuth2Authentication(BaseAuthentication):
current_app.logger.exception(error_msg)
return False, gettext(error_msg)
additinal_claims = None
if 'OAUTH2_ADDITIONAL_CLAIMS' in self.oauth2_config[
self.oauth2_current_client]:
additinal_claims = self.oauth2_config[
self.oauth2_current_client
]['OAUTH2_ADDITIONAL_CLAIMS']
(valid, reason) = self.__is_additional_claims_valid(profile,
additinal_claims)
if not valid:
return_msg = "The user is not authorized to login" \
" based on the claims in the profile." \
" Please contact your administrator."
audit_msg = f"The authenticated user {username} is not" \
" authorized to access pgAdmin based on OAUTH2 config. " \
f"Reason: {reason}"
current_app.logger.warning(audit_msg)
return False, return_msg
user, msg = self.__auto_create_user(username, email)
if user:
user = db.session.query(User).filter_by(
@@ -204,3 +224,24 @@ class OAuth2Authentication(BaseAuthentication):
})
return True, {'username': username}
def __is_additional_claims_valid(self, profile, additional_claims):
if additional_claims is None:
reason = "Additional claim config is None, no check to do."
return (True, reason)
if not isinstance(additional_claims, dict):
reason = "Additional claim check config is not a dict."
return (False, reason)
if additional_claims.keys() is None:
reason = "Additional claim check config dict is empty."
return (False, reason)
for key in additional_claims.keys():
claim = profile.get(key)
if claim is None:
continue
authorized_claims = additional_claims.get(key)
if any(item in authorized_claims for item in claim):
reason = "Claim match found. Authorizing"
return (True, reason)
reason = f"Profile does not have any of given additional claims."
return (False, reason)

View File

@@ -33,6 +33,11 @@ class Oauth2LoginMockTestCase(BaseTestGenerator):
oauth2_provider='github',
flag=2
)),
('Oauth2 Authentication', dict(
auth_source=['oauth2'],
oauth2_provider='auth-with-additional-claim-check',
flag=3
)),
]
@classmethod
@@ -61,6 +66,25 @@ class Oauth2LoginMockTestCase(BaseTestGenerator):
'OAUTH2_SCOPE': 'email profile',
'OAUTH2_ICON': 'fa-github',
'OAUTH2_BUTTON_COLOR': '#3253a8',
},
{
'OAUTH2_NAME': 'auth-with-additional-claim-check',
'OAUTH2_DISPLAY_NAME': 'Additional Authorization',
'OAUTH2_CLIENT_ID': 'testclientid',
'OAUTH2_CLIENT_SECRET': 'testclientsec',
'OAUTH2_TOKEN_URL':
'https://dummy.com/123/oauth2/v2.0/token',
'OAUTH2_AUTHORIZATION_URL':
'https://dummy.com/123/oauth2/v2.0/authorize',
'OAUTH2_API_BASE_URL': 'https://graph.dummy.com/v1.0/',
'OAUTH2_USERINFO_ENDPOINT': 'me',
'OAUTH2_SCOPE': 'openid email profile',
'OAUTH2_ICON': 'briefcase',
'OAUTH2_BUTTON_COLOR': '#0000ff',
'OAUTH2_ADDITIONAL_CLAIMS': {
'groups': ['123','456'],
'wids': ['789']
}
}
]
@@ -75,6 +99,8 @@ class Oauth2LoginMockTestCase(BaseTestGenerator):
self.test_external_authentication()
elif self.flag == 2:
self.test_oauth2_authentication()
elif self.flag == 3:
self.test_oauth2_authentication_with_additional_claims_success()
def test_external_authentication(self):
"""
@@ -127,6 +153,44 @@ class Oauth2LoginMockTestCase(BaseTestGenerator):
respdata = 'Gravatar image for %s' % profile['email']
self.assertTrue(respdata in res.data.decode('utf8'))
def test_oauth2_authentication_with_additional_claims_success(self):
"""
Ensure that when an oauth2 config has a dict OAUTH2_ADDITIONAL_CLAIMS,
any match of the OAUTH2_ADDITIONAL_CLAIMS dict will allow user login.
"""
profile = self.mock_user_profile_with_additional_claims()
# Mock Oauth2 Authenticate
AuthSourceRegistry._registry[OAUTH2].authenticate = MagicMock(
return_value=[True, ''])
AuthSourceManager.update_auth_sources = MagicMock()
# Create AuthSourceManager object
auth_obj = AuthSourceManager({}, [OAUTH2])
auth_source = AuthSourceRegistry.get(OAUTH2)
auth_obj.set_source(auth_source)
auth_obj.set_current_source(auth_source.get_source_name())
# Check the login with Oauth2
res = self.tester.login(email=None, password=None,
_follow_redirects=True,
headers=None,
extra_form_data=dict(
oauth2_button=self.oauth2_provider)
)
respdata = 'Gravatar image for %s' % profile['email']
self.assertTrue(respdata in res.data.decode('utf8'))
def mock_user_profile_with_additional_claims(self):
profile = {'email': 'oauth2@gmail.com', 'wids': ['789']}
AuthSourceRegistry._registry[OAUTH2].get_user_profile = MagicMock(
return_value=profile)
return profile
def mock_user_profile(self):
profile = {'email': 'oauth2@gmail.com'}