Unified ldap_initialize() function

Replace all ldap.initialize() calls with a helper function
ldap_initialize(). It handles cacert and cert validation correctly. It
also provides a unique place to handle python-ldap 3.0 bytes warnings in
the future.

Fixes: https://pagure.io/freeipa/issue/7411
Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Alexander Bokovoy <abokovoy@redhat.com>
This commit is contained in:
Christian Heimes
2018-02-15 12:30:06 +01:00
parent c701cd21d3
commit 1b0c55a3b3
7 changed files with 47 additions and 21 deletions

View File

@@ -85,6 +85,34 @@ if six.PY2 and hasattr(ldap, 'LDAPBytesWarning'):
)
def ldap_initialize(uri, cacertfile=None):
"""Wrapper around ldap.initialize()
"""
conn = ldap.initialize(uri)
if not uri.startswith('ldapi://'):
if cacertfile:
conn.set_option(ldap.OPT_X_TLS_CACERTFILE, cacertfile)
newctx = True
else:
newctx = False
req_cert = conn.get_option(ldap.OPT_X_TLS_REQUIRE_CERT)
if req_cert != ldap.OPT_X_TLS_DEMAND:
# libldap defaults to cert validation, but the default can be
# overridden in global or user local ldap.conf.
conn.set_option(
ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND
)
newctx = True
# reinitialize TLS context
if newctx:
conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
return conn
class _ServerSchema(object):
'''
Properties of a schema retrieved from an LDAP server.
@@ -1091,13 +1119,7 @@ class LDAPClient(object):
def _connect(self):
with self.error_handler():
conn = ldap.initialize(self.ldap_uri)
if self._start_tls or self._protocol == 'ldaps':
ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, self._cacert)
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, True)
conn.set_option(ldap.OPT_X_TLS_DEMAND, True)
conn = ldap_initialize(self.ldap_uri, cacertfile=self._cacert)
if self._sasl_nocanon:
conn.set_option(ldap.OPT_X_SASL_NOCANON, ldap.OPT_ON)

View File

@@ -30,6 +30,7 @@ from ipalib import api, _
from ipalib import errors
from ipapython import ipautil
from ipapython.dn import DN
from ipapython.ipaldap import ldap_initialize
from ipaserver.install import installutils
from ipaserver.dcerpc_common import (TRUST_BIDIRECTIONAL,
TRUST_JOIN_EXTERNAL,
@@ -933,7 +934,7 @@ class TrustDomainInstance(object):
# We need to do rootDSE search with LDAP_SERVER_EXTENDED_DN_OID
# control to reveal the SID
ldap_uri = 'ldap://%s' % (result.pdc_dns_name)
conn = _ldap.initialize(ldap_uri)
conn = ldap_initialize(ldap_uri)
conn.set_option(_ldap.OPT_SERVER_CONTROLS, [ExtendedDNControl()])
search_result = None
try:

View File

@@ -36,6 +36,7 @@ from ipalib.text import _
from ipapython import ipautil, ipaldap, kerberos
from ipapython.admintool import ScriptError
from ipapython.dn import DN
from ipapython.ipaldap import ldap_initialize
from ipaplatform.paths import paths
from ipaserver.install import installutils
@@ -1077,13 +1078,10 @@ class ReplicationManager(object):
self.ad_suffix = ""
try:
# Validate AD connection
ad_conn = ldap.initialize('ldap://%s' % ipautil.format_netloc(ad_dc_name))
# the next one is to workaround bugs arounf opendalp libs+NSS db
# we need to first specify the OPT_X_TLS_CACERTFILE and _after_
# that initialize the context to prevent TLS connection errors:
# https://bugzilla.redhat.com/show_bug.cgi?id=800787
ad_conn.set_option(ldap.OPT_X_TLS_CACERTFILE, cacert)
ad_conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
ad_conn = ldap_initialize(
'ldap://%s' % ipautil.format_netloc(ad_dc_name),
cacertfile=cacert
)
ad_conn.start_tls_s()
ad_conn.simple_bind_s(str(ad_binddn), ad_pwd)
res = ad_conn.search_s("", ldap.SCOPE_BASE, '(objectClass=*)',

View File

@@ -4,6 +4,8 @@ import ldap
import ldap.sasl
import ldap.filter
from ipapython.ipaldap import ldap_initialize
class iSecLdap(object):
@@ -27,7 +29,7 @@ class iSecLdap(object):
return self._basedn
def connect(self):
conn = ldap.initialize(self.uri)
conn = ldap_initialize(self.uri)
if self.auth_type == 'EXTERNAL':
auth_tokens = ldap.sasl.external(None)
elif self.auth_type == 'GSSAPI':

View File

@@ -19,7 +19,6 @@
import unittest
import ldap
import pytest
from ipatests.test_ipaserver.httptest import Unauthorized_HTTP_test
@@ -27,6 +26,7 @@ from ipatests.test_xmlrpc.xmlrpc_test import XMLRPC_test
from ipatests.util import assert_equal
from ipalib import api, errors
from ipapython.dn import DN
from ipapython.ipaldap import ldap_initialize
testuser = u'tuser'
old_password = u'old_password'
@@ -60,7 +60,7 @@ class test_changepw(XMLRPC_test, Unauthorized_HTTP_test):
def _checkpw(self, user, password):
dn = str(DN(('uid', user), api.env.container_user, api.env.basedn))
conn = ldap.initialize(api.env.ldap_uri)
conn = ldap_initialize(api.env.ldap_uri)
try:
conn.simple_bind_s(dn, password)
finally:

View File

@@ -37,6 +37,7 @@ from ipatests.test_xmlrpc.xmlrpc_test import (
XMLRPC_test, fuzzy_digits, fuzzy_uuid, fuzzy_password,
Fuzzy, fuzzy_dergeneralizedtime, add_sid, add_oc, raises_exact)
from ipapython.dn import DN
from ipapython.ipaldap import ldap_initialize
from ipatests.test_xmlrpc.tracker.base import Tracker
from ipatests.test_xmlrpc.tracker.group_plugin import GroupTracker
@@ -913,8 +914,9 @@ class TestDeniedBindWithExpiredPrincipal(XMLRPC_test):
def setup_class(cls):
super(TestDeniedBindWithExpiredPrincipal, cls).setup_class()
cls.connection = ldap.initialize('ldap://{host}'
.format(host=api.env.host))
cls.connection = ldap_initialize(
'ldap://{host}'.format(host=api.env.host)
)
@classmethod
def teardown_class(cls):

View File

@@ -42,6 +42,7 @@ from ipalib import api
from ipalib.plugable import Plugin
from ipalib.request import context
from ipapython.dn import DN
from ipapython.ipaldap import ldap_initialize
from ipapython.ipautil import run
@@ -726,7 +727,7 @@ class DummyClass(object):
class MockLDAP(object):
def __init__(self):
self.connection = ldap.initialize(
self.connection = ldap_initialize(
'ldap://{host}'.format(host=ipalib.api.env.host)
)