Make --{set,add,del}attr more robust.

This fixes --addattr on single value attributes in add commands and --delattr
on non-unicode attributes in mod commands.

ticket 2954
This commit is contained in:
Jan Cholasta
2012-07-31 06:37:14 -04:00
committed by Martin Kosek
parent c8abd24ebe
commit 72cc54bc27
2 changed files with 131 additions and 43 deletions

View File

@@ -784,36 +784,45 @@ last, after all sets and adds."""),
:param attrs: A list of name/value pair strings, in the "name=value"
format. May also be a single string, or None.
"""
if attrs is None:
return {}
if not isinstance(attrs, (tuple, list)):
attrs = [attrs]
newdict = {}
if attrs is None:
attrs = []
elif not type(attrs) in (list, tuple):
attrs = [attrs]
for a in attrs:
m = re.match("\s*(.*?)\s*=\s*(.*?)\s*$", a)
attr = str(m.group(1)).lower()
value = m.group(2)
m = re.match("^\s*(?P<attr>.*?)\s*=\s*(?P<value>.*?)\s*$", a)
attr = str(m.group('attr').lower())
value = m.group('value')
if attr in self.obj.params and attr not in self.params:
# The attribute is managed by IPA, but it didn't get cloned
# to the command. This happens with no_update/no_create attrs.
raise errors.ValidationError(
name=attr, error=_('attribute is not configurable'))
if len(value) == 0:
# None means "delete this attribute"
value = None
if attr in newdict:
if type(value) in (tuple,):
newdict[attr] += list(value)
else:
newdict[attr].append(value)
else:
if type(value) in (tuple,):
newdict[attr] = list(value)
else:
newdict[attr] = [value]
newdict.setdefault(attr, []).append(value)
return newdict
def _convert_entry(self, entry_attrs):
result = {}
for attr, val in entry_attrs.iteritems():
if val is None:
val = []
elif not isinstance(val, (tuple, list)):
val = [val]
result[attr] = []
for v in val:
if isinstance(v, str):
# This is a Binary value, base64 encode it
v = base64.b64encode(v)
result[attr].append(unicode(v))
return result
def process_attr_options(self, entry_attrs, dn, keys, options):
"""
Process all --setattr, --addattr, and --delattr options and add the
@@ -860,19 +869,20 @@ last, after all sets and adds."""),
direct_del = setattrs & delattrs
needldapattrs = list((addattrs | delattrs) - setattrs)
mod_attrs = self._convert_entry(entry_attrs)
for attr, val in setdict.iteritems():
entry_attrs[attr] = val
mod_attrs[attr] = val
for attr in direct_add:
entry_attrs.setdefault(attr, []).extend(adddict[attr])
mod_attrs.setdefault(attr, []).extend(adddict[attr])
for attr in direct_del:
for delval in deldict[attr]:
try:
entry_attrs[attr].remove(delval)
mod_attrs[attr].remove(delval)
except ValueError:
raise errors.AttrValueNotFound(attr=attr,
value=delval)
raise errors.AttrValueNotFound(attr=attr, value=delval)
if needldapattrs:
try:
@@ -891,28 +901,27 @@ last, after all sets and adds."""),
raise errors.ValidationError(name=del_nonexisting.pop(),
error=_('No such attribute on this entry'))
old_entry = self._convert_entry(old_entry)
for attr in needldapattrs:
entry_attrs[attr] = old_entry.get(attr, [])
mod_attrs[attr] = old_entry.get(attr, [])
if attr in addattrs:
entry_attrs[attr].extend(adddict.get(attr, []))
mod_attrs[attr].extend(adddict.get(attr, []))
for delval in deldict.get(attr, []):
try:
entry_attrs[attr].remove(delval)
mod_attrs[attr].remove(delval)
except ValueError:
if isinstance(delval, str):
# This is a Binary value, base64 encode it
delval = unicode(base64.b64encode(delval))
raise errors.AttrValueNotFound(attr=attr, value=delval)
# normalize all values
changedattrs = setattrs | addattrs | delattrs
for attr in changedattrs:
value = mod_attrs[attr]
if attr in self.params and self.params[attr].attribute:
# convert single-value params to scalars
param = self.params[attr]
value = entry_attrs[attr]
# convert single-value params to scalars
if not param.multivalue:
if len(value) == 1:
value = value[0]
@@ -922,19 +931,19 @@ last, after all sets and adds."""),
raise errors.OnlyOneValueAllowed(attr=attr)
# validate, convert and encode params
try:
value = param(value)
value = param(value)
except errors.ValidationError, err:
raise errors.ValidationError(name=attr, error=err.error)
except errors.ConversionError, err:
raise errors.ConversionError(name=attr, error=err.error)
entry_attrs[attr] = value
else:
# unknown attribute: remove duplicite and invalid values
entry_attrs[attr] = list(set([val for val in entry_attrs[attr] if val]))
if not entry_attrs[attr]:
entry_attrs[attr] = None
elif isinstance(entry_attrs[attr], (tuple, list)) and len(entry_attrs[attr]) == 1:
entry_attrs[attr] = entry_attrs[attr][0]
value = list(set([val for val in value if val]))
if not value:
value = None
elif isinstance(value, (tuple, list)) and len(value) == 1:
value = value[0]
entry_attrs[attr] = value
@classmethod
def register_pre_callback(cls, callback, first=False):

