diff --git a/ipaserver/dcerpc.py b/ipaserver/dcerpc.py index 21ac89dfd..19be6bf7a 100644 --- a/ipaserver/dcerpc.py +++ b/ipaserver/dcerpc.py @@ -44,10 +44,12 @@ import samba import random from cryptography.hazmat.primitives.ciphers import Cipher, algorithms from cryptography.hazmat.backends import default_backend +# pylint: disable=F0401 try: - from ldap.controls import RequestControl as LDAPControl #pylint: disable=F0401 + from ldap.controls import RequestControl as LDAPControl except ImportError: - from ldap.controls import LDAPControl as LDAPControl #pylint: disable=F0401 + from ldap.controls import LDAPControl as LDAPControl +# pylint: enable=F0401 import ldap as _ldap from ipapython.ipaldap import IPAdmin from ipaserver.session import krbccache_dir, krbccache_prefix @@ -74,7 +76,7 @@ and Samba4 python bindings. # Both constants can be used as masks against trust direction # because bi-directional has two lower bits set. -TRUST_ONEWAY = 1 +TRUST_ONEWAY = 1 TRUST_BIDIRECTIONAL = 3 # Trust join behavior @@ -91,31 +93,44 @@ def is_sid_valid(sid): return True -access_denied_error = errors.ACIError(info=_('CIFS server denied your credentials')) +access_denied_error = errors.ACIError( + info=_('CIFS server denied your credentials')) dcerpc_error_codes = { -1073741823: - errors.RemoteRetrieveError(reason=_('communication with CIFS server was unsuccessful')), + errors.RemoteRetrieveError( + reason=_('communication with CIFS server was unsuccessful')), -1073741790: access_denied_error, -1073741715: access_denied_error, -1073741614: access_denied_error, -1073741603: - errors.ValidationError(name=_('AD domain controller'), error=_('unsupported functional level')), - -1073741811: # NT_STATUS_INVALID_PARAMETER + errors.ValidationError( + name=_('AD domain controller'), + error=_('unsupported functional level')), + -1073741811: # NT_STATUS_INVALID_PARAMETER errors.RemoteRetrieveError( - reason=_('AD domain controller complains about communication sequence. It may mean unsynchronized time on both sides, for example')), - -1073741776: # NT_STATUS_INVALID_PARAMETER_MIX, we simply will skip the binding + reason=_('AD domain controller complains about communication ' + 'sequence. It may mean unsynchronized time on both ' + 'sides, for example')), + -1073741776: # NT_STATUS_INVALID_PARAMETER_MIX, + # we simply will skip the binding access_denied_error, - -1073741772: # NT_STATUS_OBJECT_NAME_NOT_FOUND - errors.RemoteRetrieveError(reason=_('CIFS server configuration does not allow access to \\\\pipe\\lsarpc')), + -1073741772: # NT_STATUS_OBJECT_NAME_NOT_FOUND + errors.RemoteRetrieveError( + reason=_('CIFS server configuration does not allow ' + 'access to \\\\pipe\\lsarpc')), } dcerpc_error_messages = { "NT_STATUS_OBJECT_NAME_NOT_FOUND": - errors.NotFound(reason=_('Cannot find specified domain or server name')), + errors.NotFound( + reason=_('Cannot find specified domain or server name')), "WERR_NO_LOGON_SERVERS": - errors.RemoteRetrieveError(reason=_('AD DC was unable to reach any IPA domain controller. Most likely it is a DNS or firewall issue')), + errors.RemoteRetrieveError( + reason=_('AD DC was unable to reach any IPA domain controller. ' + 'Most likely it is a DNS or firewall issue')), "NT_STATUS_INVALID_PARAMETER_MIX": - errors.RequirementError(name=_('At least the domain or IP address should be specified')), + errors.RequirementError( + name=_('At least the domain or IP address should be specified')), } pysss_type_key_translation_dict = { @@ -126,7 +141,7 @@ pysss_type_key_translation_dict = { } -def assess_dcerpc_exception(num=None,message=None): +def assess_dcerpc_exception(num=None, message=None): """ Takes error returned by Samba bindings and converts it into an IPA error class. @@ -135,8 +150,9 @@ def assess_dcerpc_exception(num=None,message=None): return dcerpc_error_codes[num] if message and message in dcerpc_error_messages: return dcerpc_error_messages[message] - reason = _('''CIFS server communication error: code "%(num)s", - message "%(message)s" (both may be "None")''') % dict(num=num, message=message) + reason = _('CIFS server communication error: code "%(num)s", ' + 'message "%(message)s" (both may be "None")') % \ + dict(num=num, message=message) return errors.RemoteRetrieveError(reason=reason) @@ -182,9 +198,13 @@ class DomainValidator(object): self._parm = None def is_configured(self): - cn_trust_local = DN(('cn', self.api.env.domain), self.api.env.container_cifsdomains, self.api.env.basedn) + cn_trust_local = DN(('cn', self.api.env.domain), + self.api.env.container_cifsdomains, + self.api.env.basedn) try: - entry_attrs = self.ldap.get_entry(cn_trust_local, [self.ATTR_FLATNAME, self.ATTR_SID]) + entry_attrs = self.ldap.get_entry(cn_trust_local, + [self.ATTR_FLATNAME, + self.ATTR_SID]) self.flatname = entry_attrs[self.ATTR_FLATNAME][0] self.sid = entry_attrs[self.ATTR_SID][0] self.dn = entry_attrs.dn @@ -203,7 +223,8 @@ class DomainValidator(object): try: search_kw = {'objectClass': 'ipaNTTrustedDomain'} - filter = self.ldap.make_filter(search_kw, rules=self.ldap.MATCH_ALL) + filter = self.ldap.make_filter(search_kw, + rules=self.ldap.MATCH_ALL) (entries, truncated) = self.ldap.find_entries( filter=filter, base_dn=cn_trust, @@ -216,22 +237,22 @@ class DomainValidator(object): # domain names as keys and those are generally case-insensitive result = ipautil.CIDict() - for entry in entries: + for e in entries: try: - trust_partner = entry[self.ATTR_TRUST_PARTNER][0] - flatname_normalized = entry[self.ATTR_FLATNAME][0].lower() - trusted_sid = entry[self.ATTR_TRUSTED_SID][0] - except KeyError as e: + t_partner = e.single_value.get(self.ATTR_TRUST_PARTNER) + fname_norm = e.single_value.get(self.ATTR_FLATNAME).lower() + trusted_sid = e.single_value.get(self.ATTR_TRUSTED_SID) + except KeyError as exc: # Some piece of trusted domain info in LDAP is missing # Skip the domain, but leave log entry for investigation api.log.warning("Trusted domain '%s' entry misses an " - "attribute: %s", entry.dn, e) + "attribute: %s", e.dn, exc) continue - result[trust_partner] = (flatname_normalized, - security.dom_sid(trusted_sid)) + result[t_partner] = (fname_norm, + security.dom_sid(trusted_sid)) return result - except errors.NotFound as e: + except errors.NotFound as exc: return [] def set_trusted_domains(self): @@ -244,21 +265,22 @@ class DomainValidator(object): # This means we can't check the correctness of a trusted # domain SIDs raise errors.ValidationError(name='sid', - error=_('no trusted domain is configured')) + error=_('no trusted domain ' + 'is configured')) def get_domain_by_sid(self, sid, exact_match=False): if not self.domain: # our domain is not configured or self.is_configured() never run # reject SIDs as we can't check correctness of them raise errors.ValidationError(name='sid', - error=_('domain is not configured')) + error=_('domain is not configured')) # Parse sid string to see if it is really in a SID format try: test_sid = security.dom_sid(sid) except TypeError: raise errors.ValidationError(name='sid', - error=_('SID is not valid')) + error=_('SID is not valid')) # At this point we have SID_NT_AUTHORITY family SID and really need to # check it against prefixes of domain SIDs we trust to @@ -314,30 +336,34 @@ class DomainValidator(object): return None def get_trusted_domain_objects(self, domain=None, flatname=None, filter="", - attrs=None, scope=_ldap.SCOPE_SUBTREE, basedn=None): + attrs=None, scope=_ldap.SCOPE_SUBTREE, + basedn=None): """ - Search for LDAP objects in a trusted domain specified either by `domain' - or `flatname'. The actual LDAP search is specified by `filter', `attrs', - `scope' and `basedn'. When `basedn' is empty, database root DN is used. + Search for LDAP objects in a trusted domain specified either by + `domain' or `flatname'. The actual LDAP search is specified by + `filter', `attrs', `scope' and `basedn'. When `basedn' is empty, + database root DN is used. """ assert domain is not None or flatname is not None """Returns SID for the trusted domain object (user or group only)""" if not self.domain: # our domain is not configured or self.is_configured() never run raise errors.ValidationError(name=_('Trust setup'), - error=_('Our domain is not configured')) + error=_('Our domain is ' + 'not configured')) if not self._domains: self._domains = self.get_trusted_domains() if len(self._domains) == 0: # Our domain is configured but no trusted domains are configured raise errors.ValidationError(name=_('Trust setup'), - error=_('No trusted domain is not configured')) + error=_('No trusted domain is ' + 'not configured')) entries = None if domain is not None: if domain not in self._domains: raise errors.ValidationError(name=_('trusted domain object'), - error= _('domain is not trusted')) + error=_('domain is not trusted')) # Now we have a name to check against our list of trusted domains entries = self.search_in_dc(domain, filter, attrs, scope, basedn) elif flatname is not None: @@ -347,53 +373,65 @@ class DomainValidator(object): for domain in self._domains: if self._domains[domain][0] == flatname: found_flatname = True - entries = self.search_in_dc(domain, filter, attrs, scope, basedn) + entries = self.search_in_dc(domain, filter, + attrs, scope, basedn) if entries: break if not found_flatname: raise errors.ValidationError(name=_('trusted domain object'), - error= _('no trusted domain matched the specified flat name')) + error=_('no trusted domain ' + 'matched the specified ' + 'flat name')) if not entries: raise errors.NotFound(reason=_('trusted domain object not found')) return entries - def get_trusted_domain_object_sid(self, object_name, fallback_to_ldap=True): + def get_trusted_domain_object_sid(self, object_name, + fallback_to_ldap=True): result = pysss_nss_idmap.getsidbyname(object_name) - if object_name in result and (pysss_nss_idmap.SID_KEY in result[object_name]): + if object_name in result and \ + (pysss_nss_idmap.SID_KEY in result[object_name]): object_sid = result[object_name][pysss_nss_idmap.SID_KEY] return object_sid # If fallback to AD DC LDAP is not allowed, bail out if not fallback_to_ldap: raise errors.ValidationError(name=_('trusted domain object'), - error= _('SSSD was unable to resolve the object to a valid SID')) + error=_('SSSD was unable to resolve ' + 'the object to a valid SID')) # Else, we are going to contact AD DC LDAP components = normalize_name(object_name) if not ('domain' in components or 'flatname' in components): # No domain or realm specified, ambiguous search - raise errors.ValidationError(name=_('trusted domain object'), - error= _('Ambiguous search, user domain was not specified')) + raise errors.ValidationError(name=_('trusted domain object'), + error=_('Ambiguous search, user ' + 'domain was not specified')) attrs = ['objectSid'] - filter = '(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' \ - % dict(name=components['name']) + filter = '(&(sAMAccountName=%(name)s)' \ + '(|(objectClass=user)(objectClass=group)))' \ + % dict(name=components['name']) scope = _ldap.SCOPE_SUBTREE entries = self.get_trusted_domain_objects(components.get('domain'), - components.get('flatname'), filter, attrs, scope) + components.get('flatname'), + filter, attrs, scope) if len(entries) > 1: # Treat non-unique entries as invalid raise errors.ValidationError(name=_('trusted domain object'), - error= _('Trusted domain did not return a unique object')) + error=_('Trusted domain did not ' + 'return a unique object')) sid = self.__sid_to_str(entries[0]['objectSid'][0]) try: test_sid = security.dom_sid(sid) return unicode(test_sid) except TypeError as e: raise errors.ValidationError(name=_('trusted domain object'), - error= _('Trusted domain did not return a valid SID for the object')) + error=_('Trusted domain did not ' + 'return a valid SID for ' + 'the object')) def get_trusted_domain_object_type(self, name_or_sid): """ @@ -443,7 +481,8 @@ class DomainValidator(object): ) attrs = ['sAMAccountName'] - filter = (r'(&(objectSid=%(sid)s)(|(objectClass=user)(objectClass=group)))' + filter = (r'(&(objectSid=%(sid)s)' + '(|(objectClass=user)(objectClass=group)))' % dict(sid=escaped_sid)) # sid in binary domain = self.get_domain_by_sid(sid) @@ -454,7 +493,8 @@ class DomainValidator(object): if len(entries) > 1: # Treat non-unique entries as invalid raise errors.ValidationError(name=_('trusted domain object'), - error=_('Trusted domain did not return a unique object')) + error=_('Trusted domain did not ' + 'return a unique object')) object_name = ( "%s@%s" % (entries[0].single_value['sAMAccountName'].lower(), @@ -486,27 +526,31 @@ class DomainValidator(object): # Now search a trusted domain for a user with this SID attrs = ['cn'] filter = '(&(objectClass=user)(objectSid=%(sid)s))' \ - % dict(sid=object_name) + % dict(sid=object_name) try: - entries = self.get_trusted_domain_objects(domain=domain, filter=filter, - attrs=attrs, scope=_ldap.SCOPE_SUBTREE) + entries = self.get_trusted_domain_objects(domain=domain, + filter=filter, + attrs=attrs, + scope=_ldap.SCOPE_SUBTREE) except errors.NotFound: raise errors.NotFound(reason=_('trusted domain user not found')) user_dn = entries[0].dn elif domain or flatname: attrs = ['cn'] filter = '(&(sAMAccountName=%(name)s)(objectClass=user))' \ - % dict(name=name) + % dict(name=name) try: entries = self.get_trusted_domain_objects(domain, - flatname, filter, attrs, _ldap.SCOPE_SUBTREE) + flatname, filter, attrs, + _ldap.SCOPE_SUBTREE) except errors.NotFound: raise errors.NotFound(reason=_('trusted domain user not found')) user_dn = entries[0].dn else: # No domain or realm specified, ambiguous search raise errors.ValidationError(name=_('trusted domain object'), - error= _('Ambiguous search, user domain was not specified')) + error=_('Ambiguous search, ' + 'user domain was not specified')) # Get SIDs of user object and it's groups # tokenGroups attribute must be read with a scope BASE for a known user @@ -514,9 +558,11 @@ class DomainValidator(object): attrs = ['objectSID', 'tokenGroups'] filter = "(objectClass=user)" entries = self.get_trusted_domain_objects(domain, - flatname, filter, attrs, _ldap.SCOPE_BASE, user_dn) + flatname, filter, attrs, + _ldap.SCOPE_BASE, user_dn) object_sid = self.__sid_to_str(entries[0]['objectSid'][0]) - group_sids = [self.__sid_to_str(sid) for sid in entries[0]['tokenGroups']] + group_sids = [self.__sid_to_str(sid) + for sid in entries[0]['tokenGroups']] return (object_sid, group_sids) def get_trusted_domain_user_and_groups(self, object_name): @@ -540,11 +586,14 @@ class DomainValidator(object): if is_valid_sid: object_sid = object_name result = pysss_nss_idmap.getnamebysid(object_name) - if object_name in result and (pysss_nss_idmap.NAME_KEY in result[object_name]): - group_list = pysss.getgrouplist(result[object_name][pysss_nss_idmap.NAME_KEY]) + if object_name in result and \ + (pysss_nss_idmap.NAME_KEY in result[object_name]): + group_list = pysss.getgrouplist( + result[object_name][pysss_nss_idmap.NAME_KEY]) else: result = pysss_nss_idmap.getsidbyname(object_name) - if object_name in result and (pysss_nss_idmap.SID_KEY in result[object_name]): + if object_name in result and \ + (pysss_nss_idmap.SID_KEY in result[object_name]): object_sid = result[object_name][pysss_nss_idmap.SID_KEY] group_list = pysss.getgrouplist(object_name) @@ -552,7 +601,10 @@ class DomainValidator(object): return self.__get_trusted_domain_user_and_groups(object_name) group_sids = pysss_nss_idmap.getsidbyname(group_list) - return (object_sid, [el[1][pysss_nss_idmap.SID_KEY] for el in group_sids.items()]) + return ( + object_sid, + [el[1][pysss_nss_idmap.SID_KEY] for el in group_sids.items()] + ) def __sid_to_str(self, sid): """ @@ -561,12 +613,13 @@ class DomainValidator(object): """ sid_rev_num = ord(sid[0]) number_sub_id = ord(sid[1]) - ia = struct.unpack('!Q','\x00\x00'+sid[2:8])[0] + ia = struct.unpack('!Q', '\x00\x00'+sid[2:8])[0] subs = [ - struct.unpack(' 0: self.parm.set('netbios name', hostname) self.creds = creds @@ -810,14 +867,14 @@ class TrustDomainInstance(object): self.validation_attempts = 0 def __gen_lsa_connection(self, binding): - if self.creds is None: - raise errors.RequirementError(name=_('CIFS credentials object')) - try: - result = lsa.lsarpc(binding, self.parm, self.creds) - return result - except RuntimeError as e: - num, message = e.args # pylint: disable=unpacking-non-sequence - raise assess_dcerpc_exception(num=num, message=message) + if self.creds is None: + raise errors.RequirementError(name=_('CIFS credentials object')) + try: + result = lsa.lsarpc(binding, self.parm, self.creds) + return result + except RuntimeError as e: + num, message = e.args # pylint: disable=unpacking-non-sequence + raise assess_dcerpc_exception(num=num, message=message) def init_lsa_pipe(self, remote_host): """ @@ -847,30 +904,35 @@ class TrustDomainInstance(object): # When session key is not available, we just skip this binding session_attempts = session_attempts + 1 - if self._pipe is None and (attempts + session_attempts) == len(bindings): + if self._pipe is None and \ + (attempts + session_attempts) == len(bindings): raise errors.ACIError( - info=_('CIFS server %(host)s denied your credentials') % dict(host=remote_host)) + info=_('CIFS server %(host)s denied your credentials') + % dict(host=remote_host)) if self._pipe is None: raise errors.RemoteRetrieveError( - reason=_('Cannot establish LSA connection to %(host)s. Is CIFS server running?') % dict(host=remote_host)) + reason=_('Cannot establish LSA connection to %(host)s. ' + 'Is CIFS server running?') % dict(host=remote_host)) self.binding = binding self.session_key = self._pipe.session_key def __gen_lsa_bindings(self, remote_host): """ - There are multiple transports to issue LSA calls. However, depending on a - system in use they may be blocked by local operating system policies. + There are multiple transports to issue LSA calls. However, depending on + a system in use they may be blocked by local operating system policies. Generate all we can use. init_lsa_pipe() will try them one by one until there is one working. - We try NCACN_NP before NCACN_IP_TCP and use SMB2 before SMB1 or defaults. + We try NCACN_NP before NCACN_IP_TCP and use SMB2 before SMB1. """ transports = (u'ncacn_np', u'ncacn_ip_tcp') - options = ( u'smb2,print', u'print') - return [u'%s:%s[%s]' % (t, remote_host, o) for t in transports for o in options] + options = (u'smb2,print', u'print') + return [u'%s:%s[%s]' % (t, remote_host, o) + for t in transports for o in options] - def retrieve_anonymously(self, remote_host, discover_srv=False, search_pdc=False): + def retrieve_anonymously(self, remote_host, + discover_srv=False, search_pdc=False): """ When retrieving DC information anonymously, we can't get SID of the domain """ @@ -896,7 +958,8 @@ class TrustDomainInstance(object): self.info['is_pdc'] = (result.server_type & nbt.NBT_SERVER_PDC) != 0 # Netlogon response doesn't contain SID of the domain. - # We need to do rootDSE search with LDAP_SERVER_EXTENDED_DN_OID control to reveal the SID + # We need to do rootDSE search with LDAP_SERVER_EXTENDED_DN_OID + # control to reveal the SID ldap_uri = 'ldap://%s' % (result.pdc_dns_name) conn = _ldap.initialize(ldap_uri) conn.set_option(_ldap.OPT_SERVER_CONTROLS, [ExtendedDNControl()]) @@ -908,7 +971,7 @@ class TrustDomainInstance(object): except _ldap.LDAPError as e: root_logger.error( "LDAP error when connecting to %(host)s: %(error)s" % - dict(host=unicode(result.pdc_name), error=str(e))) + dict(host=unicode(result.pdc_name), error=str(e))) except KeyError as e: root_logger.error("KeyError: {err}, LDAP entry from {host} " "returned malformed. Your DNS might be " @@ -930,8 +993,11 @@ class TrustDomainInstance(object): objectAttribute = lsa.ObjectAttribute() objectAttribute.sec_qos = lsa.QosInfo() try: - self._policy_handle = self._pipe.OpenPolicy2(u"", objectAttribute, security.SEC_FLAG_MAXIMUM_ALLOWED) - result = self._pipe.QueryInfoPolicy2(self._policy_handle, lsa.LSA_POLICY_INFO_DNS) + self._policy_handle = \ + self._pipe.OpenPolicy2(u"", objectAttribute, + security.SEC_FLAG_MAXIMUM_ALLOWED) + result = self._pipe.QueryInfoPolicy2(self._policy_handle, + lsa.LSA_POLICY_INFO_DNS) except RuntimeError as e: num, message = e.args # pylint: disable=unpacking-non-sequence raise assess_dcerpc_exception(num=num, message=message) @@ -944,7 +1010,8 @@ class TrustDomainInstance(object): self.info['dc'] = remote_host try: - result = self._pipe.QueryInfoPolicy2(self._policy_handle, lsa.LSA_POLICY_INFO_ROLE) + result = self._pipe.QueryInfoPolicy2(self._policy_handle, + lsa.LSA_POLICY_INFO_ROLE) except RuntimeError as e: num, message = e.args # pylint: disable=unpacking-non-sequence raise assess_dcerpc_exception(num=num, message=message) @@ -958,18 +1025,18 @@ class TrustDomainInstance(object): clear_value.size = len(password_blob) clear_value.password = password_blob - clear_authentication_information = drsblobs.AuthenticationInformation() - clear_authentication_information.LastUpdateTime = samba.unix2nttime(int(time.time())) - clear_authentication_information.AuthType = lsa.TRUST_AUTH_TYPE_CLEAR - clear_authentication_information.AuthInfo = clear_value + clear_authinfo = drsblobs.AuthenticationInformation() + clear_authinfo.LastUpdateTime = samba.unix2nttime(int(time.time())) + clear_authinfo.AuthType = lsa.TRUST_AUTH_TYPE_CLEAR + clear_authinfo.AuthInfo = clear_value - authentication_information_array = drsblobs.AuthenticationInformationArray() - authentication_information_array.count = 1 - authentication_information_array.array = [clear_authentication_information] + authinfo_array = drsblobs.AuthenticationInformationArray() + authinfo_array.count = 1 + authinfo_array.array = [clear_authinfo] outgoing = drsblobs.trustAuthInOutBlob() outgoing.count = 1 - outgoing.current = authentication_information_array + outgoing.current = authinfo_array confounder = [3]*512 for i in range(512): @@ -983,7 +1050,8 @@ class TrustDomainInstance(object): trustpass_blob = ndr_pack(trustpass) - encrypted_trustpass = arcfour_encrypt(self._pipe.session_key, trustpass_blob) + encrypted_trustpass = arcfour_encrypt(self._pipe.session_key, + trustpass_blob) auth_blob = lsa.DATA_BUF2() auth_blob.size = len(encrypted_trustpass) @@ -993,7 +1061,6 @@ class TrustDomainInstance(object): auth_info.auth_blob = auth_blob self.auth_info = auth_info - def generate_ftinfo(self, another_domain): """ Generates TrustDomainInfoFullInfo2Internal structure @@ -1032,27 +1099,38 @@ class TrustDomainInstance(object): # smbd already has the information about itself ldname = lsa.StringLarge() ldname.string = another_domain.info['dns_domain'] - collision_info = self._pipe.lsaRSetForestTrustInformation(self._policy_handle, - ldname, - lsa.LSA_FOREST_TRUST_DOMAIN_INFO, - ftinfo, 0) - if collision_info: - root_logger.error("When setting forest trust information, got collision info back:\n%s" % (ndr_print(collision_info))) + ftlevel = lsa.LSA_FOREST_TRUST_DOMAIN_INFO + # RSetForestTrustInformation returns collision information + # for trust topology + cinfo = self._pipe.lsaRSetForestTrustInformation( + self._policy_handle, + ldname, + ftlevel, + ftinfo, 0) + if cinfo: + root_logger.error("When setting forest trust information, " + "got collision info back:\n%s" + % (ndr_print(cinfo))) except RuntimeError as e: - # We can ignore the error here -- setting up name suffix routes may fail + # We can ignore the error here -- + # setting up name suffix routes may fail pass - def establish_trust(self, another_domain, trustdom_secret, trust_type='bidirectional', trust_external=False): + def establish_trust(self, another_domain, trustdom_secret, + trust_type='bidirectional', trust_external=False): """ Establishes trust between our and another domain - Input: another_domain -- instance of TrustDomainInstance, initialized with #retrieve call + Input: another_domain -- instance of TrustDomainInstance, + initialized with #retrieve call trustdom_secret -- shared secred used for the trust """ if self.info['name'] == another_domain.info['name']: # Check that NetBIOS names do not clash raise errors.ValidationError(name=u'AD Trust Setup', - error=_('the IPA server and the remote domain cannot share the same ' - 'NetBIOS name: %s') % self.info['name']) + error=_('the IPA server and the ' + 'remote domain cannot share ' + 'the same NetBIOS name: %s') + % self.info['name']) self.generate_auth(trustdom_secret) @@ -1071,8 +1149,12 @@ class TrustDomainInstance(object): try: dname = lsa.String() dname.string = another_domain.info['dns_domain'] - res = self._pipe.QueryTrustedDomainInfoByName(self._policy_handle, dname, lsa.LSA_TRUSTED_DOMAIN_INFO_FULL_INFO) - self._pipe.DeleteTrustedDomain(self._policy_handle, res.info_ex.sid) + res = self._pipe.QueryTrustedDomainInfoByName( + self._policy_handle, + dname, + lsa.LSA_TRUSTED_DOMAIN_INFO_FULL_INFO) + self._pipe.DeleteTrustedDomain(self._policy_handle, + res.info_ex.sid) except RuntimeError as e: num, message = e.args # pylint: disable=unpacking-non-sequence # Ignore anything but access denied (NT_STATUS_ACCESS_DENIED) @@ -1080,7 +1162,10 @@ class TrustDomainInstance(object): raise access_denied_error try: - trustdom_handle = self._pipe.CreateTrustedDomainEx2(self._policy_handle, info, self.auth_info, security.SEC_STD_DELETE) + trustdom_handle = self._pipe.CreateTrustedDomainEx2( + self._policy_handle, + info, self.auth_info, + security.SEC_STD_DELETE) except RuntimeError as e: num, message = e.args # pylint: disable=unpacking-non-sequence raise assess_dcerpc_exception(num=num, message=message) @@ -1089,13 +1174,19 @@ class TrustDomainInstance(object): # trust settings. Samba insists this has to be done with LSA # OpenTrustedDomain* calls, it is not enough to have a handle # returned by the CreateTrustedDomainEx2 call. - trustdom_handle = self._pipe.OpenTrustedDomainByName(self._policy_handle, dname, security.SEC_FLAG_MAXIMUM_ALLOWED) + trustdom_handle = self._pipe.OpenTrustedDomainByName( + self._policy_handle, + dname, + security.SEC_FLAG_MAXIMUM_ALLOWED) try: - infoclass = lsa.TrustDomainInfoSupportedEncTypes() - infoclass.enc_types = security.KERB_ENCTYPE_RC4_HMAC_MD5 - infoclass.enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 - infoclass.enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96 - self._pipe.SetInformationTrustedDomain(trustdom_handle, lsa.LSA_TRUSTED_DOMAIN_SUPPORTED_ENCRYPTION_TYPES, infoclass) + infocls = lsa.TrustDomainInfoSupportedEncTypes() + infocls.enc_types = security.KERB_ENCTYPE_RC4_HMAC_MD5 + infocls.enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 + infocls.enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96 + self._pipe.SetInformationTrustedDomain( + trustdom_handle, + lsa.LSA_TRUSTED_DOMAIN_SUPPORTED_ENCRYPTION_TYPES, + infocls) except RuntimeError as e: # We can ignore the error here -- changing enctypes is for # improved security but the trust will work with default values as @@ -1105,13 +1196,16 @@ class TrustDomainInstance(object): if not trust_external: try: - info = self._pipe.QueryTrustedDomainInfo(trustdom_handle, - lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX) + info = self._pipe.QueryTrustedDomainInfo( + trustdom_handle, + lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX) info.trust_attributes |= lsa.LSA_TRUST_ATTRIBUTE_FOREST_TRANSITIVE - self._pipe.SetInformationTrustedDomain(trustdom_handle, - lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX, info) + self._pipe.SetInformationTrustedDomain( + trustdom_handle, + lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX, info) except RuntimeError as e: - root_logger.error('unable to set trust transitivity status: %s' % (str(e))) + root_logger.error( + 'unable to set trust transitivity status: %s' % (str(e))) if self.info['is_pdc'] or trust_external: self.update_ftinfo(another_domain) @@ -1119,12 +1213,13 @@ class TrustDomainInstance(object): def verify_trust(self, another_domain): def retrieve_netlogon_info_2(logon_server, domain, function_code, data): try: - netr_pipe = netlogon.netlogon(domain.binding, domain.parm, domain.creds) - result = netr_pipe.netr_LogonControl2Ex(logon_server=logon_server, + netr_pipe = netlogon.netlogon(domain.binding, + domain.parm, domain.creds) + result = netr_pipe.netr_LogonControl2Ex( + logon_server=logon_server, function_code=function_code, level=2, - data=data - ) + data=data) return result except RuntimeError as e: num, message = e.args # pylint: disable=unpacking-non-sequence @@ -1135,9 +1230,11 @@ class TrustDomainInstance(object): another_domain.info['dns_domain']) if result and result.flags and netlogon.NETLOGON_VERIFY_STATUS_RETURNED: - if result.pdc_connection_status[0] != 0 and result.tc_connection_status[0] != 0: + if result.pdc_connection_status[0] != 0 and \ + result.tc_connection_status[0] != 0: if result.pdc_connection_status[1] == "WERR_ACCESS_DENIED": - # Most likely AD DC hit another IPA replica which yet has no trust secret replicated + # Most likely AD DC hit another IPA replica which + # yet has no trust secret replicated # Sleep and repeat again self.validation_attempts += 1 @@ -1176,23 +1273,23 @@ class TrustDomainInstance(object): def fetch_domains(api, mydomain, trustdomain, creds=None, server=None): trust_flags = dict( - NETR_TRUST_FLAG_IN_FOREST = 0x00000001, - NETR_TRUST_FLAG_OUTBOUND = 0x00000002, - NETR_TRUST_FLAG_TREEROOT = 0x00000004, - NETR_TRUST_FLAG_PRIMARY = 0x00000008, - NETR_TRUST_FLAG_NATIVE = 0x00000010, - NETR_TRUST_FLAG_INBOUND = 0x00000020, - NETR_TRUST_FLAG_MIT_KRB5 = 0x00000080, - NETR_TRUST_FLAG_AES = 0x00000100) + NETR_TRUST_FLAG_IN_FOREST=0x00000001, + NETR_TRUST_FLAG_OUTBOUND=0x00000002, + NETR_TRUST_FLAG_TREEROOT=0x00000004, + NETR_TRUST_FLAG_PRIMARY=0x00000008, + NETR_TRUST_FLAG_NATIVE=0x00000010, + NETR_TRUST_FLAG_INBOUND=0x00000020, + NETR_TRUST_FLAG_MIT_KRB5=0x00000080, + NETR_TRUST_FLAG_AES=0x00000100) trust_attributes = dict( - NETR_TRUST_ATTRIBUTE_NON_TRANSITIVE = 0x00000001, - NETR_TRUST_ATTRIBUTE_UPLEVEL_ONLY = 0x00000002, - NETR_TRUST_ATTRIBUTE_QUARANTINED_DOMAIN = 0x00000004, - NETR_TRUST_ATTRIBUTE_FOREST_TRANSITIVE = 0x00000008, - NETR_TRUST_ATTRIBUTE_CROSS_ORGANIZATION = 0x00000010, - NETR_TRUST_ATTRIBUTE_WITHIN_FOREST = 0x00000020, - NETR_TRUST_ATTRIBUTE_TREAT_AS_EXTERNAL = 0x00000040) + NETR_TRUST_ATTRIBUTE_NON_TRANSITIVE=0x00000001, + NETR_TRUST_ATTRIBUTE_UPLEVEL_ONLY=0x00000002, + NETR_TRUST_ATTRIBUTE_QUARANTINED_DOMAIN=0x00000004, + NETR_TRUST_ATTRIBUTE_FOREST_TRANSITIVE=0x00000008, + NETR_TRUST_ATTRIBUTE_CROSS_ORGANIZATION=0x00000010, + NETR_TRUST_ATTRIBUTE_WITHIN_FOREST=0x00000020, + NETR_TRUST_ATTRIBUTE_TREAT_AS_EXTERNAL=0x00000040) def communicate(td): td.init_lsa_pipe(td.info['dc']) @@ -1314,7 +1411,8 @@ class TrustDomainJoins(object): ld.retrieve(installutils.get_fqdn()) self.local_domain = ld - def populate_remote_domain(self, realm, realm_server=None, realm_admin=None, realm_passwd=None): + def populate_remote_domain(self, realm, realm_server=None, + realm_admin=None, realm_passwd=None): def get_instance(self): # Fetch data from foreign domain using password only rd = TrustDomainInstance('') @@ -1330,7 +1428,8 @@ class TrustDomainJoins(object): if realm_server is None: rd.retrieve_anonymously(realm, discover_srv=True, search_pdc=True) else: - rd.retrieve_anonymously(realm_server, discover_srv=False, search_pdc=True) + rd.retrieve_anonymously(realm_server, + discover_srv=False, search_pdc=True) rd.read_only = True if realm_admin and realm_passwd: if 'name' in rd.info: @@ -1339,12 +1438,14 @@ class TrustDomainJoins(object): # realm admin is in DOMAIN\user format # strip DOMAIN part as we'll enforce the one discovered realm_admin = names[-1] - auth_string = u"%s\%s%%%s" % (rd.info['name'], realm_admin, realm_passwd) + auth_string = u"%s\%s%%%s" \ + % (rd.info['name'], realm_admin, realm_passwd) td = get_instance(self) td.creds.parse_string(auth_string) td.creds.set_workstation(self.local_domain.hostname) if realm_server is None: - # we must have rd.info['dns_hostname'] then, part of anonymous discovery + # we must have rd.info['dns_hostname'] then + # as it is part of the anonymous discovery td.retrieve(rd.info['dns_hostname']) else: td.retrieve(realm_server) @@ -1358,8 +1459,8 @@ class TrustDomainJoins(object): """ Generate list of records for forest trust information about our realm domains. Note that the list generated currently - includes only top level domains, no exclusion domains, and no TDO objects - as we handle the latter in a separate way + includes only top level domains, no exclusion domains, and + no TDO objects as we handle the latter in a separate way """ if self.local_domain.read_only: return @@ -1367,10 +1468,15 @@ class TrustDomainJoins(object): self.local_domain.ftinfo_records = [] realm_domains = self.api.Command.realmdomains_show()['result'] - # Use realmdomains' modification timestamp to judge records last update time - entry = self.api.Backend.ldap2.get_entry(realm_domains['dn'], ['modifyTimestamp']) + # Use realmdomains' modification timestamp + # to judge records' last update time + entry = self.api.Backend.ldap2.get_entry( + realm_domains['dn'], ['modifyTimestamp']) # Convert the timestamp to Windows 64-bit timestamp format - trust_timestamp = long(time.mktime(entry['modifytimestamp'][0].timetuple())*1e7+116444736000000000) + trust_timestamp = long( + time.mktime( + entry.single_value.get('modifytimestamp').timetuple() + )*1e7+116444736000000000) for dom in realm_domains['associateddomain']: ftinfo = dict() @@ -1379,7 +1485,8 @@ class TrustDomainJoins(object): ftinfo['rec_type'] = lsa.LSA_FOREST_TRUST_TOP_LEVEL_NAME self.local_domain.ftinfo_records.append(ftinfo) - def join_ad_full_credentials(self, realm, realm_server, realm_admin, realm_passwd, trust_type): + def join_ad_full_credentials(self, realm, realm_server, realm_admin, + realm_passwd, trust_type): if not self.configured: return None @@ -1392,24 +1499,33 @@ class TrustDomainJoins(object): ) trust_external = bool(self.__allow_behavior & TRUST_JOIN_EXTERNAL) - if self.remote_domain.info['dns_domain'] != self.remote_domain.info['dns_forest']: + if self.remote_domain.info['dns_domain'] != \ + self.remote_domain.info['dns_forest']: if not trust_external: - raise errors.NotAForestRootError(forest=self.remote_domain.info['dns_forest'], - domain=self.remote_domain.info['dns_domain']) + raise errors.NotAForestRootError( + forest=self.remote_domain.info['dns_forest'], + domain=self.remote_domain.info['dns_domain']) if not self.remote_domain.read_only: trustdom_pass = samba.generate_random_password(128, 128) self.get_realmdomains() self.remote_domain.establish_trust(self.local_domain, - trustdom_pass, trust_type, trust_external) + trustdom_pass, + trust_type, trust_external) self.local_domain.establish_trust(self.remote_domain, - trustdom_pass, trust_type, trust_external) - # if trust is inbound, we don't need to verify it because AD DC will respond - # with WERR_NO_SUCH_DOMAIN -- in only does verification for outbound trusts. + trustdom_pass, + trust_type, trust_external) + # if trust is inbound, we don't need to verify it because + # AD DC will respond with WERR_NO_SUCH_DOMAIN -- + # it only does verification for outbound trusts. result = True if trust_type == TRUST_BIDIRECTIONAL: result = self.remote_domain.verify_trust(self.local_domain) - return dict(local=self.local_domain, remote=self.remote_domain, verified=result) + return dict( + local=self.local_domain, + remote=self.remote_domain, + verified=result + ) return None def join_ad_ipa_half(self, realm, realm_server, trustdom_passwd, trust_type): @@ -1420,11 +1536,18 @@ class TrustDomainJoins(object): self.populate_remote_domain(realm, realm_server, realm_passwd=None) trust_external = bool(self.__allow_behavior & TRUST_JOIN_EXTERNAL) - if self.remote_domain.info['dns_domain'] != self.remote_domain.info['dns_forest']: + if self.remote_domain.info['dns_domain'] != \ + self.remote_domain.info['dns_forest']: if not trust_external: - raise errors.NotAForestRootError(forest=self.remote_domain.info['dns_forest'], - domain=self.remote_domain.info['dns_domain']) + raise errors.NotAForestRootError( + forest=self.remote_domain.info['dns_forest'], + domain=self.remote_domain.info['dns_domain']) self.local_domain.establish_trust(self.remote_domain, - trustdom_passwd, trust_type, trust_external) - return dict(local=self.local_domain, remote=self.remote_domain, verified=False) + trustdom_passwd, + trust_type, trust_external) + return dict( + local=self.local_domain, + remote=self.remote_domain, + verified=False + )