mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Preserve case of attribute names in LDAPEntry.
This commit is contained in:
committed by
Martin Kosek
parent
bb36683c84
commit
8f46ca5dd2
@@ -230,7 +230,10 @@ def entry_from_entry(entry, newentry):
|
||||
entry[e] = newentry[e]
|
||||
|
||||
def entry_to_dict(entry, **options):
|
||||
result = dict(entry)
|
||||
if options.get('raw', False):
|
||||
result = dict(entry)
|
||||
else:
|
||||
result = dict((k.lower(), v) for (k, v) in entry.iteritems())
|
||||
if options.get('all', False):
|
||||
result['dn'] = entry.dn
|
||||
return result
|
||||
|
||||
@@ -346,11 +346,11 @@ class cert_request(VirtualCommand):
|
||||
# going to add it
|
||||
try:
|
||||
if not principal.startswith('host/'):
|
||||
service = api.Command['service_show'](principal, all=True, raw=True)['result']
|
||||
service = api.Command['service_show'](principal, all=True)['result']
|
||||
dn = service['dn']
|
||||
else:
|
||||
hostname = get_host_from_principal(principal)
|
||||
service = api.Command['host_show'](hostname, all=True, raw=True)['result']
|
||||
service = api.Command['host_show'](hostname, all=True)['result']
|
||||
dn = service['dn']
|
||||
except errors.NotFound, e:
|
||||
if not add:
|
||||
@@ -375,7 +375,7 @@ class cert_request(VirtualCommand):
|
||||
for name in subjectaltname:
|
||||
name = unicode(name)
|
||||
try:
|
||||
hostentry = api.Command['host_show'](name, all=True, raw=True)['result']
|
||||
hostentry = api.Command['host_show'](name, all=True)['result']
|
||||
hostdn = hostentry['dn']
|
||||
except errors.NotFound:
|
||||
# We don't want to issue any certificates referencing
|
||||
@@ -385,7 +385,7 @@ class cert_request(VirtualCommand):
|
||||
'subject alt name %s in certificate request') % name)
|
||||
authprincipal = getattr(context, 'principal')
|
||||
if authprincipal.startswith("host/"):
|
||||
if not hostdn in service.get('managedby', []):
|
||||
if not hostdn in service.get('managedby_host', []):
|
||||
raise errors.ACIError(info=_(
|
||||
"Insufficient privilege to create a certificate "
|
||||
"with subject alt name '%s'.") % name)
|
||||
|
||||
@@ -305,12 +305,12 @@ class pwpolicy(LDAPObject):
|
||||
existing_entry = {}
|
||||
if not add: # then read existing entry
|
||||
existing_entry = self.api.Command.pwpolicy_show(keys[-1],
|
||||
all=True, raw=True,
|
||||
all=True,
|
||||
)['result']
|
||||
if minlife is None and 'krbminpwdlife' in existing_entry:
|
||||
minlife = int(existing_entry['krbminpwdlife'][0])
|
||||
minlife = int(existing_entry['krbminpwdlife'][0]) * 3600
|
||||
if maxlife is None and 'krbmaxpwdlife' in existing_entry:
|
||||
maxlife = int(existing_entry['krbmaxpwdlife'][0])
|
||||
maxlife = int(existing_entry['krbmaxpwdlife'][0]) * 86400
|
||||
|
||||
if maxlife is not None and minlife is not None:
|
||||
if minlife > maxlife:
|
||||
|
||||
@@ -583,7 +583,7 @@ class IPASimpleLDAPObject(object):
|
||||
# r[0] == r.dn
|
||||
# r[1] == r.data
|
||||
class LDAPEntry(dict):
|
||||
__slots__ = ('_dn', '_orig')
|
||||
__slots__ = ('_dn', '_names', '_orig')
|
||||
|
||||
def __init__(self, _dn=None, _obj=None, **kwargs):
|
||||
super(LDAPEntry, self).__init__()
|
||||
@@ -604,6 +604,7 @@ class LDAPEntry(dict):
|
||||
|
||||
self._dn = _dn
|
||||
self._orig = orig
|
||||
self._names = CIDict()
|
||||
|
||||
if orig is None:
|
||||
self.commit()
|
||||
@@ -630,23 +631,9 @@ class LDAPEntry(dict):
|
||||
# FIXME: for backwards compatibility only
|
||||
return self._orig
|
||||
|
||||
def _attr_name(self, name):
|
||||
if not isinstance(name, basestring):
|
||||
raise TypeError(
|
||||
"attribute name must be unicode or str, got %s object %r" % (
|
||||
name.__class__.__name__, name))
|
||||
if isinstance(name, str):
|
||||
name = name.decode('ascii')
|
||||
return name.lower()
|
||||
|
||||
def _init_iter(self, _obj, **kwargs):
|
||||
_obj = dict(_obj, **kwargs)
|
||||
for (k, v) in _obj.iteritems():
|
||||
yield (self._attr_name(k), v)
|
||||
|
||||
def __repr__(self):
|
||||
dict_repr = super(LDAPEntry, self).__repr__()
|
||||
return '%s(%s, %s)' % (type(self).__name__, repr(self._dn), dict_repr)
|
||||
return '%s(%r, %s)' % (type(self).__name__, self._dn,
|
||||
super(LDAPEntry, self).__repr__())
|
||||
|
||||
def copy(self):
|
||||
return LDAPEntry(self)
|
||||
@@ -655,14 +642,48 @@ class LDAPEntry(dict):
|
||||
self._orig = self
|
||||
self._orig = deepcopy(self)
|
||||
|
||||
def _attr_name(self, name):
|
||||
if not isinstance(name, basestring):
|
||||
raise TypeError(
|
||||
"attribute name must be unicode or str, got %s object %r" % (
|
||||
name.__class__.__name__, name))
|
||||
|
||||
if isinstance(name, str):
|
||||
name = name.decode('utf-8')
|
||||
|
||||
return name
|
||||
|
||||
def __setitem__(self, name, value):
|
||||
super(LDAPEntry, self).__setitem__(self._attr_name(name), value)
|
||||
name = self._attr_name(name)
|
||||
|
||||
if self._names.has_key(name):
|
||||
oldname = self._names[name]
|
||||
|
||||
if oldname != name:
|
||||
for (altname, keyname) in self._names.iteritems():
|
||||
if keyname == oldname:
|
||||
self._names[altname] = name
|
||||
|
||||
super(LDAPEntry, self).__delitem__(oldname)
|
||||
else:
|
||||
self._names[name] = name
|
||||
|
||||
super(LDAPEntry, self).__setitem__(name, value)
|
||||
|
||||
def setdefault(self, name, default):
|
||||
return super(LDAPEntry, self).setdefault(self._attr_name(name), default)
|
||||
if name not in self:
|
||||
self[name] = default
|
||||
return self[name]
|
||||
|
||||
def update(self, _obj={}, **kwargs):
|
||||
super(LDAPEntry, self).update(self._init_iter(_obj, **kwargs))
|
||||
_obj = dict(_obj, **kwargs)
|
||||
for (name, value) in _obj.iteritems():
|
||||
self[name] = value
|
||||
|
||||
def _get_attr_name(self, name):
|
||||
name = self._attr_name(name)
|
||||
name = self._names[name]
|
||||
return name
|
||||
|
||||
def __getitem__(self, name):
|
||||
# for python-ldap tuple compatibility
|
||||
@@ -671,10 +692,15 @@ class LDAPEntry(dict):
|
||||
elif name == 1:
|
||||
return self
|
||||
|
||||
return super(LDAPEntry, self).__getitem__(self._attr_name(name))
|
||||
return super(LDAPEntry, self).__getitem__(self._get_attr_name(name))
|
||||
|
||||
def get(self, name, default=None):
|
||||
return super(LDAPEntry, self).get(self._attr_name(name), default)
|
||||
try:
|
||||
name = self._get_attr_name(name)
|
||||
except KeyError:
|
||||
return default
|
||||
|
||||
return super(LDAPEntry, self).get(name, default)
|
||||
|
||||
def single_value(self, name, default=_missing):
|
||||
"""Return a single attribute value
|
||||
@@ -685,7 +711,8 @@ class LDAPEntry(dict):
|
||||
If the entry is missing and default is not given, raise KeyError.
|
||||
"""
|
||||
try:
|
||||
values = super(LDAPEntry, self).__getitem__(self._attr_name(name))
|
||||
attr_name = self._get_attr_name(name)
|
||||
values = super(LDAPEntry, self).__getitem__(attr_name)
|
||||
except KeyError:
|
||||
if default is _missing:
|
||||
raise
|
||||
@@ -697,17 +724,41 @@ class LDAPEntry(dict):
|
||||
'%s has %s values, one expected' % (name, len(values)))
|
||||
return values[0]
|
||||
|
||||
def _del_attr_name(self, name):
|
||||
name = self._get_attr_name(name)
|
||||
|
||||
for (altname, keyname) in self._names.items():
|
||||
if keyname == name:
|
||||
del self._names[altname]
|
||||
|
||||
return name
|
||||
|
||||
def __delitem__(self, name):
|
||||
super(LDAPEntry, self).__delitem__(self._attr_name(name))
|
||||
super(LDAPEntry, self).__delitem__(self._del_attr_name(name))
|
||||
|
||||
def pop(self, name, *default):
|
||||
return super(LDAPEntry, self).pop(self._attr_name(name), *default)
|
||||
try:
|
||||
name = self._del_attr_name(name)
|
||||
except KeyError:
|
||||
if not default:
|
||||
raise
|
||||
|
||||
return super(LDAPEntry, self).pop(name, *default)
|
||||
|
||||
def popitem(self):
|
||||
name, value = super(LDAPEntry, self).popitem()
|
||||
self._del_attr_name(name)
|
||||
return (name, value)
|
||||
|
||||
def clear(self):
|
||||
super(LDAPEntry, self).clear()
|
||||
self._names.clear()
|
||||
|
||||
def __contains__(self, name):
|
||||
return super(LDAPEntry, self).__contains__(self._attr_name(name))
|
||||
return self._names.has_key(self._attr_name(name))
|
||||
|
||||
def has_key(self, name):
|
||||
return super(LDAPEntry, self).has_key(self._attr_name(name))
|
||||
return name in self
|
||||
|
||||
# for python-ldap tuple compatibility
|
||||
def __iter__(self):
|
||||
|
||||
@@ -158,18 +158,26 @@ class test_ldap(object):
|
||||
|
||||
e = LDAPEntry(dn1, cn=cn1)
|
||||
assert e.dn is dn1
|
||||
assert u'cn' in e
|
||||
assert u'cn' in e.keys()
|
||||
assert 'CN' in e
|
||||
assert 'CN' not in e.keys()
|
||||
assert e['CN'] is cn1
|
||||
assert e['CN'] is e[u'cn']
|
||||
|
||||
e.dn = dn2
|
||||
assert e.dn is dn2
|
||||
|
||||
e['cn'] = cn2
|
||||
e['CN'] = cn2
|
||||
assert u'cn' in e
|
||||
assert u'cn' not in e.keys()
|
||||
assert 'CN' in e
|
||||
assert 'CN' in e.keys()
|
||||
assert e['CN'] is cn2
|
||||
assert e['CN'] is e[u'cn']
|
||||
|
||||
del e['CN']
|
||||
assert 'CN' not in e
|
||||
assert 'CN' not in e.keys()
|
||||
assert u'cn' not in e
|
||||
assert u'cn' not in e.keys()
|
||||
|
||||
Reference in New Issue
Block a user