mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2024-12-22 15:13:50 -06:00
ipalib: move json formatter to a separate file
To prevent cyclic imports, move JSON handling code to a separate file. Signed-off-by: Alexander Bokovoy <abokovoy@redhat.com> Reviewed-By: Rob Crittenden <rcritten@redhat.com>
This commit is contained in:
parent
9e861693fc
commit
fd0f432fec
192
ipalib/ipajson.py
Normal file
192
ipalib/ipajson.py
Normal file
@ -0,0 +1,192 @@
|
||||
#
|
||||
# Copyright (C) 2024 FreeIPA Contributors see COPYING for license
|
||||
#
|
||||
|
||||
import base64
|
||||
from cryptography import x509 as crypto_x509
|
||||
import datetime
|
||||
from decimal import Decimal
|
||||
import json
|
||||
import six
|
||||
from ipalib.constants import LDAP_GENERALIZED_TIME_FORMAT
|
||||
from ipalib import capabilities
|
||||
from ipalib.x509 import Encoding as x509_Encoding
|
||||
from ipapython.dn import DN
|
||||
from ipapython.dnsutil import DNSName
|
||||
from ipapython.kerberos import Principal
|
||||
|
||||
if six.PY3:
|
||||
unicode = str
|
||||
|
||||
|
||||
class _JSONPrimer(dict):
|
||||
"""Fast JSON primer and pre-converter
|
||||
|
||||
Prepare a data structure for JSON serialization. In an ideal world, priming
|
||||
could be handled by the default hook of json.dumps(). Unfortunately the
|
||||
hook treats Python 2 str as text while IPA considers str as bytes.
|
||||
|
||||
The primer uses a couple of tricks to archive maximum performance:
|
||||
|
||||
* O(1) type look instead of O(n) chain of costly isinstance() calls
|
||||
* __missing__ and __mro__ with caching to handle subclasses
|
||||
* inline code with minor code duplication (func lookup in enc_list/dict)
|
||||
* avoid surplus function calls (e.g. func is _identity, obj.__class__
|
||||
instead if type(obj))
|
||||
* function default arguments to turn global into local lookups
|
||||
* avoid re-creation of bound method objects (e.g. result.append)
|
||||
* on-demand lookup of client capabilities with cached values
|
||||
|
||||
Depending on the client version number, the primer converts:
|
||||
|
||||
* bytes -> {'__base64__': b64encode}
|
||||
* datetime -> {'__datetime__': LDAP_GENERALIZED_TIME}
|
||||
* DNSName -> {'__dns_name__': unicode}
|
||||
|
||||
The _ipa_obj_hook() functions unserializes the marked JSON objects to
|
||||
bytes, datetime and DNSName.
|
||||
|
||||
:see: _ipa_obj_hook
|
||||
"""
|
||||
__slots__ = ('version', '_cap_datetime', '_cap_dnsname')
|
||||
|
||||
_identity = object()
|
||||
|
||||
def __init__(self, version, _identity=_identity):
|
||||
super(_JSONPrimer, self).__init__()
|
||||
self.version = version
|
||||
self._cap_datetime = None
|
||||
self._cap_dnsname = None
|
||||
self.update({
|
||||
unicode: _identity,
|
||||
bool: _identity,
|
||||
int: _identity,
|
||||
type(None): _identity,
|
||||
float: _identity,
|
||||
Decimal: unicode,
|
||||
DN: str,
|
||||
Principal: unicode,
|
||||
DNSName: self._enc_dnsname,
|
||||
datetime.datetime: self._enc_datetime,
|
||||
bytes: self._enc_bytes,
|
||||
list: self._enc_list,
|
||||
tuple: self._enc_list,
|
||||
dict: self._enc_dict,
|
||||
crypto_x509.Certificate: self._enc_certificate,
|
||||
crypto_x509.CertificateSigningRequest: self._enc_certificate,
|
||||
})
|
||||
|
||||
def __missing__(self, typ):
|
||||
# walk MRO to find best match
|
||||
for c in typ.__mro__:
|
||||
if c in self:
|
||||
self[typ] = self[c]
|
||||
return self[c]
|
||||
# use issubclass to check for registered ABCs
|
||||
for c in self:
|
||||
if issubclass(typ, c):
|
||||
self[typ] = self[c]
|
||||
return self[c]
|
||||
raise TypeError(typ)
|
||||
|
||||
def convert(self, obj, _identity=_identity):
|
||||
# obj.__class__ is twice as fast as type(obj)
|
||||
func = self[obj.__class__]
|
||||
return obj if func is _identity else func(obj)
|
||||
|
||||
def _enc_datetime(self, val):
|
||||
cap = self._cap_datetime
|
||||
if cap is None:
|
||||
cap = capabilities.client_has_capability(self.version,
|
||||
'datetime_values')
|
||||
self._cap_datetime = cap
|
||||
if cap:
|
||||
return {'__datetime__': val.strftime(LDAP_GENERALIZED_TIME_FORMAT)}
|
||||
else:
|
||||
return val.strftime(LDAP_GENERALIZED_TIME_FORMAT)
|
||||
|
||||
def _enc_dnsname(self, val):
|
||||
cap = self._cap_dnsname
|
||||
if cap is None:
|
||||
cap = capabilities.client_has_capability(self.version,
|
||||
'dns_name_values')
|
||||
self._cap_dnsname = cap
|
||||
if cap:
|
||||
return {'__dns_name__': unicode(val)}
|
||||
else:
|
||||
return unicode(val)
|
||||
|
||||
def _enc_bytes(self, val):
|
||||
encoded = base64.b64encode(val)
|
||||
if not six.PY2:
|
||||
encoded = encoded.decode('ascii')
|
||||
return {'__base64__': encoded}
|
||||
|
||||
def _enc_list(self, val, _identity=_identity):
|
||||
result = []
|
||||
append = result.append
|
||||
for v in val:
|
||||
func = self[v.__class__]
|
||||
append(v if func is _identity else func(v))
|
||||
return result
|
||||
|
||||
def _enc_dict(self, val, _identity=_identity, _iteritems=six.iteritems):
|
||||
result = {}
|
||||
for k, v in _iteritems(val):
|
||||
func = self[v.__class__]
|
||||
result[k] = v if func is _identity else func(v)
|
||||
return result
|
||||
|
||||
def _enc_certificate(self, val):
|
||||
return self._enc_bytes(val.public_bytes(x509_Encoding.DER))
|
||||
|
||||
|
||||
def json_encode_binary(val, version, pretty_print=False):
|
||||
"""Serialize a Python object structure to JSON
|
||||
|
||||
:param object val: Python object structure
|
||||
:param str version: client version
|
||||
:param bool pretty_print: indent and sort JSON (warning: slow!)
|
||||
:return: text
|
||||
:note: pretty printing triggers a slow path in Python's JSON module. Only
|
||||
use pretty_print in debug mode.
|
||||
"""
|
||||
result = _JSONPrimer(version).convert(val)
|
||||
if pretty_print:
|
||||
return json.dumps(result, indent=4, sort_keys=True)
|
||||
else:
|
||||
return json.dumps(result)
|
||||
|
||||
|
||||
def _ipa_obj_hook(dct, _iteritems=six.iteritems, _list=list):
|
||||
"""JSON object hook
|
||||
|
||||
:see: _JSONPrimer
|
||||
"""
|
||||
if '__base64__' in dct:
|
||||
return base64.b64decode(dct['__base64__'])
|
||||
elif '__datetime__' in dct:
|
||||
return datetime.datetime.strptime(dct['__datetime__'],
|
||||
LDAP_GENERALIZED_TIME_FORMAT)
|
||||
elif '__dns_name__' in dct:
|
||||
return DNSName(dct['__dns_name__'])
|
||||
else:
|
||||
# XXX tests assume tuples. Is this really necessary?
|
||||
for k, v in _iteritems(dct):
|
||||
if v.__class__ is _list:
|
||||
dct[k] = tuple(v)
|
||||
return dct
|
||||
|
||||
|
||||
def json_decode_binary(val):
|
||||
"""Convert serialized JSON string back to Python data structure
|
||||
|
||||
:param val: JSON string
|
||||
:type val: str, bytes
|
||||
:return: Python data structure
|
||||
:see: _ipa_obj_hook, _JSONPrimer
|
||||
"""
|
||||
if isinstance(val, bytes):
|
||||
val = val.decode('utf-8')
|
||||
|
||||
return json.loads(val, object_hook=_ipa_obj_hook)
|
174
ipalib/rpc.py
174
ipalib/rpc.py
@ -70,6 +70,7 @@ from ipapython.dn import DN
|
||||
from ipapython.kerberos import Principal
|
||||
from ipalib.capabilities import VERSION_WITHOUT_CAPABILITIES
|
||||
from ipalib import api
|
||||
from ipalib.ipajson import json_encode_binary, json_decode_binary
|
||||
|
||||
# The XMLRPC client is in "six.moves.xmlrpc_client", but pylint
|
||||
# cannot handle that
|
||||
@ -276,179 +277,6 @@ def xml_dumps(params, version, methodname=None, methodresponse=False,
|
||||
)
|
||||
|
||||
|
||||
class _JSONPrimer(dict):
|
||||
"""Fast JSON primer and pre-converter
|
||||
|
||||
Prepare a data structure for JSON serialization. In an ideal world, priming
|
||||
could be handled by the default hook of json.dumps(). Unfortunately the
|
||||
hook treats Python 2 str as text while IPA considers str as bytes.
|
||||
|
||||
The primer uses a couple of tricks to archive maximum performance:
|
||||
|
||||
* O(1) type look instead of O(n) chain of costly isinstance() calls
|
||||
* __missing__ and __mro__ with caching to handle subclasses
|
||||
* inline code with minor code duplication (func lookup in enc_list/dict)
|
||||
* avoid surplus function calls (e.g. func is _identity, obj.__class__
|
||||
instead if type(obj))
|
||||
* function default arguments to turn global into local lookups
|
||||
* avoid re-creation of bound method objects (e.g. result.append)
|
||||
* on-demand lookup of client capabilities with cached values
|
||||
|
||||
Depending on the client version number, the primer converts:
|
||||
|
||||
* bytes -> {'__base64__': b64encode}
|
||||
* datetime -> {'__datetime__': LDAP_GENERALIZED_TIME}
|
||||
* DNSName -> {'__dns_name__': unicode}
|
||||
|
||||
The _ipa_obj_hook() functions unserializes the marked JSON objects to
|
||||
bytes, datetime and DNSName.
|
||||
|
||||
:see: _ipa_obj_hook
|
||||
"""
|
||||
__slots__ = ('version', '_cap_datetime', '_cap_dnsname')
|
||||
|
||||
_identity = object()
|
||||
|
||||
def __init__(self, version, _identity=_identity):
|
||||
super(_JSONPrimer, self).__init__()
|
||||
self.version = version
|
||||
self._cap_datetime = None
|
||||
self._cap_dnsname = None
|
||||
self.update({
|
||||
unicode: _identity,
|
||||
bool: _identity,
|
||||
int: _identity,
|
||||
type(None): _identity,
|
||||
float: _identity,
|
||||
Decimal: unicode,
|
||||
DN: str,
|
||||
Principal: unicode,
|
||||
DNSName: self._enc_dnsname,
|
||||
datetime.datetime: self._enc_datetime,
|
||||
bytes: self._enc_bytes,
|
||||
list: self._enc_list,
|
||||
tuple: self._enc_list,
|
||||
dict: self._enc_dict,
|
||||
crypto_x509.Certificate: self._enc_certificate,
|
||||
crypto_x509.CertificateSigningRequest: self._enc_certificate,
|
||||
})
|
||||
|
||||
def __missing__(self, typ):
|
||||
# walk MRO to find best match
|
||||
for c in typ.__mro__:
|
||||
if c in self:
|
||||
self[typ] = self[c]
|
||||
return self[c]
|
||||
# use issubclass to check for registered ABCs
|
||||
for c in self:
|
||||
if issubclass(typ, c):
|
||||
self[typ] = self[c]
|
||||
return self[c]
|
||||
raise TypeError(typ)
|
||||
|
||||
def convert(self, obj, _identity=_identity):
|
||||
# obj.__class__ is twice as fast as type(obj)
|
||||
func = self[obj.__class__]
|
||||
return obj if func is _identity else func(obj)
|
||||
|
||||
def _enc_datetime(self, val):
|
||||
cap = self._cap_datetime
|
||||
if cap is None:
|
||||
cap = capabilities.client_has_capability(self.version,
|
||||
'datetime_values')
|
||||
self._cap_datetime = cap
|
||||
if cap:
|
||||
return {'__datetime__': val.strftime(LDAP_GENERALIZED_TIME_FORMAT)}
|
||||
else:
|
||||
return val.strftime(LDAP_GENERALIZED_TIME_FORMAT)
|
||||
|
||||
def _enc_dnsname(self, val):
|
||||
cap = self._cap_dnsname
|
||||
if cap is None:
|
||||
cap = capabilities.client_has_capability(self.version,
|
||||
'dns_name_values')
|
||||
self._cap_dnsname = cap
|
||||
if cap:
|
||||
return {'__dns_name__': unicode(val)}
|
||||
else:
|
||||
return unicode(val)
|
||||
|
||||
def _enc_bytes(self, val):
|
||||
encoded = base64.b64encode(val)
|
||||
if not six.PY2:
|
||||
encoded = encoded.decode('ascii')
|
||||
return {'__base64__': encoded}
|
||||
|
||||
def _enc_list(self, val, _identity=_identity):
|
||||
result = []
|
||||
append = result.append
|
||||
for v in val:
|
||||
func = self[v.__class__]
|
||||
append(v if func is _identity else func(v))
|
||||
return result
|
||||
|
||||
def _enc_dict(self, val, _identity=_identity, _iteritems=six.iteritems):
|
||||
result = {}
|
||||
for k, v in _iteritems(val):
|
||||
func = self[v.__class__]
|
||||
result[k] = v if func is _identity else func(v)
|
||||
return result
|
||||
|
||||
def _enc_certificate(self, val):
|
||||
return self._enc_bytes(val.public_bytes(x509_Encoding.DER))
|
||||
|
||||
|
||||
def json_encode_binary(val, version, pretty_print=False):
|
||||
"""Serialize a Python object structure to JSON
|
||||
|
||||
:param object val: Python object structure
|
||||
:param str version: client version
|
||||
:param bool pretty_print: indent and sort JSON (warning: slow!)
|
||||
:return: text
|
||||
:note: pretty printing triggers a slow path in Python's JSON module. Only
|
||||
use pretty_print in debug mode.
|
||||
"""
|
||||
result = _JSONPrimer(version).convert(val)
|
||||
if pretty_print:
|
||||
return json.dumps(result, indent=4, sort_keys=True)
|
||||
else:
|
||||
return json.dumps(result)
|
||||
|
||||
|
||||
def _ipa_obj_hook(dct, _iteritems=six.iteritems, _list=list):
|
||||
"""JSON object hook
|
||||
|
||||
:see: _JSONPrimer
|
||||
"""
|
||||
if '__base64__' in dct:
|
||||
return base64.b64decode(dct['__base64__'])
|
||||
elif '__datetime__' in dct:
|
||||
return datetime.datetime.strptime(dct['__datetime__'],
|
||||
LDAP_GENERALIZED_TIME_FORMAT)
|
||||
elif '__dns_name__' in dct:
|
||||
return DNSName(dct['__dns_name__'])
|
||||
else:
|
||||
# XXX tests assume tuples. Is this really necessary?
|
||||
for k, v in _iteritems(dct):
|
||||
if v.__class__ is _list:
|
||||
dct[k] = tuple(v)
|
||||
return dct
|
||||
|
||||
|
||||
def json_decode_binary(val):
|
||||
"""Convert serialized JSON string back to Python data structure
|
||||
|
||||
:param val: JSON string
|
||||
:type val: str, bytes
|
||||
:return: Python data structure
|
||||
:see: _ipa_obj_hook, _JSONPrimer
|
||||
"""
|
||||
if isinstance(val, bytes):
|
||||
val = val.decode('utf-8')
|
||||
|
||||
return json.loads(val, object_hook=_ipa_obj_hook)
|
||||
|
||||
|
||||
def decode_fault(e, encoding='UTF-8'):
|
||||
assert isinstance(e, Fault)
|
||||
if isinstance(e.faultString, bytes):
|
||||
|
@ -54,8 +54,8 @@ from ipalib.errors import (
|
||||
ExecutionError, PasswordExpired, KrbPrincipalExpired, KrbPrincipalWrongFAST,
|
||||
UserLocked)
|
||||
from ipalib.request import context, destroy_context
|
||||
from ipalib.rpc import (xml_dumps, xml_loads,
|
||||
json_encode_binary, json_decode_binary)
|
||||
from ipalib.rpc import xml_dumps, xml_loads
|
||||
from ipalib.ipajson import json_encode_binary, json_decode_binary
|
||||
from ipapython.dn import DN
|
||||
from ipaserver.plugins.ldap2 import ldap2
|
||||
from ipalib.backend import Backend
|
||||
|
Loading…
Reference in New Issue
Block a user