View File

@@ -21,17 +21,29 @@
Test --setattr and --addattr and other attribute-specific issues
"""
from ipalib import api, errors
from ipalib import api, errors, x509
from tests.test_xmlrpc import objectclasses
from xmlrpc_test import Declarative, fuzzy_digits, fuzzy_uuid
from xmlrpc_test import (Declarative, fuzzy_digits, fuzzy_uuid, fuzzy_date,
fuzzy_hex, fuzzy_hash, fuzzy_issuer)
from ipalib.dn import *
import base64
user1=u'tuser1'
user1 = u'tuser1'
fqdn1 = u'testhost1.%s' % api.env.domain
# We can use the same cert we generated for the service tests
fd = open('tests/test_xmlrpc/service.crt', 'r')
servercert = fd.readlines()
servercert = u''.join(servercert)
servercert = x509.strip_header(servercert)
servercert = servercert.replace('\n', '')
fd.close()
class test_attr(Declarative):
cleanup_commands = [
('user_del', [user1], {}),
('host_del', [fqdn1], {}),
]
tests = [
@@ -551,4 +563,71 @@ class test_attr(Declarative):
desc='Server is unwilling to perform', info=''),
),
dict(
desc='Try to create %r with description and --addattr description' % fqdn1,
command=('host_add', [fqdn1],
dict(
description=u'Test host 1',
addattr=u'description=Test host 2',
force=True,
),
),
expected=errors.OnlyOneValueAllowed(attr='description'),
),
dict(
desc='Create %r with a certificate' % fqdn1,
command=('host_add', [fqdn1],
dict(
usercertificate=servercert,
force=True,
),
),
expected=dict(
value=fqdn1,
summary=u'Added host "%s"' % fqdn1,
result=dict(
dn=lambda x: DN(x) == DN(('fqdn',fqdn1),('cn','computers'),
('cn','accounts'),api.env.basedn),
fqdn=[fqdn1],
krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)],
objectclass=objectclasses.host,
ipauniqueid=[fuzzy_uuid],
managedby_host=[fqdn1],
usercertificate=[base64.b64decode(servercert)],
valid_not_before=fuzzy_date,
valid_not_after=fuzzy_date,
subject=lambda x: DN(x) == \
DN(('CN',api.env.host),x509.subject_base()),
serial_number=fuzzy_digits,
serial_number_hex=fuzzy_hex,
md5_fingerprint=fuzzy_hash,
sha1_fingerprint=fuzzy_hash,
issuer=fuzzy_issuer,
has_keytab=False,
has_password=False,
),
),
),
dict(
desc='Remove %r certificate using --delattr' % fqdn1,
command=('host_mod', [fqdn1],
dict(
delattr=u'usercertificate=%s' % servercert,
),
),
expected=dict(
value=fqdn1,
summary=u'Modified host "%s"' % fqdn1,
result=dict(
fqdn=[fqdn1],
krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)],
managedby_host=[fqdn1],
has_keytab=False,
has_password=False,
),
),
),
]