mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Fix more bytes/unicode issues
Reviewed-By: Tomas Babej <tbabej@redhat.com>
This commit is contained in:
committed by
Martin Basti
parent
0a23afeab2
commit
8a2b65a357
@@ -244,7 +244,7 @@ class Env(object):
|
||||
assert not hasattr(self, key)
|
||||
if isinstance(value, six.string_types):
|
||||
value = value.strip()
|
||||
if isinstance(value, str):
|
||||
if isinstance(value, bytes):
|
||||
value = value.decode('utf-8')
|
||||
m = {
|
||||
'True': True,
|
||||
|
@@ -224,7 +224,7 @@ def xml_unwrap(value, encoding='UTF-8'):
|
||||
return dict(
|
||||
(k, xml_unwrap(v, encoding)) for (k, v) in value.items()
|
||||
)
|
||||
if type(value) is str:
|
||||
if isinstance(value, bytes):
|
||||
return value.decode(encoding)
|
||||
if isinstance(value, Binary):
|
||||
assert type(value.data) is bytes
|
||||
@@ -363,7 +363,7 @@ def json_decode_binary(val):
|
||||
|
||||
def decode_fault(e, encoding='UTF-8'):
|
||||
assert isinstance(e, Fault)
|
||||
if type(e.faultString) is str:
|
||||
if isinstance(e.faultString, bytes):
|
||||
return Fault(e.faultCode, e.faultString.decode(encoding))
|
||||
return e
|
||||
|
||||
|
@@ -39,7 +39,7 @@ class DNSName(dns.name.Name):
|
||||
try:
|
||||
if isinstance(labels, six.string_types):
|
||||
#pylint: disable=E1101
|
||||
labels = dns.name.from_unicode(unicode(labels), origin).labels
|
||||
labels = dns.name.from_text(unicode(labels), origin).labels
|
||||
elif isinstance(labels, dns.name.Name):
|
||||
labels = labels.labels
|
||||
|
||||
|
@@ -1314,8 +1314,9 @@ class LDAPClient(object):
|
||||
|
||||
# pass arguments to python-ldap
|
||||
with self.error_handler():
|
||||
filter = self.encode(filter)
|
||||
attrs_list = self.encode(attrs_list)
|
||||
if six.PY2:
|
||||
filter = self.encode(filter)
|
||||
attrs_list = self.encode(attrs_list)
|
||||
|
||||
while True:
|
||||
if paged_search:
|
||||
|
@@ -1273,26 +1273,26 @@ class ReplicationManager(object):
|
||||
try:
|
||||
entry.raw['aci'].remove(
|
||||
b'(target = "ldap:///cn=*,cn=ca_renewal,cn=ipa,cn=etc,'
|
||||
'%(suffix)s")(version 3.0; acl "Add CA Certificates for '
|
||||
'renewals"; allow(add) userdn = "ldap:///fqdn=%(fqdn)s,'
|
||||
'cn=computers,cn=accounts,%(suffix)s";)' % sub)
|
||||
b'%(suffix)s")(version 3.0; acl "Add CA Certificates for '
|
||||
b'renewals"; allow(add) userdn = "ldap:///fqdn=%(fqdn)s,'
|
||||
b'cn=computers,cn=accounts,%(suffix)s";)' % sub)
|
||||
except ValueError:
|
||||
pass
|
||||
try:
|
||||
entry.raw['aci'].remove(
|
||||
b'(target = "ldap:///cn=*,cn=ca_renewal,cn=ipa,cn=etc,'
|
||||
'%(suffix)s")(targetattr = "userCertificate")'
|
||||
'(version 3.0; acl "Modify CA Certificates for renewals"; '
|
||||
'allow(write) userdn = "ldap:///fqdn=%(fqdn)s,'
|
||||
'cn=computers,cn=accounts,%(suffix)s";)' % sub)
|
||||
b'%(suffix)s")(targetattr = "userCertificate")'
|
||||
b'(version 3.0; acl "Modify CA Certificates for renewals"; '
|
||||
b'allow(write) userdn = "ldap:///fqdn=%(fqdn)s,'
|
||||
b'cn=computers,cn=accounts,%(suffix)s";)' % sub)
|
||||
except ValueError:
|
||||
pass
|
||||
try:
|
||||
entry.raw['aci'].remove(
|
||||
b'(target = "ldap:///cn=CAcert,cn=ipa,cn=etc,%(suffix)s")'
|
||||
'(targetattr = cACertificate)(version 3.0; acl "Modify CA '
|
||||
'Certificate"; allow (write) userdn = "ldap:///fqdn='
|
||||
'%(fqdn)s,cn=computers,cn=accounts,%(suffix)s";)' % sub)
|
||||
b'(targetattr = cACertificate)(version 3.0; acl "Modify CA '
|
||||
b'Certificate"; allow (write) userdn = "ldap:///fqdn='
|
||||
b'%(fqdn)s,cn=computers,cn=accounts,%(suffix)s";)' % sub)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
@@ -1318,19 +1318,19 @@ class ReplicationManager(object):
|
||||
try:
|
||||
entry.raw['aci'].remove(
|
||||
b'(targetfilter = "(objectClass=nsContainer)")'
|
||||
'(targetattr = "cn || objectClass || ipaConfigString")'
|
||||
'(version 3.0; acl "Read IPA Masters"; allow (read, '
|
||||
'search, compare) userdn = "ldap:///fqdn=%(fqdn)s,'
|
||||
'cn=computers,cn=accounts,%(suffix)s";)' % sub)
|
||||
b'(targetattr = "cn || objectClass || ipaConfigString")'
|
||||
b'(version 3.0; acl "Read IPA Masters"; allow (read, '
|
||||
b'search, compare) userdn = "ldap:///fqdn=%(fqdn)s,'
|
||||
b'cn=computers,cn=accounts,%(suffix)s";)' % sub)
|
||||
except ValueError:
|
||||
pass
|
||||
try:
|
||||
entry.raw['aci'].remove(
|
||||
b'(targetfilter = "(objectClass=nsContainer)")'
|
||||
'(targetattr = "ipaConfigString")(version 3.0; acl '
|
||||
'"Modify IPA Masters"; allow (write) userdn = '
|
||||
'"ldap:///fqdn=%(fqdn)s,cn=computers,cn=accounts,'
|
||||
'%(suffix)s";)' % sub)
|
||||
b'(targetattr = "ipaConfigString")(version 3.0; acl '
|
||||
b'"Modify IPA Masters"; allow (write) userdn = '
|
||||
b'"ldap:///fqdn=%(fqdn)s,cn=computers,cn=accounts,'
|
||||
b'%(suffix)s";)' % sub)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
@@ -1356,11 +1356,11 @@ class ReplicationManager(object):
|
||||
try:
|
||||
entry.raw['aci'].remove(
|
||||
b'(targetfilter = "(&(objectClass=ipaCertificate)'
|
||||
'(ipaConfigString=ipaCA))")(targetattr = '
|
||||
'"ipaCertIssuerSerial || cACertificate")(version 3.0; acl '
|
||||
'"Modify CA Certificate Store Entry"; allow (write) '
|
||||
'userdn = "ldap:///fqdn=%(fqdn)s,cn=computers,cn=accounts,'
|
||||
'%(suffix)s";)' % sub)
|
||||
b'(ipaConfigString=ipaCA))")(targetattr = '
|
||||
b'"ipaCertIssuerSerial || cACertificate")(version 3.0; acl '
|
||||
b'"Modify CA Certificate Store Entry"; allow (write) '
|
||||
b'userdn = "ldap:///fqdn=%(fqdn)s,cn=computers,cn=accounts,'
|
||||
b'%(suffix)s";)' % sub)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
@@ -27,7 +27,7 @@ import struct
|
||||
# A string that should have bytes 'x\00' through '\xff':
|
||||
binary_bytes = b''.join(struct.pack('B', d) for d in range(256))
|
||||
assert b'\x00' in binary_bytes and b'\xff' in binary_bytes
|
||||
assert type(binary_bytes) is str and len(binary_bytes) == 256
|
||||
assert type(binary_bytes) is bytes and len(binary_bytes) == 256
|
||||
|
||||
# A UTF-8 encoded str:
|
||||
utf8_bytes = b'\xd0\x9f\xd0\xb0\xd0\xb2\xd0\xb5\xd0\xbb'
|
||||
|
@@ -305,8 +305,8 @@ class TestCLIParsing(object):
|
||||
|
||||
# Create a mock service object to test against
|
||||
adtrust_add = dict(
|
||||
ipaconfigstring='enabledService',
|
||||
objectclass=['top', 'nsContainer', 'ipaConfigObject']
|
||||
ipaconfigstring=b'enabledService',
|
||||
objectclass=[b'top', b'nsContainer', b'ipaConfigObject']
|
||||
)
|
||||
|
||||
mockldap = util.MockLDAP()
|
||||
|
@@ -183,8 +183,14 @@ def test_check_name():
|
||||
]
|
||||
for name in okay:
|
||||
assert name is f(name)
|
||||
e = raises(TypeError, f, unicode(name))
|
||||
assert str(e) == TYPE_ERROR % ('name', str, unicode(name), unicode)
|
||||
if six.PY2:
|
||||
bad_type = unicode
|
||||
bad_value = unicode(name)
|
||||
else:
|
||||
bad_type = bytes
|
||||
bad_value = name.encode('ascii')
|
||||
e = raises(TypeError, f, bad_value)
|
||||
assert str(e) == TYPE_ERROR % ('name', str, bad_value, bad_type)
|
||||
for name in nope:
|
||||
e = raises(ValueError, f, name)
|
||||
assert str(e) == NAME_ERROR % (NAME_REGEX, name)
|
||||
|
@@ -213,8 +213,8 @@ class PublicExceptionTester(object):
|
||||
|
||||
def new(self, format=None, message=None, **kw):
|
||||
# Test that TypeError is raised if message isn't unicode:
|
||||
e = raises(TypeError, self.klass, message='The message')
|
||||
assert str(e) == TYPE_ERROR % ('message', unicode, 'The message', str)
|
||||
e = raises(TypeError, self.klass, message=b'The message')
|
||||
assert str(e) == TYPE_ERROR % ('message', unicode, b'The message', bytes)
|
||||
|
||||
# Test the instance:
|
||||
for (key, value) in kw.items():
|
||||
@@ -261,9 +261,9 @@ class test_PublicError(PublicExceptionTester):
|
||||
assert inst.key1 is val1
|
||||
assert inst.key2 is val2
|
||||
|
||||
# Test with format=None, message=str
|
||||
e = raises(TypeError, self.klass, message='the message', **kw)
|
||||
assert str(e) == TYPE_ERROR % ('message', unicode, 'the message', str)
|
||||
# Test with format=None, message=bytes
|
||||
e = raises(TypeError, self.klass, message=b'the message', **kw)
|
||||
assert str(e) == TYPE_ERROR % ('message', unicode, b'the message', bytes)
|
||||
|
||||
# Test with format=None, message=None
|
||||
e = raises(ValueError, self.klass, **kw)
|
||||
|
@@ -294,9 +294,14 @@ class test_Command(ClassChecker):
|
||||
assert ns.source.multivalue is False
|
||||
|
||||
# Test TypeError:
|
||||
e = raises(TypeError, self.get_instance, args=(u'whatever',))
|
||||
assert str(e) == TYPE_ERROR % (
|
||||
'spec', (str, parameters.Param), u'whatever', unicode)
|
||||
if six.PY2:
|
||||
e = raises(TypeError, self.get_instance, args=(u'whatever',))
|
||||
assert str(e) == TYPE_ERROR % (
|
||||
'spec', (str, parameters.Param), u'whatever', unicode)
|
||||
else:
|
||||
e = raises(TypeError, self.get_instance, args=(b'whatever',))
|
||||
assert str(e) == TYPE_ERROR % (
|
||||
'spec', (str, parameters.Param), b'whatever', bytes)
|
||||
|
||||
# Test ValueError, required after optional:
|
||||
e = raises(ValueError, self.get_instance, args=('arg1?', 'arg2'))
|
||||
|
@@ -155,8 +155,12 @@ def test_parse_param_spec():
|
||||
assert f('name^') == ('name^', dict(required=True, multivalue=False))
|
||||
|
||||
# Test that TypeError is raised if spec isn't an str:
|
||||
e = raises(TypeError, f, u'name?')
|
||||
assert str(e) == TYPE_ERROR % ('spec', str, u'name?', unicode)
|
||||
if six.PY2:
|
||||
bad_value = u'name?'
|
||||
else:
|
||||
bad_value = b'name?'
|
||||
e = raises(TypeError, f, bad_value)
|
||||
assert str(e) == TYPE_ERROR % ('spec', str, bad_value, type(bad_value))
|
||||
|
||||
|
||||
class DummyRule(object):
|
||||
@@ -737,7 +741,7 @@ class test_Bytes(ClassChecker):
|
||||
Test the `ipalib.parameters.Bytes.__init__` method.
|
||||
"""
|
||||
o = self.cls('my_bytes')
|
||||
assert o.type is str
|
||||
assert o.type is bytes
|
||||
assert o.password is False
|
||||
assert o.rules == tuple()
|
||||
assert o.class_rules == tuple()
|
||||
@@ -798,12 +802,12 @@ class test_Bytes(ClassChecker):
|
||||
assert dummy.translation is translation
|
||||
|
||||
# Test with passing values:
|
||||
for value in ('abc', 'four', '12345'):
|
||||
for value in (b'abc', b'four', b'12345'):
|
||||
assert rule(dummy, value) is None
|
||||
assert dummy.called() is False
|
||||
|
||||
# Test with failing values:
|
||||
for value in ('', 'a', '12'):
|
||||
for value in (b'', b'a', b'12'):
|
||||
assert_equal(
|
||||
rule(dummy, value),
|
||||
translation % dict(minlength=3)
|
||||
@@ -824,12 +828,12 @@ class test_Bytes(ClassChecker):
|
||||
assert dummy.translation is translation
|
||||
|
||||
# Test with passing values:
|
||||
for value in ('ab', '123', 'four'):
|
||||
for value in (b'ab', b'123', b'four'):
|
||||
assert rule(dummy, value) is None
|
||||
assert dummy.called() is False
|
||||
|
||||
# Test with failing values:
|
||||
for value in ('12345', 'sixsix'):
|
||||
for value in (b'12345', b'sixsix'):
|
||||
assert_equal(
|
||||
rule(dummy, value),
|
||||
translation % dict(maxlength=4)
|
||||
@@ -850,12 +854,12 @@ class test_Bytes(ClassChecker):
|
||||
assert dummy.translation is translation
|
||||
|
||||
# Test with passing values:
|
||||
for value in ('1234', 'four'):
|
||||
for value in (b'1234', b'four'):
|
||||
assert rule(dummy, value) is None
|
||||
assert dummy.called() is False
|
||||
|
||||
# Test with failing values:
|
||||
for value in ('ab', '123', '12345', 'sixsix'):
|
||||
for value in (b'ab', b'123', b'12345', b'sixsix'):
|
||||
assert_equal(
|
||||
rule(dummy, value),
|
||||
translation % dict(length=4),
|
||||
@@ -869,9 +873,9 @@ class test_Bytes(ClassChecker):
|
||||
Test the `ipalib.parameters.Bytes._rule_pattern` method.
|
||||
"""
|
||||
# Test our assumptions about Python re module and Unicode:
|
||||
pat = '\w+$'
|
||||
pat = b'\w+$'
|
||||
r = re.compile(pat)
|
||||
assert r.match('Hello_World') is not None
|
||||
assert r.match(b'Hello_World') is not None
|
||||
assert r.match(utf8_bytes) is None
|
||||
assert r.match(binary_bytes) is None
|
||||
|
||||
@@ -883,12 +887,12 @@ class test_Bytes(ClassChecker):
|
||||
dummy = dummy_ugettext(translation)
|
||||
|
||||
# Test with passing values:
|
||||
for value in ('HELLO', 'hello', 'Hello_World'):
|
||||
for value in (b'HELLO', b'hello', b'Hello_World'):
|
||||
assert rule(dummy, value) is None
|
||||
assert dummy.called() is False
|
||||
|
||||
# Test with failing values:
|
||||
for value in ('Hello!', 'Hello World', utf8_bytes, binary_bytes):
|
||||
for value in (b'Hello!', b'Hello World', utf8_bytes, binary_bytes):
|
||||
assert_equal(
|
||||
rule(dummy, value),
|
||||
translation % dict(pattern=pat),
|
||||
@@ -924,7 +928,7 @@ class test_Str(ClassChecker):
|
||||
mthd = o._convert_scalar
|
||||
for value in (u'Hello', 42, 1.2, unicode_str):
|
||||
assert mthd(value) == unicode(value)
|
||||
bad = [True, 'Hello', dict(one=1), utf8_bytes]
|
||||
bad = [True, b'Hello', dict(one=1), utf8_bytes]
|
||||
for value in bad:
|
||||
e = raises(errors.ConversionError, mthd, value)
|
||||
assert e.name == 'my_str'
|
||||
@@ -1028,7 +1032,10 @@ class test_Str(ClassChecker):
|
||||
pat = '\w{5}$'
|
||||
r1 = re.compile(pat)
|
||||
r2 = re.compile(pat, re.UNICODE)
|
||||
assert r1.match(unicode_str) is None
|
||||
if six.PY2:
|
||||
assert r1.match(unicode_str) is None
|
||||
else:
|
||||
assert r1.match(unicode_str) is not None
|
||||
assert r2.match(unicode_str) is not None
|
||||
|
||||
# Create instance:
|
||||
@@ -1164,8 +1171,8 @@ class test_StrEnum(EnumChecker):
|
||||
_name = 'my_strenum'
|
||||
_datatype = unicode
|
||||
_test_values = u'Hello', u'naughty', u'nurse!'
|
||||
_bad_type_values = u'Hello', 'naughty', u'nurse!'
|
||||
_bad_type = str
|
||||
_bad_type_values = u'Hello', b'naughty', u'nurse!'
|
||||
_bad_type = bytes
|
||||
_translation = u"values='Hello', 'naughty', 'nurse!'"
|
||||
_bad_values = u'Howdy', u'quiet', u'library!'
|
||||
_single_value_translation = u"value='Hello'"
|
||||
@@ -1551,7 +1558,11 @@ def test_create_param():
|
||||
assert p.multivalue is kw['multivalue']
|
||||
|
||||
# Test that TypeError is raised when spec is neither a Param nor a str:
|
||||
for spec in (u'one', 42, parameters.Param, parameters.Str):
|
||||
if six.PY2:
|
||||
bad_value = u'one'
|
||||
else:
|
||||
bad_value = b'one'
|
||||
for spec in (bad_value, 42, parameters.Param, parameters.Str):
|
||||
e = raises(TypeError, f, spec)
|
||||
assert str(e) == \
|
||||
TYPE_ERROR % ('spec', (str, parameters.Param), spec, type(spec))
|
||||
@@ -1590,7 +1601,10 @@ class test_IA5Str(ClassChecker):
|
||||
e = raises(errors.ConversionError, mthd, value)
|
||||
assert e.name == 'my_str'
|
||||
assert e.index is None
|
||||
assert_equal(e.error, "The character '\\xc3' is not allowed.")
|
||||
if six.PY2:
|
||||
assert_equal(e.error, "The character '\\xc3' is not allowed.")
|
||||
else:
|
||||
assert_equal(e.error, "The character 'á' is not allowed.")
|
||||
|
||||
|
||||
class test_DateTime(ClassChecker):
|
||||
|
@@ -63,19 +63,25 @@ def test_round_trip():
|
||||
"""
|
||||
# We first test that our assumptions about xmlrpc.client module in the Python
|
||||
# standard library are correct:
|
||||
assert_equal(dump_n_load(utf8_bytes), unicode_str)
|
||||
if six.PY2:
|
||||
output_binary_type = bytes
|
||||
else:
|
||||
output_binary_type = Binary
|
||||
|
||||
if six.PY2:
|
||||
assert_equal(dump_n_load(utf8_bytes), unicode_str)
|
||||
assert_equal(dump_n_load(unicode_str), unicode_str)
|
||||
# "Binary" is not "str". pylint: disable=no-member
|
||||
assert_equal(dump_n_load(Binary(binary_bytes)).data, binary_bytes)
|
||||
assert isinstance(dump_n_load(Binary(binary_bytes)), Binary)
|
||||
assert type(dump_n_load(b'hello')) is bytes
|
||||
assert type(dump_n_load(u'hello')) is bytes
|
||||
assert_equal(dump_n_load(b''), b'')
|
||||
assert_equal(dump_n_load(u''), b'')
|
||||
assert type(dump_n_load(b'hello')) is output_binary_type
|
||||
assert type(dump_n_load(u'hello')) is str
|
||||
assert_equal(dump_n_load(b''), output_binary_type(b''))
|
||||
assert_equal(dump_n_load(u''), str())
|
||||
assert dump_n_load(None) is None
|
||||
|
||||
# Now we test our wrap and unwrap methods in combination with dumps, loads:
|
||||
# All str should come back str (because they get wrapped in
|
||||
# All bytes should come back bytes (because they get wrapped in
|
||||
# xmlrpc.client.Binary(). All unicode should come back unicode because str
|
||||
# explicity get decoded by rpc.xml_unwrap() if they weren't already
|
||||
# decoded by xmlrpc.client.loads().
|
||||
@@ -136,7 +142,7 @@ def test_xml_dumps():
|
||||
params = (binary_bytes, utf8_bytes, unicode_str, None)
|
||||
|
||||
# Test serializing an RPC request:
|
||||
data = f(params, API_VERSION, b'the_method')
|
||||
data = f(params, API_VERSION, 'the_method')
|
||||
(p, m) = loads(data)
|
||||
assert_equal(m, u'the_method')
|
||||
assert type(p) is tuple
|
||||
@@ -167,7 +173,7 @@ def test_xml_loads():
|
||||
wrapped = rpc.xml_wrap(params, API_VERSION)
|
||||
|
||||
# Test un-serializing an RPC request:
|
||||
data = dumps(wrapped, b'the_method', allow_none=True)
|
||||
data = dumps(wrapped, 'the_method', allow_none=True)
|
||||
(p, m) = f(data)
|
||||
assert_equal(m, u'the_method')
|
||||
assert_equal(p, params)
|
||||
|
Reference in New Issue
Block a user