Use AD LDAP probing to create trusted domain ID range

When creating a trusted domain ID range, probe AD DC to get
information about ID space leveraged by POSIX users already
defined in AD, and create an ID range with according parameters.

For more details:
http://www.freeipa.org/page/V3/Use_posix_attributes_defined_in_AD
https://fedorahosted.org/freeipa/ticket/3649
This commit is contained in:
Tomas Babej
2013-07-17 15:55:36 +02:00
committed by Alexander Bokovoy
parent 84b2269589
commit 17c7d46c25
4 changed files with 231 additions and 53 deletions

View File

@@ -61,6 +61,7 @@ The code in this module relies heavily on samba4-python package
and Samba4 python bindings.
""")
def is_sid_valid(sid):
try:
security.dom_sid(sid)
@@ -69,6 +70,7 @@ def is_sid_valid(sid):
else:
return True
access_denied_error = errors.ACIError(info=_('CIFS server denied your credentials'))
dcerpc_error_codes = {
-1073741823:
@@ -113,6 +115,7 @@ class ExtendedDNControl(LDAPControl):
def encodeControlValue(self, value=None):
return '0\x03\x02\x01\x01'
class DomainValidator(object):
ATTR_FLATNAME = 'ipantflatname'
ATTR_SID = 'ipantsecurityidentifier'
@@ -184,6 +187,18 @@ class DomainValidator(object):
except errors.NotFound, e:
return []
def set_trusted_domains(self):
# At this point we have SID_NT_AUTHORITY family SID and really need to
# check it against prefixes of domain SIDs we trust to
if not self._domains:
self._domains = self.get_trusted_domains()
if len(self._domains) == 0:
# Our domain is configured but no trusted domains are configured
# This means we can't check the correctness of a trusted
# domain SIDs
raise errors.ValidationError(name='sid',
error=_('no trusted domain is configured'))
def get_domain_by_sid(self, sid, exact_match=False):
if not self.domain:
# our domain is not configured or self.is_configured() never run
@@ -200,14 +215,7 @@ class DomainValidator(object):
# At this point we have SID_NT_AUTHORITY family SID and really need to
# check it against prefixes of domain SIDs we trust to
if not self._domains:
self._domains = self.get_trusted_domains()
if len(self._domains) == 0:
# Our domain is configured but no trusted domains are configured
# This means we can't check the correctness of a trusted
# domain SIDs
raise errors.ValidationError(name='sid',
error=_('no trusted domain is configured'))
self.set_trusted_domains()
# We have non-zero list of trusted domains and have to go through
# them one by one and check their sids as prefixes / exact match
@@ -284,7 +292,7 @@ class DomainValidator(object):
raise errors.ValidationError(name=_('trusted domain object'),
error= _('domain is not trusted'))
# Now we have a name to check against our list of trusted domains
entries = self.search_in_gc(domain, filter, attrs, scope, basedn)
entries = self.search_in_dc(domain, filter, attrs, scope, basedn)
elif flatname is not None:
# Flatname was specified, traverse through the list of trusted
# domains first to find the proper one
@@ -292,7 +300,7 @@ class DomainValidator(object):
for domain in self._domains:
if self._domains[domain][0] == flatname:
found_flatname = True
entries = self.search_in_gc(domain, filter, attrs, scope, basedn)
entries = self.search_in_dc(domain, filter, attrs, scope, basedn)
if entries:
break
if not found_flatname:
@@ -436,48 +444,126 @@ class DomainValidator(object):
dict(domain=info['dns_domain'],message=stderr.strip()))
return (None, None)
def search_in_gc(self, domain, filter, attrs, scope, basedn=None):
def kinit_as_http(self, domain):
"""
Perform LDAP search in a trusted domain `domain' Global Catalog.
Returns resulting entries or None
Initializes ccache with http service credentials.
Applies session code defaults for ccache directory and naming prefix.
Session code uses krbccache_prefix+<pid>, we use
krbccache_prefix+<TD>+<domain netbios name> so there is no clash.
Returns tuple (ccache path, principal) where (None, None) signifes an
error on ccache initialization
"""
domain_suffix = domain.replace('.', '-')
ccache_name = "%sTD%s" % (krbccache_prefix, domain_suffix)
ccache_path = os.path.join(krbccache_dir, ccache_name)
realm = api.env.realm
hostname = api.env.host
principal = 'HTTP/%s@%s' % (hostname, realm)
keytab = '/etc/httpd/conf/ipa.keytab'
# Destroy the contents of the ccache
root_logger.debug('Destroying the contents of the separate ccache')
(stdout, stderr, returncode) = ipautil.run(
['/usr/bin/kdestroy', '-A', '-c', ccache_path],
env={'KRB5CCNAME': ccache_path},
raiseonerr=False)
# Destroy the contents of the ccache
root_logger.debug('Running kinit from ipa.keytab to obtain HTTP '
'service principal with MS-PAC attached.')
(stdout, stderr, returncode) = ipautil.run(
['/usr/bin/kinit', '-kt', keytab, principal],
env={'KRB5CCNAME': ccache_path},
raiseonerr=False)
if returncode == 0:
return (ccache_path, principal)
else:
return (None, None)
def search_in_dc(self, domain, filter, attrs, scope, basedn=None,
use_http=False, quiet=False):
"""
Perform LDAP search in a trusted domain `domain' Domain Controller.
Returns resulting entries or None.
If use_http is set to True, the search is conducted using
HTTP service credentials.
"""
entries = None
sid = None
info = self.__retrieve_trusted_domain_gc_list(domain)
if not info:
raise errors.ValidationError(name=_('Trust setup'),
raise errors.ValidationError(
name=_('Trust setup'),
error=_('Cannot retrieve trusted domain GC list'))
for (host, port) in info['gc']:
entries = self.__search_in_gc(info, host, port, filter, attrs, scope, basedn)
entries = self.__search_in_dc(info, host, port, filter, attrs,
scope, basedn=basedn,
use_http=use_http,
quiet=quiet)
if entries:
break
return entries
def __search_in_gc(self, info, host, port, filter, attrs, scope, basedn=None):
def __search_in_dc(self, info, host, port, filter, attrs, scope,
basedn=None, use_http=False, quiet=False):
"""
Actual search in AD LDAP server, using SASL GSSAPI authentication
Returns LDAP result or None
Returns LDAP result or None.
"""
conn = IPAdmin(host=host, port=port, no_schema=True, decode_attrs=False)
auth = self.__extract_trusted_auth(info)
if attrs is None:
attrs = []
if auth:
(ccache_name, principal) = self.__kinit_as_trusted_account(info, auth)
if ccache_name:
old_ccache = os.environ.get('KRB5CCNAME')
os.environ["KRB5CCNAME"] = ccache_name
# OPT_X_SASL_NOCANON is used to avoid hard requirement for PTR
# records pointing back to the same host name
conn.set_option(_ldap.OPT_X_SASL_NOCANON, _ldap.OPT_ON)
conn.do_sasl_gssapi_bind()
if basedn is None:
# Use domain root base DN
basedn = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.')))
entries = conn.get_entries(basedn, scope, filter, attrs)
os.environ["KRB5CCNAME"] = old_ccache
return entries
if use_http:
(ccache_name, principal) = self.kinit_as_http(info['dns_domain'])
else:
auth = self.__extract_trusted_auth(info)
if not auth:
return None
(ccache_name, principal) = self.__kinit_as_trusted_account(info,
auth)
if ccache_name:
with installutils.private_ccache(path=ccache_name):
entries = None
try:
conn = IPAdmin(host=host,
port=389, # query the AD DC
no_schema=True,
decode_attrs=False,
sasl_nocanon=True)
# sasl_nocanon used to avoid hard requirement for PTR
# records pointing back to the same host name
conn.do_sasl_gssapi_bind()
if basedn is None:
# Use domain root base DN
basedn = ipautil.realm_to_suffix(info['dns_domain'])
entries = conn.get_entries(basedn, scope, filter, attrs)
except Exception, e:
msg = "Search on AD DC {host}:{port} failed with: {err}"\
.format(host=host, port=str(port), err=str(e))
if quiet:
root_logger.debug(msg)
else:
root_logger.warning(msg)
finally:
return entries
def __retrieve_trusted_domain_gc_list(self, domain):
"""
@@ -508,9 +594,13 @@ class DomainValidator(object):
except RuntimeError, e:
finddc_error = e
if not self._domains:
self._domains = self.get_trusted_domains()
info = dict()
info['auth'] = self._domains[domain][2]
servers = []
if result:
info['name'] = unicode(result.domain_name)
info['dns_domain'] = unicode(result.dns_domain)