Generalize AD GC search

Modify access methods to AD GC so that callers can specify a custom
basedn, filter, scope and attribute list, thus allowing it to perform
any LDAP search.

Error checking methodology in these functions was changed, so that it
rather raises an exception with a desription instead of simply returning
a None or False value which would made an investigation why something
does not work much more difficult. External membership method in
group-add-member command was updated to match this approach.

https://fedorahosted.org/freeipa/ticket/2997
This commit is contained in:
Martin Kosek 2013-01-18 17:28:39 +01:00
parent 4c4418fb9e
commit e60e80e2b6
2 changed files with 100 additions and 54 deletions

View File

@ -384,11 +384,12 @@ class group_add_member(LDAPAddMember):
if domain_validator.is_trusted_sid_valid(sid):
sids.append(sid)
else:
actual_sid = domain_validator.get_sid_trusted_domain_object(sid)
if isinstance(actual_sid, unicode):
sids.append(actual_sid)
try:
actual_sid = domain_validator.get_trusted_domain_object_sid(sid)
except errors.PublicError, e:
failed_sids.append((sid, unicode(e)))
else:
failed_sids.append((sid, 'Not a trusted domain SID'))
sids.append(actual_sid)
if len(sids) == 0:
raise errors.ValidationError(name=_('external member'),
error=_('values are not recognized as valid SIDs from trusted domain'))

View File

