Added support for LDAP anonymous binding. Fixes #5650

This commit is contained in:
Khushboo Vashi
2020-07-20 15:30:06 +05:30
committed by Akshay Joshi
parent 3983e2c13c
commit 645517d22d
7 changed files with 173 additions and 93 deletions

View File

@@ -506,7 +506,6 @@ AUTHENTICATION_SOURCES = ['internal']
# automatically, if set to True.
# Set it to False, if user should not be added automatically,
# in this case Admin has to add the user manually in the SQLite database.
LDAP_AUTO_CREATE_USER = True
# Connection timeout
@@ -516,27 +515,47 @@ LDAP_CONNECTION_TIMEOUT = 10
# example: ldap://<ip-address>:<port> or ldap://<hostname>:<port>
LDAP_SERVER_URI = 'ldap://<ip-address>:<port>'
# The LDAP attribute containing user names. In OpenLDAP, this may be 'uid'
# whilst in AD, 'sAMAccountName' might be appropriate. (REQUIRED)
LDAP_USERNAME_ATTRIBUTE = '<User-id>'
##########################################################################
# 3 ways to configure LDAP as follows (Choose anyone):
# 1. Dedicated User binding
# LDAP Bind User DN Example: cn=username,dc=example,dc=com
# Set this parameter to allow the connection to bind using a dedicated user.
# After the connection is made, the pgadmin login user will be further
# authenticated by the username and password provided
# at the login screen.
LDAP_BIND_USER = None
# LDAP Bind User Password
LDAP_BIND_PASSWORD = None
# OR ####################
# 2. Anonymous Binding
# Set this parameter to allow the anonymous bind.
# After the connection is made, the pgadmin login user will be further
# authenticated by the username and password provided
LDAP_ANONYMOUS_BIND = False
# OR ####################
# 3. Bind as pgAdmin user
# BaseDN (REQUIRED)
# AD example:
# (&(objectClass=user)(memberof=CN=MYGROUP,CN=Users,dc=example,dc=com))
# OpenLDAP example: CN=Users,dc=example,dc=com
LDAP_BASE_DN = '<Base-DN>'
# The LDAP attribute containing user names. In OpenLDAP, this may be 'uid'
# whilst in AD, 'sAMAccountName' might be appropriate. (REQUIRED)
LDAP_USERNAME_ATTRIBUTE = '<User-id>'
##########################################################################
# LDAP Bind User DN Example: cn=username,dc=example,dc=com (OPTIONAL)
# Set this parameter to allow the connection to bind using a dedicated user.
# After the connection is made, the pgadmin login user will be further
# authenticated by the username and password provided
# at the login screen. (OPTIONAL)
LDAP_BIND_USER = None
# LDAP Bind User Password (OPTIONAL)
LDAP_BIND_PASSWORD = None
# Search ldap for further authentication
# Search ldap for further authentication (REQUIRED)
# It can be optional while bind as pgAdmin user
LDAP_SEARCH_BASE_DN = '<Search-Base-DN>'
# Filter string for the user search.

View File

