freeipa/ipapython/dn_ctypes.py

166 lines
3.7 KiB
Python
Raw Normal View History

#
# Copyright (C) 2019 FreeIPA Contributors see COPYING for license
#
"""ctypes wrapper for libldap_str2dn
"""
from __future__ import absolute_import
import ctypes
import ctypes.util
import six
__all__ = ("str2dn", "dn2str", "DECODING_ERROR", "LDAPError")
# load reentrant ldap client library (libldap_r-*.so.2)
ldap_r_lib = ctypes.util.find_library("ldap_r-2")
if ldap_r_lib is None:
raise ImportError("libldap_r shared library missing")
try:
lib = ctypes.CDLL(ldap_r_lib)
except OSError as e:
raise ImportError(str(e))
# constants
LDAP_AVA_FREE_ATTR = 0x0010
LDAP_AVA_FREE_VALUE = 0x0020
LDAP_DECODING_ERROR = -4
# mask for AVA flags
AVA_MASK = ~(LDAP_AVA_FREE_ATTR | LDAP_AVA_FREE_VALUE)
class berval(ctypes.Structure):
__slots__ = ()
_fields_ = [("bv_len", ctypes.c_ulong), ("bv_value", ctypes.c_char_p)]
def __bytes__(self):
buf = ctypes.create_string_buffer(self.bv_value, self.bv_len)
return buf.raw
def __str__(self):
return self.__bytes__().decode("utf-8")
if six.PY2:
__unicode__ = __str__
__str__ = __bytes__
class LDAPAVA(ctypes.Structure):
__slots__ = ()
_fields_ = [
("la_attr", berval),
("la_value", berval),
("la_flags", ctypes.c_uint16),
]
# typedef LDAPAVA** LDAPRDN;
LDAPRDN = ctypes.POINTER(ctypes.POINTER(LDAPAVA))
# typedef LDAPRDN* LDAPDN;
LDAPDN = ctypes.POINTER(LDAPRDN)
def errcheck(result, func, arguments):
if result != 0:
if result == LDAP_DECODING_ERROR:
raise DECODING_ERROR
else:
msg = ldap_err2string(result)
raise LDAPError(msg.decode("utf-8"))
return result
ldap_str2dn = lib.ldap_str2dn
ldap_str2dn.argtypes = (
ctypes.c_char_p,
ctypes.POINTER(LDAPDN),
ctypes.c_uint16,
)
ldap_str2dn.restype = ctypes.c_int16
ldap_str2dn.errcheck = errcheck
ldap_dnfree = lib.ldap_dnfree
ldap_dnfree.argtypes = (LDAPDN,)
ldap_dnfree.restype = None
ldap_err2string = lib.ldap_err2string
ldap_err2string.argtypes = (ctypes.c_int16,)
ldap_err2string.restype = ctypes.c_char_p
class LDAPError(Exception):
pass
class DECODING_ERROR(LDAPError):
pass
# RFC 4514, 2.4
_ESCAPE_CHARS = {'"', "+", ",", ";", "<", ">", "'", "\x00"}
def _escape_dn(dn):
if not dn:
return ""
result = []
# a space or number sign occurring at the beginning of the string
if dn[0] in {"#", " "}:
result.append("\\")
for c in dn:
if c in _ESCAPE_CHARS:
result.append("\\")
result.append(c)
# a space character occurring at the end of the string
if len(dn) > 1 and result[-1] == " ":
# insert before last entry
result.insert(-1, "\\")
return "".join(result)
def dn2str(dn):
return ",".join(
"+".join(
"=".join((attr, _escape_dn(value))) for attr, value, _flag in rdn
)
for rdn in dn
)
def str2dn(dn, flags=0):
if dn is None:
return []
if isinstance(dn, six.text_type):
dn = dn.encode("utf-8")
ldapdn = LDAPDN()
try:
ldap_str2dn(dn, ctypes.byref(ldapdn), flags)
result = []
if not ldapdn:
# empty DN, str2dn("") == []
return result
for rdn in ldapdn:
if not rdn:
break
avas = []
for ava_p in rdn:
if not ava_p:
break
ava = ava_p[0]
avas.append(
(
six.text_type(ava.la_attr),
six.text_type(ava.la_value),
ava.la_flags & AVA_MASK,
)
)
result.append(avas)
return result
finally:
ldap_dnfree(ldapdn)