mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-01-26 16:16:31 -06:00
Privilege: add a helper checking if a principal has a given privilege
server_conncheck is ensuring that the caller has the expected privilege. Move the code to a common place in ipaserver/plugins/privilege.py Related: https://pagure.io/freeipa/issue/7600 Reviewed-By: Alexander Bokovoy <abokovoy@redhat.com> Reviewed-By: Rob Crittenden <rcritten@redhat.com> Reviewed-By: Sergey Orlov <sorlov@redhat.com>
This commit is contained in:
parent
473f9baf26
commit
68c72e344a
@ -83,6 +83,20 @@ def validate_permission_to_privilege(api, permission):
|
||||
'ipapermbindruletype', 'permission')})
|
||||
|
||||
|
||||
def principal_has_privilege(api, principal, privilege):
|
||||
privilege_dn = api.Object.privilege.get_dn(privilege)
|
||||
ldap = api.Backend.ldap2
|
||||
filter = ldap.make_filter({
|
||||
'krbprincipalname': principal, # pylint: disable=no-member
|
||||
'memberof': privilege_dn},
|
||||
rules=ldap.MATCH_ALL)
|
||||
try:
|
||||
ldap.find_entries(base_dn=api.env.basedn, filter=filter)
|
||||
except errors.NotFound:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@register()
|
||||
class privilege(LDAPObject):
|
||||
"""
|
||||
|
@ -31,6 +31,7 @@ from ipaserver import topology
|
||||
from ipaserver.servroles import ENABLED, HIDDEN
|
||||
from ipaserver.install import bindinstance, dnskeysyncinstance
|
||||
from ipaserver.install.service import hide_services, enable_services
|
||||
from ipaserver.plugins.privilege import principal_has_privilege
|
||||
|
||||
__doc__ = _("""
|
||||
IPA servers
|
||||
@ -920,15 +921,7 @@ class server_conncheck(crud.PKQuery):
|
||||
|
||||
# the user must have the Replication Administrators privilege
|
||||
privilege = u'Replication Administrators'
|
||||
privilege_dn = self.api.Object.privilege.get_dn(privilege)
|
||||
ldap = self.obj.backend
|
||||
filter = ldap.make_filter({
|
||||
'krbprincipalname': context.principal, # pylint: disable=no-member
|
||||
'memberof': privilege_dn},
|
||||
rules=ldap.MATCH_ALL)
|
||||
try:
|
||||
ldap.find_entries(base_dn=self.api.env.basedn, filter=filter)
|
||||
except errors.NotFound:
|
||||
if not principal_has_privilege(self.api, context.principal, privilege):
|
||||
raise errors.ACIError(
|
||||
info=_("not allowed to perform server connection check"))
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user