mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Some plugins, notably dns, modifies a returned entry in order
to compare it to the user-provided info (e.g. dnsrecord-del).
This modification was done on the cached entry directly rather
than a copy which caused unexpected results, mostly
EmptyResult because the cached entry was changed directly so
the next get_entry returned the same modified entry.
In addition, on a hit in the LDAP cache the entire cached entry
was returned regardless of what attributes were requested.
The automember condition add/remove calls only request the
inclusive/exclusive rule attributes and loop over the returned
values to look for duplicates. This was failing because the queried
entry contains attributes that the candidate entry does not contain.
The automember code is:
old_entry = ldap.get_entry(dn, [attr])
for regex in old_entry.keys():
if not isinstance(entry_attrs[regex], (list, tuple)):
old_entry, returned from the cache, contained objectclass, cn,
description, etc. which don't exist in the candidate entry so
entry_attrs[regex] threw a KeyError.
To return a copy of the entry and requested attributes on a
search HIT.
Also be more careful when storing the attributes in the cache entry.
The returned attributes may not match the requested. So store the
attributes we actually have.
This issue was exposed by Ansible which maintains a larger and
longer-lived cache because commands are executed in the server context
one after another, giving the cache a chance to build up.
Adjust the expected test results as well. In test_get_testuser()
the first request asks for all attributes (default) so ensure
that is successful since a user_add gets all attributes in
the post_callback. Next request a subset of the attributes which
is also a hit and confirm that only those requested were returned.
https://pagure.io/freeipa/issue/8897
Signed-off-by: Rob Crittenden <rcritten@redhat.com>
Reviewed-By: Stanislav Levin <slev@altlinux.org>
Reviewed-By: Alexander Bokovoy <abokovoy@redhat.com>
146 lines
4.1 KiB
Python
146 lines
4.1 KiB
Python
#
|
|
# Copyright (C) 2021 FreeIPA Contributors see COPYING for license
|
|
#
|
|
|
|
"""
|
|
Test the LDAPCache class.
|
|
"""
|
|
# pylint: disable=no-member
|
|
|
|
from ipalib import api, errors
|
|
from ipapython import ipaldap
|
|
from ipapython.dn import DN
|
|
|
|
import pytest
|
|
|
|
|
|
def hits_and_misses(cache, hits, misses):
|
|
assert cache._cache_hits == hits
|
|
assert cache._cache_misses == misses
|
|
|
|
|
|
@pytest.fixture(scope='class')
|
|
@pytest.mark.tier1
|
|
@pytest.mark.needs_ipaapi
|
|
def class_cache(request):
|
|
cache = ipaldap.LDAPCache(api.env.ldap_uri)
|
|
hits_and_misses(cache, 0, 0)
|
|
|
|
request.cls.cache = cache
|
|
request.cls.userdn = DN(
|
|
'uid=testuser', api.env.container_user, api.env.basedn
|
|
)
|
|
|
|
rpcclient = api.Backend.rpcclient
|
|
was_connected = rpcclient.isconnected()
|
|
|
|
if not was_connected:
|
|
rpcclient.connect()
|
|
|
|
api.Command.user_add('testuser', givenname=u'Test', sn=u'User')
|
|
|
|
yield
|
|
|
|
try:
|
|
api.Command.user_del('testuser')
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
if not was_connected:
|
|
rpcclient.disconnect()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
@pytest.mark.usefixtures('class_cache')
|
|
@pytest.mark.skip_ipaclient_unittest
|
|
@pytest.mark.needs_ipaapi
|
|
class TestLDAPCache:
|
|
|
|
def test_one(self):
|
|
dn = DN('uid=notfound', api.env.container_user, api.env.basedn)
|
|
try:
|
|
self.cache.get_entry(dn)
|
|
except errors.EmptyResult:
|
|
pass
|
|
|
|
assert dn in self.cache.cache
|
|
exc = self.cache.cache[dn].exception
|
|
assert isinstance(exc, errors.EmptyResult)
|
|
|
|
hits_and_misses(self.cache, 0, 1)
|
|
|
|
def test_retrieve_exception(self):
|
|
dn = DN('uid=notfound', api.env.container_user, api.env.basedn)
|
|
try:
|
|
self.cache.get_entry(dn)
|
|
except errors.EmptyResult:
|
|
pass
|
|
assert dn in self.cache.cache
|
|
exc = self.cache.cache[dn].exception
|
|
assert isinstance(exc, errors.EmptyResult)
|
|
hits_and_misses(self.cache, 1, 1)
|
|
|
|
def test_get_testuser(self):
|
|
assert self.userdn not in self.cache.cache
|
|
self.cache.get_entry(self.userdn)
|
|
assert self.userdn in self.cache.cache
|
|
hits_and_misses(self.cache, 1, 2)
|
|
|
|
def test_get_testuser_again(self):
|
|
assert self.userdn in self.cache.cache
|
|
|
|
# get the user again with with no attributes requested (so all)
|
|
self.cache.get_entry(self.userdn)
|
|
hits_and_misses(self.cache, 2, 2)
|
|
|
|
# Now get the user with a subset of cached attributes
|
|
entry = self.cache.get_entry(self.userdn, ('givenname', 'sn', 'cn'))
|
|
# Make sure we only got three attributes, as requested
|
|
assert len(entry.items()) == 3
|
|
hits_and_misses(self.cache, 3, 2)
|
|
|
|
def test_update_testuser(self):
|
|
entry = self.cache.cache[self.userdn].entry
|
|
try:
|
|
self.cache.update_entry(entry)
|
|
except errors.EmptyModlist:
|
|
pass
|
|
assert self.userdn not in self.cache.cache
|
|
hits_and_misses(self.cache, 3, 2)
|
|
|
|
def test_modify_testuser(self):
|
|
self.cache.get_entry(self.userdn)
|
|
entry = self.cache.cache[self.userdn].entry
|
|
try:
|
|
self.cache.modify_s(entry.dn, [])
|
|
except errors.EmptyModlist:
|
|
pass
|
|
assert self.userdn not in self.cache.cache
|
|
hits_and_misses(self.cache, 3, 3)
|
|
|
|
def test_delete_entry(self):
|
|
# We don't care if this is successful or not, just that the
|
|
# cache doesn't retain the deleted entry
|
|
try:
|
|
self.cache.delete_entry(self.userdn)
|
|
except Exception:
|
|
pass
|
|
assert self.userdn not in self.cache.cache
|
|
hits_and_misses(self.cache, 3, 3)
|
|
|
|
def test_add_entry(self):
|
|
# We don't care if this is successful or not, just that the
|
|
# cache doesn't get the added entry
|
|
try:
|
|
self.cache.add_entry(self.userdn)
|
|
except Exception:
|
|
pass
|
|
assert self.userdn not in self.cache.cache
|
|
hits_and_misses(self.cache, 3, 3)
|
|
|
|
def test_clear_cache(self):
|
|
self.cache.clear_cache()
|
|
hits_and_misses(self.cache, 0, 0)
|