@@ -11,7 +11,8 @@
import ssl
import config
from ldap3 import Connection, Server, Tls, ALL, ALL_ATTRIBUTES
from ldap3 import Connection, Server, Tls, ALL, ALL_ATTRIBUTES, ANONYMOUS,\
SIMPLE
from ldap3.core.exceptions import LDAPSocketOpenError, LDAPBindError,\
LDAPInvalidScopeError, LDAPAttributeError, LDAPInvalidFilterError,\
LDAPStartTLSError, LDAPSSLConfigurationError
@@ -36,19 +37,24 @@ class LDAPAuthentication(BaseAuthentication):
def authenticate(self, form):
self.username = form.data['email']
self.password = form.data['password']
self.dedicated_user = True
self.start_tls = False
user_email = None
dedicated_user = True
# Check the dedicated ldap user
self.bind_user = getattr(config, 'LDAP_BIND_USER', None)
self.bind_pass = getattr(config, 'LDAP_BIND_PASSWORD', None)
# Check for the anonymous binding
self.anonymous_bind = getattr(config, 'LDAP_ANONYMOUS_BIND', False)
if self.bind_user and not self.bind_pass:
return False, "LDAP configuration error: Set the bind password."
# if no dedicated ldap user is configured then use the login
# username and password
if not self.bind_user or not self.bind_pass:
if not self.bind_user and not self.bind_pass and\
self.anonymous_bind is False:
user_dn = "{0}={1},{2}".format(config.LDAP_USERNAME_ATTRIBUTE,
self.username,
config.LDAP_BASE_DN
@@ -56,7 +62,7 @@ class LDAPAuthentication(BaseAuthentication):
self.bind_user = user_dn
self.bind_pass = self.password
dedicated_user = False
self.dedicated_user = False
# Connect ldap server
status, msg = self.connect()
@@ -70,10 +76,11 @@ class LDAPAuthentication(BaseAuthentication):
return status, ldap_user
# If dedicated user is configured
if dedicated_user:
if self.dedicated_user:
# Get the user DN from the user ldap entry
self.bind_user = ldap_user.entry_dn
self.bind_pass = self.password
self.anonymous_bind = False
status, msg = self.connect()
if not status:
@@ -87,61 +94,25 @@ class LDAPAuthentication(BaseAuthentication):
def connect(self):
"""Setup the connection to the LDAP server and authenticate the user.
"""
status, server = self._configure_server()
# Parse the server URI
uri = getattr(config, 'LDAP_SERVER_URI', None)
if uri:
uri = urlparse(uri)
# Create the TLS configuration object if required
tls = None
if type(uri) == str:
return False, "LDAP configuration error: Set the proper LDAP URI."
if uri.scheme == 'ldaps' or config.LDAP_USE_STARTTLS:
ca_cert_file = getattr(config, 'LDAP_CA_CERT_FILE', None)
cert_file = getattr(config, 'LDAP_CERT_FILE', None)
key_file = getattr(config, 'LDAP_KEY_FILE', None)
cert_validate = ssl.CERT_NONE
if ca_cert_file and cert_file and key_file:
cert_validate = ssl.CERT_REQUIRED
try:
tls = Tls(
local_private_key_file=key_file,
local_certificate_file=cert_file,
validate=cert_validate,
version=ssl.PROTOCOL_TLSv1_2,
ca_certs_file=ca_cert_file)
except LDAPSSLConfigurationError as e:
current_app.logger.exception(
"LDAP configuration error: {}\n".format(e))
return False, "LDAP configuration error: {}\n".format(
e.args[0])
try:
# Create the server object
server = Server(uri.hostname,
port=uri.port,
use_ssl=(uri.scheme == 'ldaps'),
get_info=ALL,
tls=tls,
connect_timeout=config.LDAP_CONNECTION_TIMEOUT)
except ValueError as e:
return False, "LDAP configuration error: {}.".format(e)
if not status:
return status, server
# Create the connection
try:
self.conn = Connection(server,
user=self.bind_user,
password=self.bind_pass,
auto_bind=True
)
if self.anonymous_bind:
self.conn = Connection(server,
auto_bind=True,
authentication=ANONYMOUS
)
else:
self.conn = Connection(server,
user=self.bind_user,
password=self.bind_pass,
auto_bind=True,
authentication=SIMPLE
)
except LDAPSocketOpenError as e:
current_app.logger.exception(
@@ -159,7 +130,7 @@ class LDAPAuthentication(BaseAuthentication):
" {}\n".format(e.args[0])
# Enable TLS if STARTTLS is configured
if not uri.scheme == 'ldaps' and config.LDAP_USE_STARTTLS:
if self.start_tls:
try:
self.conn.start_tls()
except LDAPStartTLSError as e:
@@ -185,14 +156,75 @@ class LDAPAuthentication(BaseAuthentication):
return True, None
def __configure_tls(self):
ca_cert_file = getattr(config, 'LDAP_CA_CERT_FILE', None)
cert_file = getattr(config, 'LDAP_CERT_FILE', None)
key_file = getattr(config, 'LDAP_KEY_FILE', None)
cert_validate = ssl.CERT_NONE
if ca_cert_file and cert_file and key_file:
cert_validate = ssl.CERT_REQUIRED
try:
tls = Tls(
local_private_key_file=key_file,
local_certificate_file=cert_file,
validate=cert_validate,
version=ssl.PROTOCOL_TLSv1_2,
ca_certs_file=ca_cert_file)
except LDAPSSLConfigurationError as e:
current_app.logger.exception(
"LDAP configuration error: {}\n".format(e))
return False, "LDAP configuration error: {}\n".format(
e.args[0])
return True, tls
def _configure_server(self):
# Parse the server URI
uri = getattr(config, 'LDAP_SERVER_URI', None)
if uri:
uri = urlparse(uri)
# Create the TLS configuration object if required
tls = None
if type(uri) == str:
return False, "LDAP configuration error: Set the proper LDAP URI."
if uri.scheme == 'ldaps' or config.LDAP_USE_STARTTLS:
status, tls = self.__configure_tls()
if not status:
return status, tls
if uri.scheme != 'ldaps' and config.LDAP_USE_STARTTLS:
self.start_tls = True
try:
# Create the server object
server = Server(uri.hostname,
port=uri.port,
use_ssl=(uri.scheme == 'ldaps'),
get_info=ALL,
tls=tls,
connect_timeout=config.LDAP_CONNECTION_TIMEOUT)
except ValueError as e:
return False, "LDAP configuration error: {}.".format(e)
return True, server
def search_ldap_user(self):
"""Get a list of users from the LDAP server based on config
search criteria."""
try:
search_base_dn = config.LDAP_SEARCH_BASE_DN
if search_base_dn is None or search_base_dn == '' or\
search_base_dn == '<Search-Base-DN>':
if (not search_base_dn or search_base_dn == '<Search-Base-DN>')\
and (self.anonymous_bind or self.dedicated_user):
return False, "LDAP configuration error:" \
" Set the Search Domain."
elif not search_base_dn or search_base_dn == '<Search-Base-DN>':
search_base_dn = config.LDAP_BASE_DN
self.conn.search(search_base=search_base_dn,
search_filter=config.LDAP_SEARCH_FILTER,
search_scope=config.LDAP_SEARCH_SCOPE,

View File

@@ -32,6 +32,9 @@ class LDAPLoginTestCase(BaseTestGenerator):
('LDAP With Dedicated User Authentication', dict(
config_key_param='ldap_with_dedicated_user',
is_gravtar_image_check=False)),
('LDAP With Anonymous Binding', dict(
config_key_param='ldap_with_anonymous_bind',
is_gravtar_image_check=False)),
]
@classmethod
@@ -66,6 +69,10 @@ class LDAPLoginTestCase(BaseTestGenerator):
ldap_config['bind_password'] != "":
app_config.LDAP_BIND_USER = ldap_config['bind_user']
app_config.LDAP_BIND_PASSWORD = ldap_config['bind_password']
if ldap_config['anonymous_bind'] != "" and\
ldap_config['anonymous_bind']:
app_config.LDAP_ANONYMOUS_BIND = True
else:
self.skipTest(
"LDAP config not set."

View File

@@ -53,6 +53,9 @@ class LDAPLoginMockTestCase(BaseTestGenerator):
def setUp(self):
app_config.AUTHENTICATION_SOURCES = self.auth_source
app_config.LDAP_AUTO_CREATE_USER = self.auto_create_user
app_config.LDAP_ANONYMOUS_BIND = False
app_config.LDAP_BIND_USER = None
app_config.LDAP_BIND_PASSWORD = None
@patch.object(AuthSourceRegistry.registry['ldap'], 'connect',
return_value=[True, "Done"])
@@ -60,7 +63,7 @@ class LDAPLoginMockTestCase(BaseTestGenerator):
return_value=[True, ''])
def runTest(self, conn_mock_obj, search_mock_obj):
"""This function checks ldap login functionality."""
AuthSourceRegistry.registry['ldap'].dedicated_user = False
res = self.tester.login(self.username, self.password, True)
respdata = 'Gravatar image for %s' % self.username
self.assertTrue(respdata in res.data.decode('utf8'))

View File

@@ -20,6 +20,7 @@
"ldap": {
"name": "Ldap scenario name",
"uri": "ldap://IP-ADDRESS/HOSTNAME:389",
"anonymous_bind": false,
"bind_user": "",
"bind_password": "",
"base_dn": "BASE-DN",
@@ -34,6 +35,7 @@
"ldap_with_ssl": {
"name": "Ldap scenario name",
"uri": "ldaps://IP-ADDRESS/HOSTNAME:636",
"anonymous_bind": false,
"bind_user": "",
"bind_password": "",
"base_dn": "BASE-DN",
@@ -48,9 +50,10 @@
"ldap_with_tls": {
"name": "Ldap scenario name",
"uri": "ldap://IP-ADDRESS/HOSTNAME:389",
"anonymous_bind": false,
"bind_user": "",
"bind_password": "",
"base_dn": "BASE-DN",
"base_dn": "",
"search_base_dn": "SEARCH-BASE-DN",
"username_atr": "UID",
"search_filter": "(objectclass=*)",
@@ -62,9 +65,10 @@
"ldap_with_dedicated_user": {
"name": "Ldap scenario name",
"uri": "ldap://IP-ADDRESS/HOSTNAME:389",
"anonymous_bind": true,
"bind_user": "",
"bind_password": "",
"base_dn": "BASE-DN",
"base_dn": "",
"search_base_dn": "SEARCH-BASE-DN",
"username_atr": "UID",
"search_filter": "(objectclass=*)",