pgadmin4/web/pgadmin/authenticate/ldap.py
Cyril Jouve 5a253f9053 Change the following to replace Python 2 code with Python 3:
1) Replace the deprecated unit test method.
2) Wraps filter usage in a list call.
3) Converts the old metaclass syntax to new.
4) Use range instead of xrange method.
5) Change Unicode to str.
6) Several other transformations.
7) Fixed change password test cases.
8) Use simplejson instead of plain JSON.
2020-08-31 16:45:31 +05:30

264 lines
9.6 KiB
Python

##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
"""A blueprint module implementing the ldap authentication."""
import ssl
import config
from ldap3 import Connection, Server, Tls, ALL, ALL_ATTRIBUTES, ANONYMOUS,\
SIMPLE
from ldap3.core.exceptions import LDAPSocketOpenError, LDAPBindError,\
LDAPInvalidScopeError, LDAPAttributeError, LDAPInvalidFilterError,\
LDAPStartTLSError, LDAPSSLConfigurationError
from flask_babelex import gettext
from urllib.parse import urlparse
from .internal import BaseAuthentication
from pgadmin.model import User, ServerGroup, db, Role
from flask import current_app
from pgadmin.tools.user_management import create_user
ERROR_SEARCHING_LDAP_DIRECTORY = "Error searching the LDAP directory: {}"
class LDAPAuthentication(BaseAuthentication):
"""Ldap Authentication Class"""
def get_friendly_name(self):
return gettext("ldap")
def authenticate(self, form):
self.username = form.data['email']
self.password = form.data['password']
self.dedicated_user = True
self.start_tls = False
user_email = None
# Check the dedicated ldap user
self.bind_user = getattr(config, 'LDAP_BIND_USER', None)
self.bind_pass = getattr(config, 'LDAP_BIND_PASSWORD', None)
# Check for the anonymous binding
self.anonymous_bind = getattr(config, 'LDAP_ANONYMOUS_BIND', False)
if self.bind_user and not self.bind_pass:
return False, "LDAP configuration error: Set the bind password."
# if no dedicated ldap user is configured then use the login
# username and password
if not self.bind_user and not self.bind_pass and\
self.anonymous_bind is False:
user_dn = "{0}={1},{2}".format(config.LDAP_USERNAME_ATTRIBUTE,
self.username,
config.LDAP_BASE_DN
)
self.bind_user = user_dn
self.bind_pass = self.password
self.dedicated_user = False
# Connect ldap server
status, msg = self.connect()
if not status:
return status, msg
status, ldap_user = self.search_ldap_user()
if not status:
return status, ldap_user
# If dedicated user is configured
if self.dedicated_user:
# Get the user DN from the user ldap entry
self.bind_user = ldap_user.entry_dn
self.bind_pass = self.password
self.anonymous_bind = False
status, msg = self.connect()
if not status:
return status, msg
if 'mail' in ldap_user:
user_email = ldap_user['mail'].value
return self.__auto_create_user(user_email)
def connect(self):
"""Setup the connection to the LDAP server and authenticate the user.
"""
status, server = self._configure_server()
if not status:
return status, server
# Create the connection
try:
if self.anonymous_bind:
self.conn = Connection(server,
auto_bind=True,
authentication=ANONYMOUS
)
else:
self.conn = Connection(server,
user=self.bind_user,
password=self.bind_pass,
auto_bind=True,
authentication=SIMPLE
)
except LDAPSocketOpenError as e:
current_app.logger.exception(
"Error connecting to the LDAP server: {}\n".format(e))
return False, "Error connecting to the LDAP server:" \
" {}\n".format(e.args[0])
except LDAPBindError as e:
current_app.logger.exception(
"Error binding to the LDAP server.")
return False, "Error binding to the LDAP server."
except Exception as e:
current_app.logger.exception(
"Error connecting to the LDAP server: {}\n".format(e))
return False, "Error connecting to the LDAP server:" \
" {}\n".format(e.args[0])
# Enable TLS if STARTTLS is configured
if self.start_tls:
try:
self.conn.start_tls()
except LDAPStartTLSError as e:
current_app.logger.exception(
"Error starting TLS: {}\n".format(e))
return False, "Error starting TLS: {}\n".format(e.args[0])
return True, None
def __auto_create_user(self, user_email):
"""Add the ldap user to the internal SQLite database."""
if config.LDAP_AUTO_CREATE_USER:
user = User.query.filter_by(
username=self.username).first()
if user is None:
return create_user({
'username': self.username,
'email': user_email,
'role': 2,
'active': True,
'auth_source': 'ldap'
})
return True, None
def __configure_tls(self):
ca_cert_file = getattr(config, 'LDAP_CA_CERT_FILE', None)
cert_file = getattr(config, 'LDAP_CERT_FILE', None)
key_file = getattr(config, 'LDAP_KEY_FILE', None)
cert_validate = ssl.CERT_NONE
if ca_cert_file and cert_file and key_file:
cert_validate = ssl.CERT_REQUIRED
try:
tls = Tls(
local_private_key_file=key_file,
local_certificate_file=cert_file,
validate=cert_validate,
version=ssl.PROTOCOL_TLSv1_2,
ca_certs_file=ca_cert_file)
except LDAPSSLConfigurationError as e:
current_app.logger.exception(
"LDAP configuration error: {}\n".format(e))
return False, "LDAP configuration error: {}\n".format(
e.args[0])
return True, tls
def _configure_server(self):
# Parse the server URI
uri = getattr(config, 'LDAP_SERVER_URI', None)
if uri:
uri = urlparse(uri)
# Create the TLS configuration object if required
tls = None
if isinstance(uri, str):
return False, "LDAP configuration error: Set the proper LDAP URI."
if uri.scheme == 'ldaps' or config.LDAP_USE_STARTTLS:
status, tls = self.__configure_tls()
if not status:
return status, tls
if uri.scheme != 'ldaps' and config.LDAP_USE_STARTTLS:
self.start_tls = True
try:
# Create the server object
server = Server(uri.hostname,
port=uri.port,
use_ssl=(uri.scheme == 'ldaps'),
get_info=ALL,
tls=tls,
connect_timeout=config.LDAP_CONNECTION_TIMEOUT)
except ValueError as e:
return False, "LDAP configuration error: {}.".format(e)
return True, server
def search_ldap_user(self):
"""Get a list of users from the LDAP server based on config
search criteria."""
try:
search_base_dn = config.LDAP_SEARCH_BASE_DN
if (not search_base_dn or search_base_dn == '<Search-Base-DN>')\
and (self.anonymous_bind or self.dedicated_user):
return False, "LDAP configuration error:" \
" Set the Search Domain."
elif not search_base_dn or search_base_dn == '<Search-Base-DN>':
search_base_dn = config.LDAP_BASE_DN
search_filter = "({0}={1})".format(config.LDAP_USERNAME_ATTRIBUTE,
self.username)
if config.LDAP_SEARCH_FILTER:
search_filter = "(&{0}{1})".format(search_filter,
config.LDAP_SEARCH_FILTER)
self.conn.search(search_base=search_base_dn,
search_filter=search_filter,
search_scope=config.LDAP_SEARCH_SCOPE,
attributes=ALL_ATTRIBUTES
)
except LDAPInvalidScopeError as e:
current_app.logger.exception(
ERROR_SEARCHING_LDAP_DIRECTORY.format(e.args[0])
)
return False, ERROR_SEARCHING_LDAP_DIRECTORY.format(e.args[0])
except LDAPAttributeError as e:
current_app.logger.exception(
ERROR_SEARCHING_LDAP_DIRECTORY.format(e)
)
return False, ERROR_SEARCHING_LDAP_DIRECTORY.format(e.args[0])
except LDAPInvalidFilterError as e:
current_app.logger.exception(
ERROR_SEARCHING_LDAP_DIRECTORY.format(e)
)
return False, ERROR_SEARCHING_LDAP_DIRECTORY.format(e.args[0])
results = len(self.conn.entries)
if results > 1:
return False, ERROR_SEARCHING_LDAP_DIRECTORY.format(
"More than one result found.")
elif results < 1:
return False, ERROR_SEARCHING_LDAP_DIRECTORY.format(
"Could not find the specified user.")
return True, self.conn.entries[0]