mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2024-12-24 16:10:02 -06:00
Only split CSV in the client, quote instead of escaping
Splitting on commas is not an idempotent operation: 'a,b\,c' -> ('a', 'b,c') -> ('a', 'b', 'c') That means we can't do it when the call is forwarded, so this is only done on the CLI. The UI already sends values as a tuple. Replace escaping in the csv parser with quoting. Quoted strings can have embedded commas instead of having to escape them. This prevents the csv parser from eating all escape characters. Also, document Param's csv arguments, and update tests. https://fedorahosted.org/freeipa/ticket/2417 https://fedorahosted.org/freeipa/ticket/2227
This commit is contained in:
parent
8f71f42ef7
commit
dddebe2350
@ -1025,9 +1025,10 @@ class cli(backend.Executioner):
|
||||
if not isinstance(cmd, frontend.Local):
|
||||
self.create_context()
|
||||
kw = self.parse(cmd, argv)
|
||||
kw['version'] = API_VERSION
|
||||
if self.env.interactive:
|
||||
self.prompt_interactively(cmd, kw)
|
||||
kw = cmd.split_csv(**kw)
|
||||
kw['version'] = API_VERSION
|
||||
self.load_files(cmd, kw)
|
||||
try:
|
||||
result = self.execute(name, **kw)
|
||||
|
@ -557,6 +557,26 @@ class Command(HasParam):
|
||||
if name in params:
|
||||
yield(name, params[name])
|
||||
|
||||
def split_csv(self, **kw):
|
||||
"""
|
||||
Return a dictionary of values where values are decoded from CSV.
|
||||
|
||||
For example:
|
||||
|
||||
>>> class my_command(Command):
|
||||
... takes_options = (
|
||||
... Param('flags', multivalue=True, csv=True),
|
||||
... )
|
||||
...
|
||||
>>> c = my_command()
|
||||
>>> c.finalize()
|
||||
>>> c.split_csv(flags=u'public,replicated')
|
||||
{'flags': (u'public', u'replicated')}
|
||||
"""
|
||||
return dict(
|
||||
(k, self.params[k].split_csv(v)) for (k, v) in kw.iteritems()
|
||||
)
|
||||
|
||||
def normalize(self, **kw):
|
||||
"""
|
||||
Return a dictionary of normalized values.
|
||||
|
@ -345,11 +345,16 @@ class Param(ReadOnly):
|
||||
is not `required`. Applied for all crud.Update based commands
|
||||
* req_update: The parameter is `required` in all crud.Update based
|
||||
commands
|
||||
- hint: This attribute is currently not used
|
||||
- hint: this attribute is currently not used
|
||||
- alwaysask: when enabled, CLI asks for parameter value even when the
|
||||
parameter is not `required`
|
||||
- sortorder: used to sort a list of parameters for Command. See
|
||||
`Command.finalize()` for further information
|
||||
- csv: this multivalue attribute is given in CSV format
|
||||
- csv_separator: character that separates values in CSV (comma by
|
||||
default)
|
||||
- csv_skipspace: if true, leading whitespace will be ignored in
|
||||
individual CSV values
|
||||
"""
|
||||
|
||||
# This is a dummy type so that most of the functionality of Param can be
|
||||
@ -697,13 +702,47 @@ class Param(ReadOnly):
|
||||
# csv.py doesn't do Unicode; encode temporarily as UTF-8:
|
||||
csv_reader = csv.reader(self.__utf_8_encoder(unicode_csv_data),
|
||||
dialect=dialect,
|
||||
delimiter=self.csv_separator, escapechar='\\',
|
||||
delimiter=self.csv_separator, quotechar='"',
|
||||
skipinitialspace=self.csv_skipspace,
|
||||
**kwargs)
|
||||
for row in csv_reader:
|
||||
# decode UTF-8 back to Unicode, cell by cell:
|
||||
yield [unicode(cell, 'utf-8') for cell in row]
|
||||
|
||||
def split_csv(self, value):
|
||||
"""Split CSV strings into individual values.
|
||||
|
||||
For CSV params, ``value`` is a tuple of strings. Each of these is split
|
||||
on commas, and the results are concatenated into one tuple.
|
||||
|
||||
For example::
|
||||
|
||||
>>> param = Param('telephones', multivalue=True, csv=True)
|
||||
>>> param.split_csv((u'1, 2', u'3', u'4, 5, 6'))
|
||||
(u'1', u'2', u'3', u'4', u'5', u'6')
|
||||
|
||||
If ``value`` is not a tuple (or list), it is only split::
|
||||
|
||||
>>> param = Param('telephones', multivalue=True, csv=True)
|
||||
>>> param.split_csv(u'1, 2, 3')
|
||||
(u'1', u'2', u'3')
|
||||
|
||||
For non-CSV params, return the value unchanged.
|
||||
"""
|
||||
if self.csv:
|
||||
if type(value) not in (tuple, list):
|
||||
value = (value,)
|
||||
newval = ()
|
||||
for v in value:
|
||||
if isinstance(v, basestring):
|
||||
csvreader = self.__unicode_csv_reader([unicode(v)])
|
||||
newval += tuple(csvreader.next()) #pylint: disable=E1101
|
||||
else:
|
||||
newval += (v,)
|
||||
return newval
|
||||
else:
|
||||
return value
|
||||
|
||||
def normalize(self, value):
|
||||
"""
|
||||
Normalize ``value`` using normalizer callback.
|
||||
@ -730,15 +769,6 @@ class Param(ReadOnly):
|
||||
if self.multivalue:
|
||||
if type(value) not in (tuple, list):
|
||||
value = (value,)
|
||||
if self.csv:
|
||||
newval = ()
|
||||
for v in value:
|
||||
if isinstance(v, basestring):
|
||||
csvreader = self.__unicode_csv_reader([unicode(v)])
|
||||
newval += tuple(csvreader.next()) #pylint: disable=E1101
|
||||
else:
|
||||
newval += (v,)
|
||||
value = newval
|
||||
if self.multivalue:
|
||||
return tuple(
|
||||
self._normalize_scalar(v) for v in value
|
||||
|
@ -635,44 +635,44 @@ class test_Param(ClassChecker):
|
||||
assert o._convert_scalar.value is default
|
||||
assert o.normalizer.value is default
|
||||
|
||||
def test_csv_normalize(self):
|
||||
def test_split_csv(self):
|
||||
"""
|
||||
Test the `ipalib.parameters.Param.normalize` method with csv.
|
||||
Test the `ipalib.parameters.Param.split_csv` method with csv.
|
||||
"""
|
||||
o = self.cls('my_list+', csv=True)
|
||||
n = o.normalize('a,b')
|
||||
n = o.split_csv('a,b')
|
||||
assert type(n) is tuple
|
||||
assert len(n) is 2
|
||||
|
||||
n = o.normalize('bar, "hi, there",foo')
|
||||
n = o.split_csv('bar, "hi, there",foo')
|
||||
assert type(n) is tuple
|
||||
assert len(n) is 3
|
||||
|
||||
def test_csv_normalize_separator(self):
|
||||
def test_split_csv_separator(self):
|
||||
"""
|
||||
Test the `ipalib.parameters.Param.normalize` method with csv and a separator.
|
||||
Test the `ipalib.parameters.Param.split_csv` method with csv and a separator.
|
||||
"""
|
||||
o = self.cls('my_list+', csv=True, csv_separator='|')
|
||||
|
||||
n = o.normalize('a')
|
||||
n = o.split_csv('a')
|
||||
assert type(n) is tuple
|
||||
assert len(n) is 1
|
||||
|
||||
n = o.normalize('a|b')
|
||||
n = o.split_csv('a|b')
|
||||
assert type(n) is tuple
|
||||
assert len(n) is 2
|
||||
|
||||
def test_csv_normalize_skipspace(self):
|
||||
def test_split_csv_skipspace(self):
|
||||
"""
|
||||
Test the `ipalib.parameters.Param.normalize` method with csv without skipping spaces.
|
||||
Test the `ipalib.parameters.Param.split_csv` method with csv without skipping spaces.
|
||||
"""
|
||||
o = self.cls('my_list+', csv=True, csv_skipspace=False)
|
||||
|
||||
n = o.normalize('a')
|
||||
n = o.split_csv('a')
|
||||
assert type(n) is tuple
|
||||
assert len(n) is 1
|
||||
|
||||
n = o.normalize('a, "b,c", d')
|
||||
n = o.split_csv('a, "b,c", d')
|
||||
assert type(n) is tuple
|
||||
# the output w/o skipspace is ['a',' "b','c"',' d']
|
||||
assert len(n) is 4
|
||||
|
@ -87,7 +87,7 @@ class test_delegation(Declarative):
|
||||
desc='Create %r' % delegation1,
|
||||
command=(
|
||||
'delegation_add', [delegation1], dict(
|
||||
attrs=u'street,c,l,st,postalCode',
|
||||
attrs=[u'street', u'c', u'l', u'st', u'postalCode'],
|
||||
permissions=u'write',
|
||||
group=u'editors',
|
||||
memberof=u'admins',
|
||||
@ -111,7 +111,7 @@ class test_delegation(Declarative):
|
||||
desc='Try to create duplicate %r' % delegation1,
|
||||
command=(
|
||||
'delegation_add', [delegation1], dict(
|
||||
attrs=u'street,c,l,st,postalCode',
|
||||
attrs=[u'street', u'c', u'l', u'st', u'postalCode'],
|
||||
permissions=u'write',
|
||||
group=u'editors',
|
||||
memberof=u'admins',
|
||||
|
@ -724,8 +724,9 @@ class test_dns(Declarative):
|
||||
|
||||
dict(
|
||||
desc='Add NSEC record to %r using dnsrecord_add' % (dnsres1),
|
||||
command=('dnsrecord_add', [dnszone1, dnsres1], {'nsec_part_next': dnszone1,
|
||||
'nsec_part_types' : ['TXT', 'A']}),
|
||||
command=('dnsrecord_add', [dnszone1, dnsres1], {
|
||||
'nsec_part_next': dnszone1,
|
||||
'nsec_part_types' : [u'TXT', u'A']}),
|
||||
expected={
|
||||
'value': dnsres1,
|
||||
'summary': None,
|
||||
|
@ -87,7 +87,7 @@ class test_privilege(Declarative):
|
||||
command=(
|
||||
'permission_add', [permission1], dict(
|
||||
type=u'user',
|
||||
permissions=u'add, delete',
|
||||
permissions=[u'add', u'delete'],
|
||||
)
|
||||
),
|
||||
expected=dict(
|
||||
|
@ -47,9 +47,6 @@ privilege1 = u'r,w privilege 1'
|
||||
privilege1_dn = DN(('cn', privilege1), DN(api.env.container_privilege),
|
||||
api.env.basedn)
|
||||
|
||||
def escape_comma(value):
|
||||
return value.replace(',', '\\,')
|
||||
|
||||
class test_role(Declarative):
|
||||
|
||||
cleanup_commands = [
|
||||
@ -184,7 +181,7 @@ class test_role(Declarative):
|
||||
dict(
|
||||
desc='Add privilege %r to role %r' % (privilege1, role1),
|
||||
command=('role_add_privilege', [role1],
|
||||
dict(privilege=escape_comma(privilege1))
|
||||
dict(privilege=privilege1)
|
||||
),
|
||||
expected=dict(
|
||||
completed=1,
|
||||
@ -465,7 +462,7 @@ class test_role(Declarative):
|
||||
dict(
|
||||
desc='Remove privilege %r from role %r' % (privilege1, role1),
|
||||
command=('role_remove_privilege', [role1],
|
||||
dict(privilege=escape_comma(privilege1))
|
||||
dict(privilege=privilege1)
|
||||
),
|
||||
expected=dict(
|
||||
completed=1,
|
||||
@ -486,7 +483,7 @@ class test_role(Declarative):
|
||||
dict(
|
||||
desc='Remove privilege %r from role %r again' % (privilege1, role1),
|
||||
command=('role_remove_privilege', [role1],
|
||||
dict(privilege=escape_comma(privilege1))
|
||||
dict(privilege=privilege1)
|
||||
),
|
||||
expected=dict(
|
||||
completed=0,
|
||||
|
@ -74,8 +74,8 @@ class test_selfservice(Declarative):
|
||||
desc='Create %r' % selfservice1,
|
||||
command=(
|
||||
'selfservice_add', [selfservice1], dict(
|
||||
attrs=u'street,c,l,st,postalCode',
|
||||
permissions=u'write',
|
||||
attrs=[u'street', u'c', u'l', u'st', u'postalcode'],
|
||||
permissions=u'write',
|
||||
)
|
||||
),
|
||||
expected=dict(
|
||||
@ -95,8 +95,8 @@ class test_selfservice(Declarative):
|
||||
desc='Try to create duplicate %r' % selfservice1,
|
||||
command=(
|
||||
'selfservice_add', [selfservice1], dict(
|
||||
attrs=u'street,c,l,st,postalCode',
|
||||
permissions=u'write',
|
||||
attrs=[u'street', u'c', u'l', u'st', u'postalcode'],
|
||||
permissions=u'write',
|
||||
),
|
||||
),
|
||||
expected=errors.DuplicateEntry(),
|
||||
|
Loading…
Reference in New Issue
Block a user