mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Move schema-related methods to LDAPConnection
Part of the work for: https://fedorahosted.org/freeipa/ticket/2660
This commit is contained in:
parent
5476b144f6
commit
44e15206d0
@ -36,7 +36,7 @@ from ldap.controls import LDAPControl
|
|||||||
from ldap.ldapobject import SimpleLDAPObject
|
from ldap.ldapobject import SimpleLDAPObject
|
||||||
import ldapurl
|
import ldapurl
|
||||||
|
|
||||||
from ipalib import errors
|
from ipalib import errors, _
|
||||||
from ipapython import ipautil
|
from ipapython import ipautil
|
||||||
from ipapython.ipautil import (
|
from ipapython.ipautil import (
|
||||||
format_netloc, wait_for_open_socket, wait_for_open_ports, CIDict)
|
format_netloc, wait_for_open_socket, wait_for_open_ports, CIDict)
|
||||||
@ -781,9 +781,14 @@ class LDAPConnection(object):
|
|||||||
This class is not intended to be used directly; instead, use one of its
|
This class is not intended to be used directly; instead, use one of its
|
||||||
subclasses, IPAdmin or the ldap2 plugin.
|
subclasses, IPAdmin or the ldap2 plugin.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, ldap_uri):
|
def __init__(self, ldap_uri):
|
||||||
self.ldap_uri = ldap_uri
|
self.ldap_uri = ldap_uri
|
||||||
self.log = log_mgr.get_logger(self)
|
self.log = log_mgr.get_logger(self)
|
||||||
|
self._init_connection()
|
||||||
|
|
||||||
|
def _init_connection(self):
|
||||||
|
self.conn = None
|
||||||
|
|
||||||
def handle_errors(self, e, arg_desc=None):
|
def handle_errors(self, e, arg_desc=None):
|
||||||
"""Universal LDAPError handler
|
"""Universal LDAPError handler
|
||||||
@ -853,6 +858,50 @@ class LDAPConnection(object):
|
|||||||
self.log.info('Unhandled LDAPError: %s' % str(e))
|
self.log.info('Unhandled LDAPError: %s' % str(e))
|
||||||
raise errors.DatabaseError(desc=desc, info=info)
|
raise errors.DatabaseError(desc=desc, info=info)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def schema(self):
|
||||||
|
"""schema associated with this LDAP server"""
|
||||||
|
return self.conn.schema
|
||||||
|
|
||||||
|
def get_syntax(self, attr, value):
|
||||||
|
if self.schema is None:
|
||||||
|
return None
|
||||||
|
obj = self.schema.get_obj(_ldap.schema.AttributeType, attr)
|
||||||
|
if obj is not None:
|
||||||
|
return obj.syntax
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def has_dn_syntax(self, attr):
|
||||||
|
return self.conn.has_dn_syntax(attr)
|
||||||
|
|
||||||
|
def get_allowed_attributes(self, objectclasses, raise_on_unknown=False):
|
||||||
|
if self.schema is None:
|
||||||
|
return None
|
||||||
|
allowed_attributes = []
|
||||||
|
for oc in objectclasses:
|
||||||
|
obj = self.schema.get_obj(_ldap.schema.ObjectClass, oc)
|
||||||
|
if obj is not None:
|
||||||
|
allowed_attributes += obj.must + obj.may
|
||||||
|
elif raise_on_unknown:
|
||||||
|
raise errors.NotFound(
|
||||||
|
reason=_('objectclass %s not found') % oc)
|
||||||
|
return [unicode(a).lower() for a in list(set(allowed_attributes))]
|
||||||
|
|
||||||
|
def get_single_value(self, attr):
|
||||||
|
"""
|
||||||
|
Check the schema to see if the attribute is single-valued.
|
||||||
|
|
||||||
|
If the attribute is in the schema then returns True/False
|
||||||
|
|
||||||
|
If there is a problem loading the schema or the attribute is
|
||||||
|
not in the schema return None
|
||||||
|
"""
|
||||||
|
if self.schema is None:
|
||||||
|
return None
|
||||||
|
obj = self.schema.get_obj(_ldap.schema.AttributeType, attr)
|
||||||
|
return obj and obj.single_value
|
||||||
|
|
||||||
|
|
||||||
class IPAdmin(LDAPConnection):
|
class IPAdmin(LDAPConnection):
|
||||||
|
|
||||||
@ -1241,18 +1290,6 @@ class IPAdmin(LDAPConnection):
|
|||||||
else: break
|
else: break
|
||||||
return (done, exitCode)
|
return (done, exitCode)
|
||||||
|
|
||||||
def get_single_value(self, attr):
|
|
||||||
"""
|
|
||||||
Check the schema to see if the attribute is single-valued.
|
|
||||||
|
|
||||||
If the attribute is in the schema then returns True/False
|
|
||||||
|
|
||||||
If there is a problem loading the schema or the attribute is
|
|
||||||
not in the schema return None
|
|
||||||
"""
|
|
||||||
obj = self.schema.get_obj(ldap.schema.AttributeType, attr)
|
|
||||||
return obj and obj.single_value
|
|
||||||
|
|
||||||
def get_dns_sorted_by_length(self, entries, reverse=False):
|
def get_dns_sorted_by_length(self, entries, reverse=False):
|
||||||
"""
|
"""
|
||||||
Sorts a list of entries [(dn, entry_attrs)] based on their DN.
|
Sorts a list of entries [(dn, entry_attrs)] based on their DN.
|
||||||
|
@ -56,8 +56,6 @@ except ImportError:
|
|||||||
class GetEffectiveRightsControl(LDAPControl):
|
class GetEffectiveRightsControl(LDAPControl):
|
||||||
def __init__(self, criticality, authzId=None):
|
def __init__(self, criticality, authzId=None):
|
||||||
LDAPControl.__init__(self, '1.3.6.1.4.1.42.2.27.9.5.2', criticality, authzId)
|
LDAPControl.__init__(self, '1.3.6.1.4.1.42.2.27.9.5.2', criticality, authzId)
|
||||||
# for backward compatibility
|
|
||||||
from ipalib import _
|
|
||||||
|
|
||||||
from ipalib import api, errors
|
from ipalib import api, errors
|
||||||
from ipalib.crud import CrudBackend
|
from ipalib.crud import CrudBackend
|
||||||
@ -105,6 +103,11 @@ class ldap2(LDAPConnection, CrudBackend):
|
|||||||
except AttributeError:
|
except AttributeError:
|
||||||
self.base_dn = DN()
|
self.base_dn = DN()
|
||||||
|
|
||||||
|
def _init_connection(self):
|
||||||
|
# Connectible.conn is a proxy to thread-local storage;
|
||||||
|
# do not set it
|
||||||
|
pass
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
if self.isconnected():
|
if self.isconnected():
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
@ -112,48 +115,6 @@ class ldap2(LDAPConnection, CrudBackend):
|
|||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.ldap_uri
|
return self.ldap_uri
|
||||||
|
|
||||||
def _get_schema(self):
|
|
||||||
return self.conn.schema
|
|
||||||
schema = property(_get_schema, None, None, 'schema associated with this LDAP server')
|
|
||||||
|
|
||||||
def get_syntax(self, attr, value):
|
|
||||||
if self.schema is None:
|
|
||||||
return None
|
|
||||||
obj = self.schema.get_obj(_ldap.schema.AttributeType, attr)
|
|
||||||
if obj is not None:
|
|
||||||
return obj.syntax
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
def has_dn_syntax(self, attr):
|
|
||||||
return self.conn.has_dn_syntax(attr)
|
|
||||||
|
|
||||||
def get_allowed_attributes(self, objectclasses, raise_on_unknown=False):
|
|
||||||
if self.schema is None:
|
|
||||||
return None
|
|
||||||
allowed_attributes = []
|
|
||||||
for oc in objectclasses:
|
|
||||||
obj = self.schema.get_obj(_ldap.schema.ObjectClass, oc)
|
|
||||||
if obj is not None:
|
|
||||||
allowed_attributes += obj.must + obj.may
|
|
||||||
elif raise_on_unknown:
|
|
||||||
raise errors.NotFound(reason=_('objectclass %s not found') % oc)
|
|
||||||
return [unicode(a).lower() for a in list(set(allowed_attributes))]
|
|
||||||
|
|
||||||
def get_single_value(self, attr):
|
|
||||||
"""
|
|
||||||
Check the schema to see if the attribute is single-valued.
|
|
||||||
|
|
||||||
If the attribute is in the schema then returns True/False
|
|
||||||
|
|
||||||
If there is a problem loading the schema or the attribute is
|
|
||||||
not in the schema return None
|
|
||||||
"""
|
|
||||||
if self.schema is None:
|
|
||||||
return None
|
|
||||||
obj = self.schema.get_obj(_ldap.schema.AttributeType, attr)
|
|
||||||
return obj and obj.single_value
|
|
||||||
|
|
||||||
def create_connection(self, ccache=None, bind_dn=None, bind_pw='',
|
def create_connection(self, ccache=None, bind_dn=None, bind_pw='',
|
||||||
tls_cacertfile=None, tls_certfile=None, tls_keyfile=None,
|
tls_cacertfile=None, tls_certfile=None, tls_keyfile=None,
|
||||||
debug_level=0, autobind=False):
|
debug_level=0, autobind=False):
|
||||||
|
Loading…
Reference in New Issue
Block a user