ipaldap: allow GetEffectiveRights on individual operations

Allow caller to specify that the GetEffectiveRights server control
should be used on a per-operation basis.  Also update
ldap2.get_effective_rights to use this new API.

Part of: https://pagure.io/freeipa/issue/6609

Reviewed-By: Christian Heimes <cheimes@redhat.com>
This commit is contained in:
Fraser Tweedale
2018-02-08 11:04:48 +11:00
committed by Christian Heimes
parent ece17cef17
commit 4daac52ddd
2 changed files with 36 additions and 33 deletions

View File

@@ -40,7 +40,7 @@ from cryptography import x509 as crypto_x509
import ldap
import ldap.sasl
import ldap.filter
from ldap.controls import SimplePagedResultsControl
from ldap.controls import SimplePagedResultsControl, GetEffectiveRightsControl
import six
# pylint: disable=ipa-forbidden-import
@@ -1318,7 +1318,7 @@ class LDAPClient(object):
return cls.combine_filters(flts, rules)
def get_entries(self, base_dn, scope=ldap.SCOPE_SUBTREE, filter=None,
attrs_list=None, **kwargs):
attrs_list=None, get_effective_rights=False, **kwargs):
"""Return a list of matching entries.
:raises: errors.LimitsExceeded if the list is truncated by the server
@@ -1329,11 +1329,13 @@ class LDAPClient(object):
:param scope: search scope, see LDAP docs (default ldap2.SCOPE_SUBTREE)
:param filter: LDAP filter to apply
:param attrs_list: ist of attributes to return, all if None (default)
:param get_effective_rights: use GetEffectiveRights control
:param kwargs: additional keyword arguments. See find_entries method
for their description.
"""
entries, truncated = self.find_entries(
base_dn=base_dn, scope=scope, filter=filter, attrs_list=attrs_list,
get_effective_rights=get_effective_rights,
**kwargs)
try:
self.handle_truncated_result(truncated)
@@ -1346,9 +1348,10 @@ class LDAPClient(object):
return entries
def find_entries(self, filter=None, attrs_list=None, base_dn=None,
scope=ldap.SCOPE_SUBTREE, time_limit=None,
size_limit=None, paged_search=False):
def find_entries(
self, filter=None, attrs_list=None, base_dn=None,
scope=ldap.SCOPE_SUBTREE, time_limit=None, size_limit=None,
paged_search=False, get_effective_rights=False):
"""
Return a list of entries and indication of whether the results were
truncated ([(dn, entry_attrs)], truncated) matching specified search
@@ -1356,13 +1359,16 @@ class LDAPClient(object):
search hit a server limit and its results are incomplete.
Keyword arguments:
attrs_list -- list of attributes to return, all if None (default None)
base_dn -- dn of the entry at which to start the search (default '')
scope -- search scope, see LDAP docs (default ldap2.SCOPE_SUBTREE)
time_limit -- time limit in seconds (default unlimited)
size_limit -- size (number of entries returned) limit
:param attrs_list: list of attributes to return, all if None
(default None)
:param base_dn: dn of the entry at which to start the search
(default '')
:param scope: search scope, see LDAP docs (default ldap2.SCOPE_SUBTREE)
:param time_limit: time limit in seconds (default unlimited)
:param size_limit: size (number of entries returned) limit
(default unlimited)
paged_search -- search using paged results control
:param paged_search: search using paged results control
:param get_effective_rights: use GetEffectiveRights control
:raises: errors.NotFound if result set is empty
or base_dn doesn't exist
@@ -1391,7 +1397,10 @@ class LDAPClient(object):
if attrs_list:
attrs_list = [a.lower() for a in set(attrs_list)]
sctrls = None
base_sctrls = []
if get_effective_rights:
base_sctrls.append(self.__get_effective_rights_control())
cookie = ''
page_size = (size_limit if size_limit > 0 else 2000) - 1
if page_size == 0:
@@ -1405,7 +1414,11 @@ class LDAPClient(object):
while True:
if paged_search:
sctrls = [SimplePagedResultsControl(0, page_size, cookie)]
sctrls = base_sctrls + [
SimplePagedResultsControl(0, page_size, cookie)
]
else:
sctrls = base_sctrls or None
try:
id = self.conn.search_ext(
@@ -1468,6 +1481,12 @@ class LDAPClient(object):
return (res, truncated)
def __get_effective_rights_control(self):
"""Construct a GetEffectiveRights control for current user."""
bind_dn = self.conn.whoami_s()[4:]
return GetEffectiveRightsControl(
True, "dn: {0}".format(bind_dn).encode('utf-8'))
def find_entry_by_attr(self, attr, value, object_class, attrs_list=None,
base_dn=None):
"""
@@ -1493,7 +1512,7 @@ class LDAPClient(object):
return entries[0]
def get_entry(self, dn, attrs_list=None, time_limit=None,
size_limit=None):
size_limit=None, get_effective_rights=False):
"""
Get entry (dn, entry_attrs) by dn.
@@ -1505,7 +1524,7 @@ class LDAPClient(object):
entries = self.get_entries(
dn, self.SCOPE_BASE, None, attrs_list, time_limit=time_limit,
size_limit=size_limit
size_limit=size_limit, get_effective_rights=get_effective_rights,
)
return entries[0]

View File

@@ -38,8 +38,6 @@ from ipapython.dn import DN
from ipapython.ipaldap import (LDAPClient, AUTOBIND_AUTO, AUTOBIND_ENABLED,
AUTOBIND_DISABLED)
from ldap.controls.simple import GetEffectiveRightsControl
from ipalib import Registry, errors, _
from ipalib.crud import CrudBackend
from ipalib.request import context
@@ -274,22 +272,8 @@ class ldap2(CrudBackend, LDAPClient):
Returns 2 attributes, the attributeLevelRights for the given list of
attributes and the entryLevelRights for the entry itself.
"""
assert isinstance(dn, DN)
bind_dn = self.conn.whoami_s()[4:]
sctrl = [
GetEffectiveRightsControl(
True, "dn: {0}".format(bind_dn).encode('utf-8'))
]
self.conn.set_option(_ldap.OPT_SERVER_CONTROLS, sctrl)
try:
entry = self.get_entry(dn, attrs_list)
finally:
# remove the control so subsequent operations don't include GER
self.conn.set_option(_ldap.OPT_SERVER_CONTROLS, [])
return entry
return self.get_entry(dn, attrs_list, get_effective_rights=True)
def can_write(self, dn, attr):
"""Returns True/False if the currently bound user has write permissions