Files
openvino/.github/org_control/ldap_api.py
2021-06-04 13:28:19 +03:00

237 lines
9.1 KiB
Python

# Copyright (C) 2018-2021 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""
Gets info about users and groups via LDAP
"""
# pylint: disable=fixme,no-member
from enum import Enum
from ldap3 import Server, Connection, ALL, SUBTREE
from configs import Config
class LdapApiException(Exception):
"""Base LDAP API exception"""
class InfoLevel(Enum):
"""Constants for printing user info from LDAP"""
PDL = 'PDL' # Public Distribution List (group of e-mail addresses)
FULL = 'Full'
def print_user_info(info, info_level=None):
"""Pretty-print of a user info data structure (dict). info_level is the InfoLevel Enum"""
if not info or not info.get('mail'):
raise LdapApiException('ERROR: No info or absent mail')
def get_membership():
if info_level == InfoLevel.PDL:
membership_info = ' PDLs:'
elif info_level == InfoLevel.FULL:
membership_info = ' memberOf :'
else:
return ''
# Grouping groups by purpose
if info_level == InfoLevel.PDL:
sort_key = lambda i: i.split(',', 1)[0].lower()
else:
sort_key = lambda i: i.split(',', 1)[1] + i.split(',', 1)[0].lower()
for item in sorted(info['memberOf'], key=sort_key):
if info_level == InfoLevel.PDL and 'OU=Delegated' not in item:
continue
membership_info += f'\n {item}'
return membership_info
try:
text_info = \
f'\n{info["cn"]} <{info["mail"]}>; {info["sAMAccountName"]}; {info["employeeID"]}' \
f'\n Org group: {info["intelSuperGroupDescr"]} ({info["intelSuperGroupShortName"]}) /'\
f' {info["intelGroupDescr"]} ({info["intelGroupShortName"]}) /' \
f' {info["intelDivisionDescr"]} ({info["intelDivisionShortName"]}) /' \
f' {info["intelOrgUnitDescr"]}' \
f'\n Manager: {info["manager"]}' \
f'\n Location: {info["intelRegionCode"]} / {info["co"]} / {info["intelSiteCode"]} /' \
f' {info["intelBldgCode"]} ({info["intelSiteName"]}) /' \
f' {info["physicalDeliveryOfficeName"]}' \
f'\n Other: {info["employeeType"]} | {info["intelExportCountryGroup"]} |' \
f' {info["whenCreated"]} | {info["intelCostCenterDescr"]} | {info["jobDescription"]}'
except Exception as exc:
raise LdapApiException(f'ERROR: Failed to get info about "{info["mail"]}". ' \
f'Exception occurred:\n{repr(exc)}') from exc
print(text_info)
membership = get_membership()
if info_level == InfoLevel.PDL and membership:
print(membership)
elif info_level == InfoLevel.FULL:
for key in sorted(info):
if isinstance(info[key], list):
if key == 'memberOf':
print(membership)
else:
print(f' {key} :')
for item in info[key]:
print(' ', item)
else:
print(f' {key} : {info[key]}')
class LdapApi:
"""LDAP API for getting user info and emails"""
_binary_blobs = ['thumbnailPhoto', 'msExchUMSpokenName', 'msExchBlockedSendersHash']
_check_existing = [
'intelExportCountryGroup',
'physicalDeliveryOfficeName',
'intelSuperGroupShortName',
'intelGroupShortName',
'intelDivisionShortName',
]
null = '<null>'
def __init__(self):
self._cfg = Config()
self.server = Server(self._cfg.LDAP_SERVER, get_info=ALL)
self.connection = Connection(self.server,
user=self._cfg.LDAP_USER,
password=self._cfg.LDAP_PASSWORD,
auto_bind=True)
self.connection.bind()
def get_user_emails(self, groups=None):
"""Gets emails of LDAP groups and sub-groups"""
print('\nGet emails from LDAP groups:')
processed_ldap_members = {}
def process_group_members(member, parent_group):
if member in processed_ldap_members:
processed_ldap_members[member]['parent_groups'].append(parent_group)
print('\nWARNING: Ignore LDAP member to avoid duplication and recursive cycling '
f'of PDLs: {member}\n '
f'email: {processed_ldap_members[member].get("email")}\n parent_groups:')
for group in processed_ldap_members[member].get('parent_groups', []):
print(7 * ' ', group)
return
processed_ldap_members[member] = {'email': None, 'parent_groups': [parent_group]}
# AD moves terminated users to the boneyard OU in case the user returns,
# so it can be reactivated with little effort.
# After 30 days it is removed and the unix personality becomes unlinked.
if 'OU=Boneyard' in member:
return
self.connection.search(member, r'(objectClass=*)', SUBTREE,
attributes=['cn', 'member', 'mail'])
#print(self.connection.entries)
if not self.connection.response:
raise LdapApiException(f'ERROR: empty response. LDAP member: {member}')
# Check that the member is worker.
# The response can contain several items, but the first item is valid only
if 'OU=Workers' in member:
if self.connection.response[0]['attributes']['mail']:
processed_ldap_members[member]['email'] = \
self.connection.response[0]['attributes']['mail'].lower()
return
raise LdapApiException(f'ERROR: no mail. LDAP worker: {member}\n'
f'{self.connection.entries}')
if len(self.connection.response) > 1:
raise LdapApiException(f'ERROR: multiple responses for {member}: '
f'{len(self.connection.response)}\n'
f'{self.connection.entries}')
if self.connection.response[0]['attributes']['member']:
for group_member in self.connection.response[0]['attributes']['member']:
process_group_members(group_member, member)
else:
print(f'\nERROR: no members in LDAP group: {member}\n{self.connection.entries}')
for group in groups or self._cfg.LDAP_PDLs:
print('\nProcess ROOT LDAP group:', group)
process_group_members(group, 'ROOT')
return {
member.get('email') for member in processed_ldap_members.values() if member.get('email')
}
def _get_user_info(self, query):
"""Gets user info from LDAP as dict matching key and values pairs from query"""
query_filter = ''.join(f'({key}={value})' for key, value in query.items())
for domain in self._cfg.LDAP_DOMAINS:
search_base = f'OU=Workers,DC={domain},DC=corp,DC=intel,DC=com'
self.connection.search(
search_base,
f'(&(objectcategory=person)(objectclass=user)(intelflags=1){query_filter})',
SUBTREE,
attributes=['*'])
if self.connection.response:
if len(self.connection.response) > 1:
raise LdapApiException(f'ERROR: multiple responses for {query_filter}: '
f'{len(self.connection.response)}\n'
f'{self.connection.entries}')
info = self.connection.response[0]['attributes']
# remove long binary blobs
for blob in LdapApi._binary_blobs:
info[blob] = b''
for key in LdapApi._check_existing:
if not info.get(key):
info[key] = LdapApi.null
return info
return {}
def get_user_info_by_idsid(self, idsid):
"""Gets user info from LDAP as dict using account name for searching"""
return self._get_user_info({'sAMAccountName': idsid})
def get_user_info_by_name(self, name):
"""Gets user info from LDAP as dict using common name for searching"""
return self._get_user_info({'cn': name})
def get_user_info_by_email(self, email):
"""Gets user info from LDAP as dict using emails for searching"""
return self._get_user_info({'mail': email})
def get_absent_emails(self, emails):
"""Checks users by email in LDAP and returns absent emails"""
absent_emails = set()
for email in emails:
if not self.get_user_info_by_email(email):
absent_emails.add(email)
return absent_emails
def _test():
"""Test and debug"""
ldap = LdapApi()
emails = ldap.get_user_emails()
print(f'\nLDAP emails count: {len(emails)}\n{"; ".join(emails)}')
emails = ['foo@intel.com']
for email in emails:
info = ldap.get_user_info_by_email(email)
if info:
print_user_info(info, InfoLevel.PDL)
else:
print(f'\n{email} - not found')
if __name__ == '__main__':
_test()