@ -164,16 +164,18 @@ class DomainValidator(object):
except errors.NotFound, e:
return []
def is_trusted_sid_valid(self, sid):
def get_domain_by_sid(self, sid):
if not self.domain:
# our domain is not configured or self.is_configured() never run
# reject SIDs as we can't check correctness of them
return False
raise errors.ValidationError(name='sid',
error=_('domain is not configured'))
# Parse sid string to see if it is really in a SID format
try:
test_sid = security.dom_sid(sid)
except TypeError, e:
return False
raise errors.ValidationError(name='sid',
error=_('SID is not valid'))
# At this point we have SID_NT_AUTHORITY family SID and really need to
# check it against prefixes of domain SIDs we trust to
if not self._domains:
@ -181,7 +183,8 @@ class DomainValidator(object):
if len(self._domains) == 0:
# Our domain is configured but no trusted domains are configured
# This means we can't check the correctness of a trusted domain SIDs
return False
raise errors.ValidationError(name='sid',
error=_('no trusted domain is configured'))
# We have non-zero list of trusted domains and have to go through them
# one by one and check their sids as prefixes
test_sid_subauths = test_sid.sub_auths
@ -190,44 +193,87 @@ class DomainValidator(object):
sub_auths = domsid.sub_auths
num_auths = min(test_sid.num_auths, domsid.num_auths)
if test_sid_subauths[:num_auths] == sub_auths[:num_auths]:
return True
return False
return domain
raise errors.NotFound(reason=_('SID does not match any trusted domain'))
def get_sid_trusted_domain_object(self, object_name):
def is_trusted_sid_valid(self, sid):
try:
self.get_domain_by_sid(sid)
except (errors.ValidationError, errors.NotFound):
return False
else:
return True
def get_trusted_domain_objects(self, domain=None, flatname=None, filter="",
attrs=None, scope=_ldap.SCOPE_SUBTREE, basedn=None):
"""
Search for LDAP objects in a trusted domain specified either by `domain'
or `flatname'. The actual LDAP search is specified by `filter', `attrs',
`scope' and `basedn'. When `basedn' is empty, database root DN is used.
"""
assert domain is not None or flatname is not None
"""Returns SID for the trusted domain object (user or group only)"""
if not self.domain:
# our domain is not configured or self.is_configured() never run
return None
raise errors.ValidationError(name=_('Trust setup'),
error=_('Our domain is not configured'))
if not self._domains:
self._domains = self.get_trusted_domains()
if len(self._domains) == 0:
# Our domain is configured but no trusted domains are configured
return None
raise errors.ValidationError(name=_('Trust setup'),
error=_('No trusted domain is not configured'))
entries = None
if domain is not None:
if domain not in self._domains:
raise errors.ValidationError(name=_('trusted domain object'),
error= _('domain is not trusted'))
# Now we have a name to check against our list of trusted domains
entries = self.search_in_gc(domain, filter, attrs, scope, basedn)
elif flatname is not None:
# Flatname was specified, traverse through the list of trusted
# domains first to find the proper one
found_flatname = False
for domain in self._domains:
if self._domains[domain][0] == flatname:
found_flatname = True
entries = self.search_in_gc(domain, filter, attrs, scope, basedn)
if entries:
break
if not found_flatname:
raise errors.ValidationError(name=_('trusted domain object'),
error= _('no trusted domain matched the specified flat name'))
if not entries:
raise errors.NotFound(reason=_('trusted domain object not found'))
return entries
def get_trusted_domain_object_sid(self, object_name):
components = normalize_name(object_name)
if not ('domain' in components or 'flatname' in components):
# No domain or realm specified, ambiguous search
return False
raise errors.ValidationError(name=_('trusted domain object'),
error= _('Ambiguous search, user domain was not specified'))
entry = None
if 'domain' in components and components['domain'] in self._domains:
# Now we have a name to check against our list of trusted domains
entry = self.resolve_against_gc(components['domain'], components['name'])
elif 'flatname' in components:
# Flatname was specified, traverse through the list of trusted
# domains first to find the proper one
for domain in self._domains:
if self._domains[domain][0] == components['flatname']:
entry = self.resolve_against_gc(domain, components['name'])
if entry:
break
if entry:
try:
test_sid = security.dom_sid(entry)
return unicode(test_sid)
except TypeError, e:
return False
return False
attrs = ['objectSid']
filter = '(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' \
% dict(name=components['name'])
scope = _ldap.SCOPE_SUBTREE
entries = self.get_trusted_domain_objects(components.get('domain'),
components.get('flatname'), filter, attrs, scope)
if len(entries) > 1:
# Treat non-unique entries as invalid
raise errors.ValidationError(name=_('trusted domain object'),
error= _('Trusted domain did not return a unique object'))
sid = self.__sid_to_str(entries[0][1]['objectSid'][0])
try:
test_sid = security.dom_sid(sid)
return unicode(test_sid)
except TypeError, e:
raise errors.ValidationError(name=_('trusted domain object'),
error= _('Trusted domain did not return a valid SID for the object'))
def __sid_to_str(self, sid):
"""
@ -280,36 +326,33 @@ class DomainValidator(object):
dict(domain=info['dns_domain'],message=stderr.strip()))
return (None, None)
def resolve_against_gc(self, domain, name):
def search_in_gc(self, domain, filter, attrs, scope, basedn=None):
"""
Resolves `name' against trusted domain `domain' using Global Catalog
Returns SID of the `name' or None
Perform LDAP search in a trusted domain `domain' Global Catalog.
Returns resulting entries or None
"""
entry = None
entries = None
sid = None
info = self.__retrieve_trusted_domain_gc_list(domain)
if not info:
return None
raise errors.ValidationError(name=_('Trust setup'),
error=_('Cannot retrieve trusted domain GC list'))
for (host, port) in info['gc']:
entry = self.__resolve_against_gc(info, host, port, name)
if entry:
entries = self.__search_in_gc(info, host, port, filter, attrs, scope, basedn)
if entries:
break
if entry:
l = len(entry)
if l > 2:
# Treat non-unique entries as invalid
return None
sid = self.__sid_to_str(entry[0][1]['objectSid'][0])
return sid
return entries
def __resolve_against_gc(self, info, host, port, name):
def __search_in_gc(self, info, host, port, filter, attrs, scope, basedn=None):
"""
Actual resolution against LDAP server, using SASL GSSAPI authentication
Actual search in AD LDAP server, using SASL GSSAPI authentication
Returns LDAP result or None
"""
conn = IPAdmin(host=host, port=port)
auth = self.__extract_trusted_auth(info)
if attrs is None:
attrs = []
if auth:
(ccache_name, principal) = self.__kinit_as_trusted_account(info, auth)
if ccache_name:
@ -322,13 +365,15 @@ class DomainValidator(object):
# records pointing back to the same host name
conn.set_option(_ldap.OPT_X_SASL_NOCANON, _ldap.OPT_ON)
conn.sasl_interactive_bind_s(None, sasl_auth)
base = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.')))
if basedn is None:
# Use domain root base DN
basedn = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.')))
# We don't use conn.getEntry() because it will attempt to fetch schema from GC and that will fail
filterstr = conn.encode('(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' % dict(name=name))
attrlist = conn.encode(['sAMAccountName', 'sAMAccountType', 'objectSid', 'groupType', 'description'])
entry = conn.conn.search_s(str(base), _ldap.SCOPE_SUBTREE, filterstr, attrlist, 0)
filterstr = conn.encode(filter)
attrlist = conn.encode(attrs)
entries = conn.conn.search_s(str(basedn), scope, filterstr, attrlist, 0)
os.environ["KRB5CCNAME"] = old_ccache
return entry
return entries
def __retrieve_trusted_domain_gc_list(self, domain):
"""