mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2024-12-22 23:23:30 -06:00
193 lines
6.3 KiB
Python
193 lines
6.3 KiB
Python
|
#
|
||
|
# Copyright (C) 2024 FreeIPA Contributors see COPYING for license
|
||
|
#
|
||
|
|
||
|
import base64
|
||
|
from cryptography import x509 as crypto_x509
|
||
|
import datetime
|
||
|
from decimal import Decimal
|
||
|
import json
|
||
|
import six
|
||
|
from ipalib.constants import LDAP_GENERALIZED_TIME_FORMAT
|
||
|
from ipalib import capabilities
|
||
|
from ipalib.x509 import Encoding as x509_Encoding
|
||
|
from ipapython.dn import DN
|
||
|
from ipapython.dnsutil import DNSName
|
||
|
from ipapython.kerberos import Principal
|
||
|
|
||
|
if six.PY3:
|
||
|
unicode = str
|
||
|
|
||
|
|
||
|
class _JSONPrimer(dict):
|
||
|
"""Fast JSON primer and pre-converter
|
||
|
|
||
|
Prepare a data structure for JSON serialization. In an ideal world, priming
|
||
|
could be handled by the default hook of json.dumps(). Unfortunately the
|
||
|
hook treats Python 2 str as text while IPA considers str as bytes.
|
||
|
|
||
|
The primer uses a couple of tricks to archive maximum performance:
|
||
|
|
||
|
* O(1) type look instead of O(n) chain of costly isinstance() calls
|
||
|
* __missing__ and __mro__ with caching to handle subclasses
|
||
|
* inline code with minor code duplication (func lookup in enc_list/dict)
|
||
|
* avoid surplus function calls (e.g. func is _identity, obj.__class__
|
||
|
instead if type(obj))
|
||
|
* function default arguments to turn global into local lookups
|
||
|
* avoid re-creation of bound method objects (e.g. result.append)
|
||
|
* on-demand lookup of client capabilities with cached values
|
||
|
|
||
|
Depending on the client version number, the primer converts:
|
||
|
|
||
|
* bytes -> {'__base64__': b64encode}
|
||
|
* datetime -> {'__datetime__': LDAP_GENERALIZED_TIME}
|
||
|
* DNSName -> {'__dns_name__': unicode}
|
||
|
|
||
|
The _ipa_obj_hook() functions unserializes the marked JSON objects to
|
||
|
bytes, datetime and DNSName.
|
||
|
|
||
|
:see: _ipa_obj_hook
|
||
|
"""
|
||
|
__slots__ = ('version', '_cap_datetime', '_cap_dnsname')
|
||
|
|
||
|
_identity = object()
|
||
|
|
||
|
def __init__(self, version, _identity=_identity):
|
||
|
super(_JSONPrimer, self).__init__()
|
||
|
self.version = version
|
||
|
self._cap_datetime = None
|
||
|
self._cap_dnsname = None
|
||
|
self.update({
|
||
|
unicode: _identity,
|
||
|
bool: _identity,
|
||
|
int: _identity,
|
||
|
type(None): _identity,
|
||
|
float: _identity,
|
||
|
Decimal: unicode,
|
||
|
DN: str,
|
||
|
Principal: unicode,
|
||
|
DNSName: self._enc_dnsname,
|
||
|
datetime.datetime: self._enc_datetime,
|
||
|
bytes: self._enc_bytes,
|
||
|
list: self._enc_list,
|
||
|
tuple: self._enc_list,
|
||
|
dict: self._enc_dict,
|
||
|
crypto_x509.Certificate: self._enc_certificate,
|
||
|
crypto_x509.CertificateSigningRequest: self._enc_certificate,
|
||
|
})
|
||
|
|
||
|
def __missing__(self, typ):
|
||
|
# walk MRO to find best match
|
||
|
for c in typ.__mro__:
|
||
|
if c in self:
|
||
|
self[typ] = self[c]
|
||
|
return self[c]
|
||
|
# use issubclass to check for registered ABCs
|
||
|
for c in self:
|
||
|
if issubclass(typ, c):
|
||
|
self[typ] = self[c]
|
||
|
return self[c]
|
||
|
raise TypeError(typ)
|
||
|
|
||
|
def convert(self, obj, _identity=_identity):
|
||
|
# obj.__class__ is twice as fast as type(obj)
|
||
|
func = self[obj.__class__]
|
||
|
return obj if func is _identity else func(obj)
|
||
|
|
||
|
def _enc_datetime(self, val):
|
||
|
cap = self._cap_datetime
|
||
|
if cap is None:
|
||
|
cap = capabilities.client_has_capability(self.version,
|
||
|
'datetime_values')
|
||
|
self._cap_datetime = cap
|
||
|
if cap:
|
||
|
return {'__datetime__': val.strftime(LDAP_GENERALIZED_TIME_FORMAT)}
|
||
|
else:
|
||
|
return val.strftime(LDAP_GENERALIZED_TIME_FORMAT)
|
||
|
|
||
|
def _enc_dnsname(self, val):
|
||
|
cap = self._cap_dnsname
|
||
|
if cap is None:
|
||
|
cap = capabilities.client_has_capability(self.version,
|
||
|
'dns_name_values')
|
||
|
self._cap_dnsname = cap
|
||
|
if cap:
|
||
|
return {'__dns_name__': unicode(val)}
|
||
|
else:
|
||
|
return unicode(val)
|
||
|
|
||
|
def _enc_bytes(self, val):
|
||
|
encoded = base64.b64encode(val)
|
||
|
if not six.PY2:
|
||
|
encoded = encoded.decode('ascii')
|
||
|
return {'__base64__': encoded}
|
||
|
|
||
|
def _enc_list(self, val, _identity=_identity):
|
||
|
result = []
|
||
|
append = result.append
|
||
|
for v in val:
|
||
|
func = self[v.__class__]
|
||
|
append(v if func is _identity else func(v))
|
||
|
return result
|
||
|
|
||
|
def _enc_dict(self, val, _identity=_identity, _iteritems=six.iteritems):
|
||
|
result = {}
|
||
|
for k, v in _iteritems(val):
|
||
|
func = self[v.__class__]
|
||
|
result[k] = v if func is _identity else func(v)
|
||
|
return result
|
||
|
|
||
|
def _enc_certificate(self, val):
|
||
|
return self._enc_bytes(val.public_bytes(x509_Encoding.DER))
|
||
|
|
||
|
|
||
|
def json_encode_binary(val, version, pretty_print=False):
|
||
|
"""Serialize a Python object structure to JSON
|
||
|
|
||
|
:param object val: Python object structure
|
||
|
:param str version: client version
|
||
|
:param bool pretty_print: indent and sort JSON (warning: slow!)
|
||
|
:return: text
|
||
|
:note: pretty printing triggers a slow path in Python's JSON module. Only
|
||
|
use pretty_print in debug mode.
|
||
|
"""
|
||
|
result = _JSONPrimer(version).convert(val)
|
||
|
if pretty_print:
|
||
|
return json.dumps(result, indent=4, sort_keys=True)
|
||
|
else:
|
||
|
return json.dumps(result)
|
||
|
|
||
|
|
||
|
def _ipa_obj_hook(dct, _iteritems=six.iteritems, _list=list):
|
||
|
"""JSON object hook
|
||
|
|
||
|
:see: _JSONPrimer
|
||
|
"""
|
||
|
if '__base64__' in dct:
|
||
|
return base64.b64decode(dct['__base64__'])
|
||
|
elif '__datetime__' in dct:
|
||
|
return datetime.datetime.strptime(dct['__datetime__'],
|
||
|
LDAP_GENERALIZED_TIME_FORMAT)
|
||
|
elif '__dns_name__' in dct:
|
||
|
return DNSName(dct['__dns_name__'])
|
||
|
else:
|
||
|
# XXX tests assume tuples. Is this really necessary?
|
||
|
for k, v in _iteritems(dct):
|
||
|
if v.__class__ is _list:
|
||
|
dct[k] = tuple(v)
|
||
|
return dct
|
||
|
|
||
|
|
||
|
def json_decode_binary(val):
|
||
|
"""Convert serialized JSON string back to Python data structure
|
||
|
|
||
|
:param val: JSON string
|
||
|
:type val: str, bytes
|
||
|
:return: Python data structure
|
||
|
:see: _ipa_obj_hook, _JSONPrimer
|
||
|
"""
|
||
|
if isinstance(val, bytes):
|
||
|
val = val.decode('utf-8')
|
||
|
|
||
|
return json.loads(val, object_hook=_ipa_obj_hook)
|