mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Fix attempted write to attribute of read-only object.
Add new class "cachedproperty" for creating property-like attributes that cache the return value of a method call. Also fix few issues in the unit tests to enable them to succeed. ticket 1959
This commit is contained in:
committed by
Alexander Bokovoy
parent
46d3abc450
commit
9beb467d98
@@ -1092,7 +1092,7 @@ class DN(object):
|
||||
return rdns
|
||||
elif isinstance(value, (tuple, list)):
|
||||
if len(value) != 2:
|
||||
raise ValueError("tuple or list must be 2-valued, not \"%s\"" % (rdn))
|
||||
raise ValueError("tuple or list must be 2-valued, not \"%s\"" % (value))
|
||||
rdn = RDN(value, first_key_match=self.first_key_match)
|
||||
return rdn
|
||||
else:
|
||||
|
@@ -448,10 +448,10 @@ class RefererError(PublicError):
|
||||
|
||||
For example:
|
||||
|
||||
>>> raise RefererError()
|
||||
>>> raise RefererError(referer='referer')
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
RefererError: Missing or invalid HTTP Referer
|
||||
RefererError: Missing or invalid HTTP Referer, referer
|
||||
"""
|
||||
|
||||
errno = 911
|
||||
@@ -1537,7 +1537,7 @@ class DependentEntry(ExecutionError):
|
||||
>>> raise DependentEntry(label=u'SELinux User Map', key=u'test', dependent=u'test1')
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
DependentEntry: Not registered yet
|
||||
DependentEntry: test cannot be deleted because SELinux User Map test1 requires it
|
||||
|
||||
"""
|
||||
|
||||
|
@@ -27,6 +27,7 @@ import time
|
||||
import socket
|
||||
import re
|
||||
from types import NoneType
|
||||
from weakref import WeakKeyDictionary
|
||||
|
||||
from ipalib import errors
|
||||
from ipalib.text import _
|
||||
@@ -272,3 +273,36 @@ def validate_hostname(hostname):
|
||||
if not all(regex_name.match(part) for part in hostname.split(".")):
|
||||
raise ValueError(_('hostname parts may only include letters, numbers, and - ' \
|
||||
'(which is not allowed as the last character)'))
|
||||
|
||||
class cachedproperty(object):
|
||||
"""
|
||||
A property-like attribute that caches the return value of a method call.
|
||||
|
||||
When the attribute is first read, the method is called and its return
|
||||
value is saved and returned. On subsequent reads, the saved value is
|
||||
returned.
|
||||
|
||||
Typical usage:
|
||||
class C(object):
|
||||
@cachedproperty
|
||||
def attr(self):
|
||||
return 'value'
|
||||
"""
|
||||
__slots__ = ('getter', 'store')
|
||||
|
||||
def __init__(self, getter):
|
||||
self.getter = getter
|
||||
self.store = WeakKeyDictionary()
|
||||
|
||||
def __get__(self, obj, cls):
|
||||
if obj is None:
|
||||
return None
|
||||
if obj not in self.store:
|
||||
self.store[obj] = self.getter(obj)
|
||||
return self.store[obj]
|
||||
|
||||
def __set__(self, obj, value):
|
||||
raise AttributeError("can't set attribute")
|
||||
|
||||
def __delete__(self, obj):
|
||||
raise AttributeError("can't delete attribute")
|
||||
|
@@ -700,7 +700,7 @@ class LDAPUpdate:
|
||||
self.conn.deleteEntry(dn)
|
||||
self.modified = True
|
||||
except errors.NotFound, e:
|
||||
root_logger.info("%s did not exist:%s", (dn, e))
|
||||
root_logger.info("%s did not exist:%s", dn, e)
|
||||
self.modified = True
|
||||
except errors.DatabaseError, e:
|
||||
root_logger.error("Delete failed: %s", e)
|
||||
@@ -717,7 +717,7 @@ class LDAPUpdate:
|
||||
self.conn.deleteEntry(dn)
|
||||
self.modified = True
|
||||
except errors.NotFound, e:
|
||||
root_logger.info("%s did not exist:%s", (dn, e))
|
||||
root_logger.info("%s did not exist:%s", dn, e)
|
||||
self.modified = True
|
||||
except errors.DatabaseError, e:
|
||||
root_logger.error("Delete failed: %s", e)
|
||||
|
@@ -314,7 +314,7 @@ class IPAdmin(IPAEntryLDAPObject):
|
||||
raise errors.DuplicateEntry()
|
||||
except ldap.CONSTRAINT_VIOLATION, e:
|
||||
# This error gets thrown by the uniqueness plugin
|
||||
if info == 'Another entry with the same attribute value already exists':
|
||||
if info.startswith('Another entry with the same attribute value already exists'):
|
||||
raise errors.DuplicateEntry()
|
||||
else:
|
||||
raise errors.DatabaseError(desc=desc,info=info)
|
||||
|
@@ -1200,6 +1200,7 @@ import os, random, ldap
|
||||
from ipaserver.plugins import rabase
|
||||
from ipalib.errors import NetworkError, CertificateOperationError
|
||||
from ipalib.constants import TYPE_ERROR
|
||||
from ipalib.util import cachedproperty
|
||||
from ipapython import dogtag
|
||||
from ipalib import _
|
||||
|
||||
@@ -1218,7 +1219,6 @@ class ra(rabase.rabase):
|
||||
self.ipa_key_size = "2048"
|
||||
self.ipa_certificate_nickname = "ipaCert"
|
||||
self.ca_certificate_nickname = "caCert"
|
||||
self.ca_host = None
|
||||
try:
|
||||
f = open(self.pwd_file, "r")
|
||||
self.password = f.readline().strip()
|
||||
@@ -1266,7 +1266,8 @@ class ra(rabase.rabase):
|
||||
pass
|
||||
return None
|
||||
|
||||
def _select_ca(self):
|
||||
@cachedproperty
|
||||
def ca_host(self):
|
||||
"""
|
||||
:return: host
|
||||
as str
|
||||
@@ -1293,8 +1294,6 @@ class ra(rabase.rabase):
|
||||
|
||||
Perform an HTTP request.
|
||||
"""
|
||||
if self.ca_host == None:
|
||||
self.ca_host = self._select_ca()
|
||||
return dogtag.http_request(self.ca_host, port, url, **kw)
|
||||
|
||||
def _sslget(self, url, port, **kw):
|
||||
@@ -1306,9 +1305,6 @@ class ra(rabase.rabase):
|
||||
|
||||
Perform an HTTPS request
|
||||
"""
|
||||
|
||||
if self.ca_host == None:
|
||||
self.ca_host = self._select_ca()
|
||||
return dogtag.https_request(self.ca_host, port, url, self.sec_dir, self.password, self.ipa_certificate_nickname, **kw)
|
||||
|
||||
def get_parse_result_xml(self, xml_text, parse_func):
|
||||
|
@@ -210,7 +210,7 @@ def _handle_errors(e, **kw):
|
||||
raise errors.DuplicateEntry()
|
||||
except _ldap.CONSTRAINT_VIOLATION:
|
||||
# This error gets thrown by the uniqueness plugin
|
||||
if info == 'Another entry with the same attribute value already exists':
|
||||
if info.startswith('Another entry with the same attribute value already exists'):
|
||||
raise errors.DuplicateEntry()
|
||||
else:
|
||||
raise errors.DatabaseError(desc=desc, info=info)
|
||||
|
@@ -247,7 +247,7 @@ class test_Plugin(ClassChecker):
|
||||
info = 'whatever'
|
||||
e = raises(StandardError, check)
|
||||
assert str(e) == \
|
||||
"check.info attribute ('whatever') conflicts with Plugin logger"
|
||||
"info is already bound to tests.test_ipalib.test_plugable.check()"
|
||||
|
||||
def test_set_api(self):
|
||||
"""
|
||||
|
@@ -112,7 +112,15 @@ class test_ldap(object):
|
||||
myapi.register(service)
|
||||
myapi.register(service_show)
|
||||
myapi.finalize()
|
||||
myapi.Backend.ldap2.connect(bind_dn="cn=Directory Manager", bind_pw='password')
|
||||
|
||||
pwfile = api.env.dot_ipa + os.sep + ".dmpw"
|
||||
if ipautil.file_exists(pwfile):
|
||||
fp = open(pwfile, "r")
|
||||
dm_password = fp.read().rstrip()
|
||||
fp.close()
|
||||
else:
|
||||
raise nose.SkipTest("No directory manager password in %s" % pwfile)
|
||||
myapi.Backend.ldap2.connect(bind_dn="cn=Directory Manager", bind_pw=dm_password)
|
||||
|
||||
result = myapi.Command['service_show']('ldap/%s@%s' % (api.env.host, api.env.realm,))
|
||||
entry_attrs = result['result']
|
||||
|
Reference in New Issue
Block a user