diff --git a/ipalib/plugins/group.py b/ipalib/plugins/group.py index f86b134e6..347a7ee9f 100644 --- a/ipalib/plugins/group.py +++ b/ipalib/plugins/group.py @@ -384,11 +384,12 @@ class group_add_member(LDAPAddMember): if domain_validator.is_trusted_sid_valid(sid): sids.append(sid) else: - actual_sid = domain_validator.get_sid_trusted_domain_object(sid) - if isinstance(actual_sid, unicode): - sids.append(actual_sid) + try: + actual_sid = domain_validator.get_trusted_domain_object_sid(sid) + except errors.PublicError, e: + failed_sids.append((sid, unicode(e))) else: - failed_sids.append((sid, 'Not a trusted domain SID')) + sids.append(actual_sid) if len(sids) == 0: raise errors.ValidationError(name=_('external member'), error=_('values are not recognized as valid SIDs from trusted domain')) diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py index 384044437..56d8b0319 100644 --- a/ipaserver/dcerpc.py +++ b/ipaserver/dcerpc.py @@ -164,16 +164,18 @@ class DomainValidator(object): except errors.NotFound, e: return [] - def is_trusted_sid_valid(self, sid): + def get_domain_by_sid(self, sid): if not self.domain: # our domain is not configured or self.is_configured() never run # reject SIDs as we can't check correctness of them - return False + raise errors.ValidationError(name='sid', + error=_('domain is not configured')) # Parse sid string to see if it is really in a SID format try: test_sid = security.dom_sid(sid) except TypeError, e: - return False + raise errors.ValidationError(name='sid', + error=_('SID is not valid')) # At this point we have SID_NT_AUTHORITY family SID and really need to # check it against prefixes of domain SIDs we trust to if not self._domains: @@ -181,7 +183,8 @@ class DomainValidator(object): if len(self._domains) == 0: # Our domain is configured but no trusted domains are configured # This means we can't check the correctness of a trusted domain SIDs - return False + raise errors.ValidationError(name='sid', + error=_('no trusted domain is configured')) # We have non-zero list of trusted domains and have to go through them # one by one and check their sids as prefixes test_sid_subauths = test_sid.sub_auths @@ -190,44 +193,87 @@ class DomainValidator(object): sub_auths = domsid.sub_auths num_auths = min(test_sid.num_auths, domsid.num_auths) if test_sid_subauths[:num_auths] == sub_auths[:num_auths]: - return True - return False + return domain + raise errors.NotFound(reason=_('SID does not match any trusted domain')) - def get_sid_trusted_domain_object(self, object_name): + def is_trusted_sid_valid(self, sid): + try: + self.get_domain_by_sid(sid) + except (errors.ValidationError, errors.NotFound): + return False + else: + return True + + def get_trusted_domain_objects(self, domain=None, flatname=None, filter="", + attrs=None, scope=_ldap.SCOPE_SUBTREE, basedn=None): + """ + Search for LDAP objects in a trusted domain specified either by `domain' + or `flatname'. The actual LDAP search is specified by `filter', `attrs', + `scope' and `basedn'. When `basedn' is empty, database root DN is used. + """ + assert domain is not None or flatname is not None """Returns SID for the trusted domain object (user or group only)""" if not self.domain: # our domain is not configured or self.is_configured() never run - return None + raise errors.ValidationError(name=_('Trust setup'), + error=_('Our domain is not configured')) if not self._domains: self._domains = self.get_trusted_domains() if len(self._domains) == 0: # Our domain is configured but no trusted domains are configured - return None + raise errors.ValidationError(name=_('Trust setup'), + error=_('No trusted domain is not configured')) + entries = None + if domain is not None: + if domain not in self._domains: + raise errors.ValidationError(name=_('trusted domain object'), + error= _('domain is not trusted')) + # Now we have a name to check against our list of trusted domains + entries = self.search_in_gc(domain, filter, attrs, scope, basedn) + elif flatname is not None: + # Flatname was specified, traverse through the list of trusted + # domains first to find the proper one + found_flatname = False + for domain in self._domains: + if self._domains[domain][0] == flatname: + found_flatname = True + entries = self.search_in_gc(domain, filter, attrs, scope, basedn) + if entries: + break + if not found_flatname: + raise errors.ValidationError(name=_('trusted domain object'), + error= _('no trusted domain matched the specified flat name')) + if not entries: + raise errors.NotFound(reason=_('trusted domain object not found')) + + return entries + + def get_trusted_domain_object_sid(self, object_name): components = normalize_name(object_name) if not ('domain' in components or 'flatname' in components): # No domain or realm specified, ambiguous search - return False + raise errors.ValidationError(name=_('trusted domain object'), + error= _('Ambiguous search, user domain was not specified')) - entry = None - if 'domain' in components and components['domain'] in self._domains: - # Now we have a name to check against our list of trusted domains - entry = self.resolve_against_gc(components['domain'], components['name']) - elif 'flatname' in components: - # Flatname was specified, traverse through the list of trusted - # domains first to find the proper one - for domain in self._domains: - if self._domains[domain][0] == components['flatname']: - entry = self.resolve_against_gc(domain, components['name']) - if entry: - break - if entry: - try: - test_sid = security.dom_sid(entry) - return unicode(test_sid) - except TypeError, e: - return False - return False + attrs = ['objectSid'] + filter = '(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' \ + % dict(name=components['name']) + scope = _ldap.SCOPE_SUBTREE + entries = self.get_trusted_domain_objects(components.get('domain'), + components.get('flatname'), filter, attrs, scope) + + if len(entries) > 1: + # Treat non-unique entries as invalid + raise errors.ValidationError(name=_('trusted domain object'), + error= _('Trusted domain did not return a unique object')) + sid = self.__sid_to_str(entries[0][1]['objectSid'][0]) + try: + test_sid = security.dom_sid(sid) + return unicode(test_sid) + except TypeError, e: + raise errors.ValidationError(name=_('trusted domain object'), + error= _('Trusted domain did not return a valid SID for the object')) def __sid_to_str(self, sid): """ @@ -280,36 +326,33 @@ class DomainValidator(object): dict(domain=info['dns_domain'],message=stderr.strip())) return (None, None) - def resolve_against_gc(self, domain, name): + def search_in_gc(self, domain, filter, attrs, scope, basedn=None): """ - Resolves `name' against trusted domain `domain' using Global Catalog - Returns SID of the `name' or None + Perform LDAP search in a trusted domain `domain' Global Catalog. + Returns resulting entries or None """ - entry = None + entries = None sid = None info = self.__retrieve_trusted_domain_gc_list(domain) if not info: - return None + raise errors.ValidationError(name=_('Trust setup'), + error=_('Cannot retrieve trusted domain GC list')) for (host, port) in info['gc']: - entry = self.__resolve_against_gc(info, host, port, name) - if entry: + entries = self.__search_in_gc(info, host, port, filter, attrs, scope, basedn) + if entries: break - if entry: - l = len(entry) - if l > 2: - # Treat non-unique entries as invalid - return None - sid = self.__sid_to_str(entry[0][1]['objectSid'][0]) - return sid + return entries - def __resolve_against_gc(self, info, host, port, name): + def __search_in_gc(self, info, host, port, filter, attrs, scope, basedn=None): """ - Actual resolution against LDAP server, using SASL GSSAPI authentication + Actual search in AD LDAP server, using SASL GSSAPI authentication Returns LDAP result or None """ conn = IPAdmin(host=host, port=port) auth = self.__extract_trusted_auth(info) + if attrs is None: + attrs = [] if auth: (ccache_name, principal) = self.__kinit_as_trusted_account(info, auth) if ccache_name: @@ -322,13 +365,15 @@ class DomainValidator(object): # records pointing back to the same host name conn.set_option(_ldap.OPT_X_SASL_NOCANON, _ldap.OPT_ON) conn.sasl_interactive_bind_s(None, sasl_auth) - base = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.'))) + if basedn is None: + # Use domain root base DN + basedn = DN(*map(lambda p: ('dc', p), info['dns_domain'].split('.'))) # We don't use conn.getEntry() because it will attempt to fetch schema from GC and that will fail - filterstr = conn.encode('(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' % dict(name=name)) - attrlist = conn.encode(['sAMAccountName', 'sAMAccountType', 'objectSid', 'groupType', 'description']) - entry = conn.conn.search_s(str(base), _ldap.SCOPE_SUBTREE, filterstr, attrlist, 0) + filterstr = conn.encode(filter) + attrlist = conn.encode(attrs) + entries = conn.conn.search_s(str(basedn), scope, filterstr, attrlist, 0) os.environ["KRB5CCNAME"] = old_ccache - return entry + return entries def __retrieve_trusted_domain_gc_list(self, domain): """