From 66154f8bf79584b8fa6792e3d2ca534900dfa481 Mon Sep 17 00:00:00 2001 From: Florence Blanc-Renaud Date: Wed, 26 Feb 2020 15:00:11 +0100 Subject: [PATCH] Privilege: add a helper checking if a principal has a given privilege server_conncheck is ensuring that the caller has the expected privilege. Move the code to a common place in ipaserver/plugins/privilege.py Related: https://pagure.io/freeipa/issue/7600 Reviewed-By: Alexander Bokovoy Reviewed-By: Rob Crittenden Reviewed-By: Sergey Orlov Reviewed-By: Alexander Bokovoy Reviewed-By: Rob Crittenden Reviewed-By: Sergey Orlov --- ipaserver/plugins/privilege.py | 14 ++++++++++++++ ipaserver/plugins/server.py | 11 ++--------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/ipaserver/plugins/privilege.py b/ipaserver/plugins/privilege.py index 2c4d69aa2..22786e855 100644 --- a/ipaserver/plugins/privilege.py +++ b/ipaserver/plugins/privilege.py @@ -83,6 +83,20 @@ def validate_permission_to_privilege(api, permission): 'ipapermbindruletype', 'permission')}) +def principal_has_privilege(api, principal, privilege): + privilege_dn = api.Object.privilege.get_dn(privilege) + ldap = api.Backend.ldap2 + filter = ldap.make_filter({ + 'krbprincipalname': principal, # pylint: disable=no-member + 'memberof': privilege_dn}, + rules=ldap.MATCH_ALL) + try: + ldap.find_entries(base_dn=api.env.basedn, filter=filter) + except errors.NotFound: + return False + return True + + @register() class privilege(LDAPObject): """ diff --git a/ipaserver/plugins/server.py b/ipaserver/plugins/server.py index 3c0038d0c..f2544a8bc 100644 --- a/ipaserver/plugins/server.py +++ b/ipaserver/plugins/server.py @@ -31,6 +31,7 @@ from ipaserver import topology from ipaserver.servroles import ENABLED, HIDDEN from ipaserver.install import bindinstance, dnskeysyncinstance from ipaserver.install.service import hide_services, enable_services +from ipaserver.plugins.privilege import principal_has_privilege __doc__ = _(""" IPA servers @@ -920,15 +921,7 @@ class server_conncheck(crud.PKQuery): # the user must have the Replication Administrators privilege privilege = u'Replication Administrators' - privilege_dn = self.api.Object.privilege.get_dn(privilege) - ldap = self.obj.backend - filter = ldap.make_filter({ - 'krbprincipalname': context.principal, # pylint: disable=no-member - 'memberof': privilege_dn}, - rules=ldap.MATCH_ALL) - try: - ldap.find_entries(base_dn=self.api.env.basedn, filter=filter) - except errors.NotFound: + if not principal_has_privilege(self.api, context.principal, privilege): raise errors.ACIError( info=_("not allowed to perform server connection check"))