freeipa/ipalib/ipajson.py
Alexander Bokovoy fd0f432fec ipalib: move json formatter to a separate file
To prevent cyclic imports, move JSON handling code to a separate file.

Signed-off-by: Alexander Bokovoy <abokovoy@redhat.com>
Reviewed-By: Rob Crittenden <rcritten@redhat.com>
2024-05-22 17:06:23 -04:00

193 lines
6.3 KiB
Python

#
# Copyright (C) 2024 FreeIPA Contributors see COPYING for license
#
import base64
from cryptography import x509 as crypto_x509
import datetime
from decimal import Decimal
import json
import six
from ipalib.constants import LDAP_GENERALIZED_TIME_FORMAT
from ipalib import capabilities
from ipalib.x509 import Encoding as x509_Encoding
from ipapython.dn import DN
from ipapython.dnsutil import DNSName
from ipapython.kerberos import Principal
if six.PY3:
unicode = str
class _JSONPrimer(dict):
"""Fast JSON primer and pre-converter
Prepare a data structure for JSON serialization. In an ideal world, priming
could be handled by the default hook of json.dumps(). Unfortunately the
hook treats Python 2 str as text while IPA considers str as bytes.
The primer uses a couple of tricks to archive maximum performance:
* O(1) type look instead of O(n) chain of costly isinstance() calls
* __missing__ and __mro__ with caching to handle subclasses
* inline code with minor code duplication (func lookup in enc_list/dict)
* avoid surplus function calls (e.g. func is _identity, obj.__class__
instead if type(obj))
* function default arguments to turn global into local lookups
* avoid re-creation of bound method objects (e.g. result.append)
* on-demand lookup of client capabilities with cached values
Depending on the client version number, the primer converts:
* bytes -> {'__base64__': b64encode}
* datetime -> {'__datetime__': LDAP_GENERALIZED_TIME}
* DNSName -> {'__dns_name__': unicode}
The _ipa_obj_hook() functions unserializes the marked JSON objects to
bytes, datetime and DNSName.
:see: _ipa_obj_hook
"""
__slots__ = ('version', '_cap_datetime', '_cap_dnsname')
_identity = object()
def __init__(self, version, _identity=_identity):
super(_JSONPrimer, self).__init__()
self.version = version
self._cap_datetime = None
self._cap_dnsname = None
self.update({
unicode: _identity,
bool: _identity,
int: _identity,
type(None): _identity,
float: _identity,
Decimal: unicode,
DN: str,
Principal: unicode,
DNSName: self._enc_dnsname,
datetime.datetime: self._enc_datetime,
bytes: self._enc_bytes,
list: self._enc_list,
tuple: self._enc_list,
dict: self._enc_dict,
crypto_x509.Certificate: self._enc_certificate,
crypto_x509.CertificateSigningRequest: self._enc_certificate,
})
def __missing__(self, typ):
# walk MRO to find best match
for c in typ.__mro__:
if c in self:
self[typ] = self[c]
return self[c]
# use issubclass to check for registered ABCs
for c in self:
if issubclass(typ, c):
self[typ] = self[c]
return self[c]
raise TypeError(typ)
def convert(self, obj, _identity=_identity):
# obj.__class__ is twice as fast as type(obj)
func = self[obj.__class__]
return obj if func is _identity else func(obj)
def _enc_datetime(self, val):
cap = self._cap_datetime
if cap is None:
cap = capabilities.client_has_capability(self.version,
'datetime_values')
self._cap_datetime = cap
if cap:
return {'__datetime__': val.strftime(LDAP_GENERALIZED_TIME_FORMAT)}
else:
return val.strftime(LDAP_GENERALIZED_TIME_FORMAT)
def _enc_dnsname(self, val):
cap = self._cap_dnsname
if cap is None:
cap = capabilities.client_has_capability(self.version,
'dns_name_values')
self._cap_dnsname = cap
if cap:
return {'__dns_name__': unicode(val)}
else:
return unicode(val)
def _enc_bytes(self, val):
encoded = base64.b64encode(val)
if not six.PY2:
encoded = encoded.decode('ascii')
return {'__base64__': encoded}
def _enc_list(self, val, _identity=_identity):
result = []
append = result.append
for v in val:
func = self[v.__class__]
append(v if func is _identity else func(v))
return result
def _enc_dict(self, val, _identity=_identity, _iteritems=six.iteritems):
result = {}
for k, v in _iteritems(val):
func = self[v.__class__]
result[k] = v if func is _identity else func(v)
return result
def _enc_certificate(self, val):
return self._enc_bytes(val.public_bytes(x509_Encoding.DER))
def json_encode_binary(val, version, pretty_print=False):
"""Serialize a Python object structure to JSON
:param object val: Python object structure
:param str version: client version
:param bool pretty_print: indent and sort JSON (warning: slow!)
:return: text
:note: pretty printing triggers a slow path in Python's JSON module. Only
use pretty_print in debug mode.
"""
result = _JSONPrimer(version).convert(val)
if pretty_print:
return json.dumps(result, indent=4, sort_keys=True)
else:
return json.dumps(result)
def _ipa_obj_hook(dct, _iteritems=six.iteritems, _list=list):
"""JSON object hook
:see: _JSONPrimer
"""
if '__base64__' in dct:
return base64.b64decode(dct['__base64__'])
elif '__datetime__' in dct:
return datetime.datetime.strptime(dct['__datetime__'],
LDAP_GENERALIZED_TIME_FORMAT)
elif '__dns_name__' in dct:
return DNSName(dct['__dns_name__'])
else:
# XXX tests assume tuples. Is this really necessary?
for k, v in _iteritems(dct):
if v.__class__ is _list:
dct[k] = tuple(v)
return dct
def json_decode_binary(val):
"""Convert serialized JSON string back to Python data structure
:param val: JSON string
:type val: str, bytes
:return: Python data structure
:see: _ipa_obj_hook, _JSONPrimer
"""
if isinstance(val, bytes):
val = val.decode('utf-8')
return json.loads(val, object_hook=_ipa_obj_hook)