ipaserver/dcerpc: reformat to make the code closer to pep8

Because Samba Python bindings provide long-named methods and constants,
sometimes it is impossible to fit into 80 columns without causing
damage to readability of the code. This patchset attempts to reduce
pep8 complaints to a minimum.

https://fedorahosted.org/freeipa/ticket/6076

Reviewed-By: Martin Babinsky <mbabinsk@redhat.com>
This commit is contained in:
Alexander Bokovoy 2016-06-07 22:41:10 +03:00 committed by Martin Babinsky
parent 3cf80e747d
commit c547d5567d

View File

@ -44,10 +44,12 @@ import samba
import random
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.backends import default_backend
# pylint: disable=F0401
try:
from ldap.controls import RequestControl as LDAPControl #pylint: disable=F0401
from ldap.controls import RequestControl as LDAPControl
except ImportError:
from ldap.controls import LDAPControl as LDAPControl #pylint: disable=F0401
from ldap.controls import LDAPControl as LDAPControl
# pylint: enable=F0401
import ldap as _ldap
from ipapython.ipaldap import IPAdmin
from ipaserver.session import krbccache_dir, krbccache_prefix
@ -74,7 +76,7 @@ and Samba4 python bindings.
# Both constants can be used as masks against trust direction
# because bi-directional has two lower bits set.
TRUST_ONEWAY = 1
TRUST_ONEWAY = 1
TRUST_BIDIRECTIONAL = 3
# Trust join behavior
@ -91,31 +93,44 @@ def is_sid_valid(sid):
return True
access_denied_error = errors.ACIError(info=_('CIFS server denied your credentials'))
access_denied_error = errors.ACIError(
info=_('CIFS server denied your credentials'))
dcerpc_error_codes = {
-1073741823:
errors.RemoteRetrieveError(reason=_('communication with CIFS server was unsuccessful')),
errors.RemoteRetrieveError(
reason=_('communication with CIFS server was unsuccessful')),
-1073741790: access_denied_error,
-1073741715: access_denied_error,
-1073741614: access_denied_error,
-1073741603:
errors.ValidationError(name=_('AD domain controller'), error=_('unsupported functional level')),
-1073741811: # NT_STATUS_INVALID_PARAMETER
errors.ValidationError(
name=_('AD domain controller'),
error=_('unsupported functional level')),
-1073741811: # NT_STATUS_INVALID_PARAMETER
errors.RemoteRetrieveError(
reason=_('AD domain controller complains about communication sequence. It may mean unsynchronized time on both sides, for example')),
-1073741776: # NT_STATUS_INVALID_PARAMETER_MIX, we simply will skip the binding
reason=_('AD domain controller complains about communication '
'sequence. It may mean unsynchronized time on both '
'sides, for example')),
-1073741776: # NT_STATUS_INVALID_PARAMETER_MIX,
# we simply will skip the binding
access_denied_error,
-1073741772: # NT_STATUS_OBJECT_NAME_NOT_FOUND
errors.RemoteRetrieveError(reason=_('CIFS server configuration does not allow access to \\\\pipe\\lsarpc')),
-1073741772: # NT_STATUS_OBJECT_NAME_NOT_FOUND
errors.RemoteRetrieveError(
reason=_('CIFS server configuration does not allow '
'access to \\\\pipe\\lsarpc')),
}
dcerpc_error_messages = {
"NT_STATUS_OBJECT_NAME_NOT_FOUND":
errors.NotFound(reason=_('Cannot find specified domain or server name')),
errors.NotFound(
reason=_('Cannot find specified domain or server name')),
"WERR_NO_LOGON_SERVERS":
errors.RemoteRetrieveError(reason=_('AD DC was unable to reach any IPA domain controller. Most likely it is a DNS or firewall issue')),
errors.RemoteRetrieveError(
reason=_('AD DC was unable to reach any IPA domain controller. '
'Most likely it is a DNS or firewall issue')),
"NT_STATUS_INVALID_PARAMETER_MIX":
errors.RequirementError(name=_('At least the domain or IP address should be specified')),
errors.RequirementError(
name=_('At least the domain or IP address should be specified')),
}
pysss_type_key_translation_dict = {
@ -126,7 +141,7 @@ pysss_type_key_translation_dict = {
}
def assess_dcerpc_exception(num=None,message=None):
def assess_dcerpc_exception(num=None, message=None):
"""
Takes error returned by Samba bindings and converts it into
an IPA error class.
@ -135,8 +150,9 @@ def assess_dcerpc_exception(num=None,message=None):
return dcerpc_error_codes[num]
if message and message in dcerpc_error_messages:
return dcerpc_error_messages[message]
reason = _('''CIFS server communication error: code "%(num)s",
message "%(message)s" (both may be "None")''') % dict(num=num, message=message)
reason = _('CIFS server communication error: code "%(num)s", '
'message "%(message)s" (both may be "None")') % \
dict(num=num, message=message)
return errors.RemoteRetrieveError(reason=reason)
@ -182,9 +198,13 @@ class DomainValidator(object):
self._parm = None
def is_configured(self):
cn_trust_local = DN(('cn', self.api.env.domain), self.api.env.container_cifsdomains, self.api.env.basedn)
cn_trust_local = DN(('cn', self.api.env.domain),
self.api.env.container_cifsdomains,
self.api.env.basedn)
try:
entry_attrs = self.ldap.get_entry(cn_trust_local, [self.ATTR_FLATNAME, self.ATTR_SID])
entry_attrs = self.ldap.get_entry(cn_trust_local,
[self.ATTR_FLATNAME,
self.ATTR_SID])
self.flatname = entry_attrs[self.ATTR_FLATNAME][0]
self.sid = entry_attrs[self.ATTR_SID][0]
self.dn = entry_attrs.dn
@ -203,7 +223,8 @@ class DomainValidator(object):
try:
search_kw = {'objectClass': 'ipaNTTrustedDomain'}
filter = self.ldap.make_filter(search_kw, rules=self.ldap.MATCH_ALL)
filter = self.ldap.make_filter(search_kw,
rules=self.ldap.MATCH_ALL)
(entries, truncated) = self.ldap.find_entries(
filter=filter,
base_dn=cn_trust,
@ -216,22 +237,22 @@ class DomainValidator(object):
# domain names as keys and those are generally case-insensitive
result = ipautil.CIDict()
for entry in entries:
for e in entries:
try:
trust_partner = entry[self.ATTR_TRUST_PARTNER][0]
flatname_normalized = entry[self.ATTR_FLATNAME][0].lower()
trusted_sid = entry[self.ATTR_TRUSTED_SID][0]
except KeyError as e:
t_partner = e.single_value.get(self.ATTR_TRUST_PARTNER)
fname_norm = e.single_value.get(self.ATTR_FLATNAME).lower()
trusted_sid = e.single_value.get(self.ATTR_TRUSTED_SID)
except KeyError as exc:
# Some piece of trusted domain info in LDAP is missing
# Skip the domain, but leave log entry for investigation
api.log.warning("Trusted domain '%s' entry misses an "
"attribute: %s", entry.dn, e)
"attribute: %s", e.dn, exc)
continue
result[trust_partner] = (flatname_normalized,
security.dom_sid(trusted_sid))
result[t_partner] = (fname_norm,
security.dom_sid(trusted_sid))
return result
except errors.NotFound as e:
except errors.NotFound as exc:
return []
def set_trusted_domains(self):
@ -244,21 +265,22 @@ class DomainValidator(object):
# This means we can't check the correctness of a trusted
# domain SIDs
raise errors.ValidationError(name='sid',
error=_('no trusted domain is configured'))
error=_('no trusted domain '
'is configured'))
def get_domain_by_sid(self, sid, exact_match=False):
if not self.domain:
# our domain is not configured or self.is_configured() never run
# reject SIDs as we can't check correctness of them
raise errors.ValidationError(name='sid',
error=_('domain is not configured'))
error=_('domain is not configured'))
# Parse sid string to see if it is really in a SID format
try:
test_sid = security.dom_sid(sid)
except TypeError:
raise errors.ValidationError(name='sid',
error=_('SID is not valid'))
error=_('SID is not valid'))
# At this point we have SID_NT_AUTHORITY family SID and really need to
# check it against prefixes of domain SIDs we trust to
@ -314,30 +336,34 @@ class DomainValidator(object):
return None
def get_trusted_domain_objects(self, domain=None, flatname=None, filter="",
attrs=None, scope=_ldap.SCOPE_SUBTREE, basedn=None):
attrs=None, scope=_ldap.SCOPE_SUBTREE,
basedn=None):
"""
Search for LDAP objects in a trusted domain specified either by `domain'
or `flatname'. The actual LDAP search is specified by `filter', `attrs',
`scope' and `basedn'. When `basedn' is empty, database root DN is used.
Search for LDAP objects in a trusted domain specified either by
`domain' or `flatname'. The actual LDAP search is specified by
`filter', `attrs', `scope' and `basedn'. When `basedn' is empty,
database root DN is used.
"""
assert domain is not None or flatname is not None
"""Returns SID for the trusted domain object (user or group only)"""
if not self.domain:
# our domain is not configured or self.is_configured() never run
raise errors.ValidationError(name=_('Trust setup'),
error=_('Our domain is not configured'))
error=_('Our domain is '
'not configured'))
if not self._domains:
self._domains = self.get_trusted_domains()
if len(self._domains) == 0:
# Our domain is configured but no trusted domains are configured
raise errors.ValidationError(name=_('Trust setup'),
error=_('No trusted domain is not configured'))
error=_('No trusted domain is '
'not configured'))
entries = None
if domain is not None:
if domain not in self._domains:
raise errors.ValidationError(name=_('trusted domain object'),
error= _('domain is not trusted'))
error=_('domain is not trusted'))
# Now we have a name to check against our list of trusted domains
entries = self.search_in_dc(domain, filter, attrs, scope, basedn)
elif flatname is not None:
@ -347,53 +373,65 @@ class DomainValidator(object):
for domain in self._domains:
if self._domains[domain][0] == flatname:
found_flatname = True
entries = self.search_in_dc(domain, filter, attrs, scope, basedn)
entries = self.search_in_dc(domain, filter,
attrs, scope, basedn)
if entries:
break
if not found_flatname:
raise errors.ValidationError(name=_('trusted domain object'),
error= _('no trusted domain matched the specified flat name'))
error=_('no trusted domain '
'matched the specified '
'flat name'))
if not entries:
raise errors.NotFound(reason=_('trusted domain object not found'))
return entries
def get_trusted_domain_object_sid(self, object_name, fallback_to_ldap=True):
def get_trusted_domain_object_sid(self, object_name,
fallback_to_ldap=True):
result = pysss_nss_idmap.getsidbyname(object_name)
if object_name in result and (pysss_nss_idmap.SID_KEY in result[object_name]):
if object_name in result and \
(pysss_nss_idmap.SID_KEY in result[object_name]):
object_sid = result[object_name][pysss_nss_idmap.SID_KEY]
return object_sid
# If fallback to AD DC LDAP is not allowed, bail out
if not fallback_to_ldap:
raise errors.ValidationError(name=_('trusted domain object'),
error= _('SSSD was unable to resolve the object to a valid SID'))
error=_('SSSD was unable to resolve '
'the object to a valid SID'))
# Else, we are going to contact AD DC LDAP
components = normalize_name(object_name)
if not ('domain' in components or 'flatname' in components):
# No domain or realm specified, ambiguous search
raise errors.ValidationError(name=_('trusted domain object'),
error= _('Ambiguous search, user domain was not specified'))
raise errors.ValidationError(name=_('trusted domain object'),
error=_('Ambiguous search, user '
'domain was not specified'))
attrs = ['objectSid']
filter = '(&(sAMAccountName=%(name)s)(|(objectClass=user)(objectClass=group)))' \
% dict(name=components['name'])
filter = '(&(sAMAccountName=%(name)s)' \
'(|(objectClass=user)(objectClass=group)))' \
% dict(name=components['name'])
scope = _ldap.SCOPE_SUBTREE
entries = self.get_trusted_domain_objects(components.get('domain'),
components.get('flatname'), filter, attrs, scope)
components.get('flatname'),
filter, attrs, scope)
if len(entries) > 1:
# Treat non-unique entries as invalid
raise errors.ValidationError(name=_('trusted domain object'),
error= _('Trusted domain did not return a unique object'))
error=_('Trusted domain did not '
'return a unique object'))
sid = self.__sid_to_str(entries[0]['objectSid'][0])
try:
test_sid = security.dom_sid(sid)
return unicode(test_sid)
except TypeError as e:
raise errors.ValidationError(name=_('trusted domain object'),
error= _('Trusted domain did not return a valid SID for the object'))
error=_('Trusted domain did not '
'return a valid SID for '
'the object'))
def get_trusted_domain_object_type(self, name_or_sid):
"""
@ -443,7 +481,8 @@ class DomainValidator(object):
)
attrs = ['sAMAccountName']
filter = (r'(&(objectSid=%(sid)s)(|(objectClass=user)(objectClass=group)))'
filter = (r'(&(objectSid=%(sid)s)'
'(|(objectClass=user)(objectClass=group)))'
% dict(sid=escaped_sid)) # sid in binary
domain = self.get_domain_by_sid(sid)
@ -454,7 +493,8 @@ class DomainValidator(object):
if len(entries) > 1:
# Treat non-unique entries as invalid
raise errors.ValidationError(name=_('trusted domain object'),
error=_('Trusted domain did not return a unique object'))
error=_('Trusted domain did not '
'return a unique object'))
object_name = (
"%s@%s" % (entries[0].single_value['sAMAccountName'].lower(),
@ -486,27 +526,31 @@ class DomainValidator(object):
# Now search a trusted domain for a user with this SID
attrs = ['cn']
filter = '(&(objectClass=user)(objectSid=%(sid)s))' \
% dict(sid=object_name)
% dict(sid=object_name)
try:
entries = self.get_trusted_domain_objects(domain=domain, filter=filter,
attrs=attrs, scope=_ldap.SCOPE_SUBTREE)
entries = self.get_trusted_domain_objects(domain=domain,
filter=filter,
attrs=attrs,
scope=_ldap.SCOPE_SUBTREE)
except errors.NotFound:
raise errors.NotFound(reason=_('trusted domain user not found'))
user_dn = entries[0].dn
elif domain or flatname:
attrs = ['cn']
filter = '(&(sAMAccountName=%(name)s)(objectClass=user))' \
% dict(name=name)
% dict(name=name)
try:
entries = self.get_trusted_domain_objects(domain,
flatname, filter, attrs, _ldap.SCOPE_SUBTREE)
flatname, filter, attrs,
_ldap.SCOPE_SUBTREE)
except errors.NotFound:
raise errors.NotFound(reason=_('trusted domain user not found'))
user_dn = entries[0].dn
else:
# No domain or realm specified, ambiguous search
raise errors.ValidationError(name=_('trusted domain object'),
error= _('Ambiguous search, user domain was not specified'))
error=_('Ambiguous search, '
'user domain was not specified'))
# Get SIDs of user object and it's groups
# tokenGroups attribute must be read with a scope BASE for a known user
@ -514,9 +558,11 @@ class DomainValidator(object):
attrs = ['objectSID', 'tokenGroups']
filter = "(objectClass=user)"
entries = self.get_trusted_domain_objects(domain,
flatname, filter, attrs, _ldap.SCOPE_BASE, user_dn)
flatname, filter, attrs,
_ldap.SCOPE_BASE, user_dn)
object_sid = self.__sid_to_str(entries[0]['objectSid'][0])
group_sids = [self.__sid_to_str(sid) for sid in entries[0]['tokenGroups']]
group_sids = [self.__sid_to_str(sid)
for sid in entries[0]['tokenGroups']]
return (object_sid, group_sids)
def get_trusted_domain_user_and_groups(self, object_name):
@ -540,11 +586,14 @@ class DomainValidator(object):
if is_valid_sid:
object_sid = object_name
result = pysss_nss_idmap.getnamebysid(object_name)
if object_name in result and (pysss_nss_idmap.NAME_KEY in result[object_name]):
group_list = pysss.getgrouplist(result[object_name][pysss_nss_idmap.NAME_KEY])
if object_name in result and \
(pysss_nss_idmap.NAME_KEY in result[object_name]):
group_list = pysss.getgrouplist(
result[object_name][pysss_nss_idmap.NAME_KEY])
else:
result = pysss_nss_idmap.getsidbyname(object_name)
if object_name in result and (pysss_nss_idmap.SID_KEY in result[object_name]):
if object_name in result and \
(pysss_nss_idmap.SID_KEY in result[object_name]):
object_sid = result[object_name][pysss_nss_idmap.SID_KEY]
group_list = pysss.getgrouplist(object_name)
@ -552,7 +601,10 @@ class DomainValidator(object):
return self.__get_trusted_domain_user_and_groups(object_name)
group_sids = pysss_nss_idmap.getsidbyname(group_list)
return (object_sid, [el[1][pysss_nss_idmap.SID_KEY] for el in group_sids.items()])
return (
object_sid,
[el[1][pysss_nss_idmap.SID_KEY] for el in group_sids.items()]
)
def __sid_to_str(self, sid):
"""
@ -561,12 +613,13 @@ class DomainValidator(object):
"""
sid_rev_num = ord(sid[0])
number_sub_id = ord(sid[1])
ia = struct.unpack('!Q','\x00\x00'+sid[2:8])[0]
ia = struct.unpack('!Q', '\x00\x00'+sid[2:8])[0]
subs = [
struct.unpack('<I',sid[8+4*i:12+4*i])[0]
struct.unpack('<I', sid[8+4*i:12+4*i])[0]
for i in range(number_sub_id)
]
return u'S-%d-%d-%s' % ( sid_rev_num, ia, '-'.join([str(s) for s in subs]),)
return u'S-%d-%d-%s' % (sid_rev_num, ia,
'-'.join([str(s) for s in subs]),)
def kinit_as_http(self, domain):
"""
@ -624,7 +677,7 @@ class DomainValidator(object):
error on ccache initialization
"""
if self._admin_creds == None:
if self._admin_creds is None:
return (None, None)
domain_suffix = domain.replace('.', '-')
@ -691,7 +744,8 @@ class DomainValidator(object):
ccache_name = None
if self._admin_creds:
(ccache_name, principal) = self.kinit_as_administrator(info['dns_domain'])
(ccache_name,
principal) = self.kinit_as_administrator(info['dns_domain'])
if ccache_name:
with ipautil.private_ccache(path=ccache_name):
@ -736,7 +790,7 @@ class DomainValidator(object):
if not self._creds:
self._parm = param.LoadParm()
self._parm.load(os.path.join(ipautil.SHARE_DIR,"smb.conf.empty"))
self._parm.load(os.path.join(ipautil.SHARE_DIR, "smb.conf.empty"))
self._parm.set('netbios name', self.flatname)
self._creds = credentials.Credentials()
self._creds.set_kerberos_state(credentials.MUST_USE_KERBEROS)
@ -746,12 +800,14 @@ class DomainValidator(object):
netrc = net.Net(creds=self._creds, lp=self._parm)
finddc_error = None
result = None
flags = nbt.NBT_SERVER_LDAP | nbt.NBT_SERVER_GC | nbt.NBT_SERVER_CLOSEST
try:
result = netrc.finddc(domain=domain, flags=nbt.NBT_SERVER_LDAP | nbt.NBT_SERVER_GC | nbt.NBT_SERVER_CLOSEST)
result = netrc.finddc(domain=domain, flags=flags)
except RuntimeError as e:
try:
# If search of closest GC failed, attempt to find any one
result = netrc.finddc(domain=domain, flags=nbt.NBT_SERVER_LDAP | nbt.NBT_SERVER_GC)
flags = nbt.NBT_SERVER_LDAP | nbt.NBT_SERVER_GC
result = netrc.finddc(domain=domain, flags=flags)
except RuntimeError as e:
finddc_error = e
@ -789,6 +845,7 @@ class DomainValidator(object):
self._info[domain] = info
return info
def string_to_array(what):
return [ord(v) for v in what]
@ -797,7 +854,7 @@ class TrustDomainInstance(object):
def __init__(self, hostname, creds=None):
self.parm = param.LoadParm()
self.parm.load(os.path.join(ipautil.SHARE_DIR,"smb.conf.empty"))
self.parm.load(os.path.join(ipautil.SHARE_DIR, "smb.conf.empty"))
if len(hostname) > 0:
self.parm.set('netbios name', hostname)
self.creds = creds
@ -810,14 +867,14 @@ class TrustDomainInstance(object):
self.validation_attempts = 0
def __gen_lsa_connection(self, binding):
if self.creds is None:
raise errors.RequirementError(name=_('CIFS credentials object'))
try:
result = lsa.lsarpc(binding, self.parm, self.creds)
return result
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
if self.creds is None:
raise errors.RequirementError(name=_('CIFS credentials object'))
try:
result = lsa.lsarpc(binding, self.parm, self.creds)
return result
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
def init_lsa_pipe(self, remote_host):
"""
@ -847,30 +904,35 @@ class TrustDomainInstance(object):
# When session key is not available, we just skip this binding
session_attempts = session_attempts + 1
if self._pipe is None and (attempts + session_attempts) == len(bindings):
if self._pipe is None and \
(attempts + session_attempts) == len(bindings):
raise errors.ACIError(
info=_('CIFS server %(host)s denied your credentials') % dict(host=remote_host))
info=_('CIFS server %(host)s denied your credentials')
% dict(host=remote_host))
if self._pipe is None:
raise errors.RemoteRetrieveError(
reason=_('Cannot establish LSA connection to %(host)s. Is CIFS server running?') % dict(host=remote_host))
reason=_('Cannot establish LSA connection to %(host)s. '
'Is CIFS server running?') % dict(host=remote_host))
self.binding = binding
self.session_key = self._pipe.session_key
def __gen_lsa_bindings(self, remote_host):
"""
There are multiple transports to issue LSA calls. However, depending on a
system in use they may be blocked by local operating system policies.
There are multiple transports to issue LSA calls. However, depending on
a system in use they may be blocked by local operating system policies.
Generate all we can use. init_lsa_pipe() will try them one by one until
there is one working.
We try NCACN_NP before NCACN_IP_TCP and use SMB2 before SMB1 or defaults.
We try NCACN_NP before NCACN_IP_TCP and use SMB2 before SMB1.
"""
transports = (u'ncacn_np', u'ncacn_ip_tcp')
options = ( u'smb2,print', u'print')
return [u'%s:%s[%s]' % (t, remote_host, o) for t in transports for o in options]
options = (u'smb2,print', u'print')
return [u'%s:%s[%s]' % (t, remote_host, o)
for t in transports for o in options]
def retrieve_anonymously(self, remote_host, discover_srv=False, search_pdc=False):
def retrieve_anonymously(self, remote_host,
discover_srv=False, search_pdc=False):
"""
When retrieving DC information anonymously, we can't get SID of the domain
"""
@ -896,7 +958,8 @@ class TrustDomainInstance(object):
self.info['is_pdc'] = (result.server_type & nbt.NBT_SERVER_PDC) != 0
# Netlogon response doesn't contain SID of the domain.
# We need to do rootDSE search with LDAP_SERVER_EXTENDED_DN_OID control to reveal the SID
# We need to do rootDSE search with LDAP_SERVER_EXTENDED_DN_OID
# control to reveal the SID
ldap_uri = 'ldap://%s' % (result.pdc_dns_name)
conn = _ldap.initialize(ldap_uri)
conn.set_option(_ldap.OPT_SERVER_CONTROLS, [ExtendedDNControl()])
@ -908,7 +971,7 @@ class TrustDomainInstance(object):
except _ldap.LDAPError as e:
root_logger.error(
"LDAP error when connecting to %(host)s: %(error)s" %
dict(host=unicode(result.pdc_name), error=str(e)))
dict(host=unicode(result.pdc_name), error=str(e)))
except KeyError as e:
root_logger.error("KeyError: {err}, LDAP entry from {host} "
"returned malformed. Your DNS might be "
@ -930,8 +993,11 @@ class TrustDomainInstance(object):
objectAttribute = lsa.ObjectAttribute()
objectAttribute.sec_qos = lsa.QosInfo()
try:
self._policy_handle = self._pipe.OpenPolicy2(u"", objectAttribute, security.SEC_FLAG_MAXIMUM_ALLOWED)
result = self._pipe.QueryInfoPolicy2(self._policy_handle, lsa.LSA_POLICY_INFO_DNS)
self._policy_handle = \
self._pipe.OpenPolicy2(u"", objectAttribute,
security.SEC_FLAG_MAXIMUM_ALLOWED)
result = self._pipe.QueryInfoPolicy2(self._policy_handle,
lsa.LSA_POLICY_INFO_DNS)
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
@ -944,7 +1010,8 @@ class TrustDomainInstance(object):
self.info['dc'] = remote_host
try:
result = self._pipe.QueryInfoPolicy2(self._policy_handle, lsa.LSA_POLICY_INFO_ROLE)
result = self._pipe.QueryInfoPolicy2(self._policy_handle,
lsa.LSA_POLICY_INFO_ROLE)
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
@ -958,18 +1025,18 @@ class TrustDomainInstance(object):
clear_value.size = len(password_blob)
clear_value.password = password_blob
clear_authentication_information = drsblobs.AuthenticationInformation()
clear_authentication_information.LastUpdateTime = samba.unix2nttime(int(time.time()))
clear_authentication_information.AuthType = lsa.TRUST_AUTH_TYPE_CLEAR
clear_authentication_information.AuthInfo = clear_value
clear_authinfo = drsblobs.AuthenticationInformation()
clear_authinfo.LastUpdateTime = samba.unix2nttime(int(time.time()))
clear_authinfo.AuthType = lsa.TRUST_AUTH_TYPE_CLEAR
clear_authinfo.AuthInfo = clear_value
authentication_information_array = drsblobs.AuthenticationInformationArray()
authentication_information_array.count = 1
authentication_information_array.array = [clear_authentication_information]
authinfo_array = drsblobs.AuthenticationInformationArray()
authinfo_array.count = 1
authinfo_array.array = [clear_authinfo]
outgoing = drsblobs.trustAuthInOutBlob()
outgoing.count = 1
outgoing.current = authentication_information_array
outgoing.current = authinfo_array
confounder = [3]*512
for i in range(512):
@ -983,7 +1050,8 @@ class TrustDomainInstance(object):
trustpass_blob = ndr_pack(trustpass)
encrypted_trustpass = arcfour_encrypt(self._pipe.session_key, trustpass_blob)
encrypted_trustpass = arcfour_encrypt(self._pipe.session_key,
trustpass_blob)
auth_blob = lsa.DATA_BUF2()
auth_blob.size = len(encrypted_trustpass)
@ -993,7 +1061,6 @@ class TrustDomainInstance(object):
auth_info.auth_blob = auth_blob
self.auth_info = auth_info
def generate_ftinfo(self, another_domain):
"""
Generates TrustDomainInfoFullInfo2Internal structure
@ -1032,27 +1099,38 @@ class TrustDomainInstance(object):
# smbd already has the information about itself
ldname = lsa.StringLarge()
ldname.string = another_domain.info['dns_domain']
collision_info = self._pipe.lsaRSetForestTrustInformation(self._policy_handle,
ldname,
lsa.LSA_FOREST_TRUST_DOMAIN_INFO,
ftinfo, 0)
if collision_info:
root_logger.error("When setting forest trust information, got collision info back:\n%s" % (ndr_print(collision_info)))
ftlevel = lsa.LSA_FOREST_TRUST_DOMAIN_INFO
# RSetForestTrustInformation returns collision information
# for trust topology
cinfo = self._pipe.lsaRSetForestTrustInformation(
self._policy_handle,
ldname,
ftlevel,
ftinfo, 0)
if cinfo:
root_logger.error("When setting forest trust information, "
"got collision info back:\n%s"
% (ndr_print(cinfo)))
except RuntimeError as e:
# We can ignore the error here -- setting up name suffix routes may fail
# We can ignore the error here --
# setting up name suffix routes may fail
pass
def establish_trust(self, another_domain, trustdom_secret, trust_type='bidirectional', trust_external=False):
def establish_trust(self, another_domain, trustdom_secret,
trust_type='bidirectional', trust_external=False):
"""
Establishes trust between our and another domain
Input: another_domain -- instance of TrustDomainInstance, initialized with #retrieve call
Input: another_domain -- instance of TrustDomainInstance,
initialized with #retrieve call
trustdom_secret -- shared secred used for the trust
"""
if self.info['name'] == another_domain.info['name']:
# Check that NetBIOS names do not clash
raise errors.ValidationError(name=u'AD Trust Setup',
error=_('the IPA server and the remote domain cannot share the same '
'NetBIOS name: %s') % self.info['name'])
error=_('the IPA server and the '
'remote domain cannot share '
'the same NetBIOS name: %s')
% self.info['name'])
self.generate_auth(trustdom_secret)
@ -1071,8 +1149,12 @@ class TrustDomainInstance(object):
try:
dname = lsa.String()
dname.string = another_domain.info['dns_domain']
res = self._pipe.QueryTrustedDomainInfoByName(self._policy_handle, dname, lsa.LSA_TRUSTED_DOMAIN_INFO_FULL_INFO)
self._pipe.DeleteTrustedDomain(self._policy_handle, res.info_ex.sid)
res = self._pipe.QueryTrustedDomainInfoByName(
self._policy_handle,
dname,
lsa.LSA_TRUSTED_DOMAIN_INFO_FULL_INFO)
self._pipe.DeleteTrustedDomain(self._policy_handle,
res.info_ex.sid)
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
# Ignore anything but access denied (NT_STATUS_ACCESS_DENIED)
@ -1080,7 +1162,10 @@ class TrustDomainInstance(object):
raise access_denied_error
try:
trustdom_handle = self._pipe.CreateTrustedDomainEx2(self._policy_handle, info, self.auth_info, security.SEC_STD_DELETE)
trustdom_handle = self._pipe.CreateTrustedDomainEx2(
self._policy_handle,
info, self.auth_info,
security.SEC_STD_DELETE)
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
@ -1089,13 +1174,19 @@ class TrustDomainInstance(object):
# trust settings. Samba insists this has to be done with LSA
# OpenTrustedDomain* calls, it is not enough to have a handle
# returned by the CreateTrustedDomainEx2 call.
trustdom_handle = self._pipe.OpenTrustedDomainByName(self._policy_handle, dname, security.SEC_FLAG_MAXIMUM_ALLOWED)
trustdom_handle = self._pipe.OpenTrustedDomainByName(
self._policy_handle,
dname,
security.SEC_FLAG_MAXIMUM_ALLOWED)
try:
infoclass = lsa.TrustDomainInfoSupportedEncTypes()
infoclass.enc_types = security.KERB_ENCTYPE_RC4_HMAC_MD5
infoclass.enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
infoclass.enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
self._pipe.SetInformationTrustedDomain(trustdom_handle, lsa.LSA_TRUSTED_DOMAIN_SUPPORTED_ENCRYPTION_TYPES, infoclass)
infocls = lsa.TrustDomainInfoSupportedEncTypes()
infocls.enc_types = security.KERB_ENCTYPE_RC4_HMAC_MD5
infocls.enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
infocls.enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
self._pipe.SetInformationTrustedDomain(
trustdom_handle,
lsa.LSA_TRUSTED_DOMAIN_SUPPORTED_ENCRYPTION_TYPES,
infocls)
except RuntimeError as e:
# We can ignore the error here -- changing enctypes is for
# improved security but the trust will work with default values as
@ -1105,13 +1196,16 @@ class TrustDomainInstance(object):
if not trust_external:
try:
info = self._pipe.QueryTrustedDomainInfo(trustdom_handle,
lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX)
info = self._pipe.QueryTrustedDomainInfo(
trustdom_handle,
lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX)
info.trust_attributes |= lsa.LSA_TRUST_ATTRIBUTE_FOREST_TRANSITIVE
self._pipe.SetInformationTrustedDomain(trustdom_handle,
lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX, info)
self._pipe.SetInformationTrustedDomain(
trustdom_handle,
lsa.LSA_TRUSTED_DOMAIN_INFO_INFO_EX, info)
except RuntimeError as e:
root_logger.error('unable to set trust transitivity status: %s' % (str(e)))
root_logger.error(
'unable to set trust transitivity status: %s' % (str(e)))
if self.info['is_pdc'] or trust_external:
self.update_ftinfo(another_domain)
@ -1119,12 +1213,13 @@ class TrustDomainInstance(object):
def verify_trust(self, another_domain):
def retrieve_netlogon_info_2(logon_server, domain, function_code, data):
try:
netr_pipe = netlogon.netlogon(domain.binding, domain.parm, domain.creds)
result = netr_pipe.netr_LogonControl2Ex(logon_server=logon_server,
netr_pipe = netlogon.netlogon(domain.binding,
domain.parm, domain.creds)
result = netr_pipe.netr_LogonControl2Ex(
logon_server=logon_server,
function_code=function_code,
level=2,
data=data
)
data=data)
return result
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
@ -1135,9 +1230,11 @@ class TrustDomainInstance(object):
another_domain.info['dns_domain'])
if result and result.flags and netlogon.NETLOGON_VERIFY_STATUS_RETURNED:
if result.pdc_connection_status[0] != 0 and result.tc_connection_status[0] != 0:
if result.pdc_connection_status[0] != 0 and \
result.tc_connection_status[0] != 0:
if result.pdc_connection_status[1] == "WERR_ACCESS_DENIED":
# Most likely AD DC hit another IPA replica which yet has no trust secret replicated
# Most likely AD DC hit another IPA replica which
# yet has no trust secret replicated
# Sleep and repeat again
self.validation_attempts += 1
@ -1176,23 +1273,23 @@ class TrustDomainInstance(object):
def fetch_domains(api, mydomain, trustdomain, creds=None, server=None):
trust_flags = dict(
NETR_TRUST_FLAG_IN_FOREST = 0x00000001,
NETR_TRUST_FLAG_OUTBOUND = 0x00000002,
NETR_TRUST_FLAG_TREEROOT = 0x00000004,
NETR_TRUST_FLAG_PRIMARY = 0x00000008,
NETR_TRUST_FLAG_NATIVE = 0x00000010,
NETR_TRUST_FLAG_INBOUND = 0x00000020,
NETR_TRUST_FLAG_MIT_KRB5 = 0x00000080,
NETR_TRUST_FLAG_AES = 0x00000100)
NETR_TRUST_FLAG_IN_FOREST=0x00000001,
NETR_TRUST_FLAG_OUTBOUND=0x00000002,
NETR_TRUST_FLAG_TREEROOT=0x00000004,
NETR_TRUST_FLAG_PRIMARY=0x00000008,
NETR_TRUST_FLAG_NATIVE=0x00000010,
NETR_TRUST_FLAG_INBOUND=0x00000020,
NETR_TRUST_FLAG_MIT_KRB5=0x00000080,
NETR_TRUST_FLAG_AES=0x00000100)
trust_attributes = dict(
NETR_TRUST_ATTRIBUTE_NON_TRANSITIVE = 0x00000001,
NETR_TRUST_ATTRIBUTE_UPLEVEL_ONLY = 0x00000002,
NETR_TRUST_ATTRIBUTE_QUARANTINED_DOMAIN = 0x00000004,
NETR_TRUST_ATTRIBUTE_FOREST_TRANSITIVE = 0x00000008,
NETR_TRUST_ATTRIBUTE_CROSS_ORGANIZATION = 0x00000010,
NETR_TRUST_ATTRIBUTE_WITHIN_FOREST = 0x00000020,
NETR_TRUST_ATTRIBUTE_TREAT_AS_EXTERNAL = 0x00000040)
NETR_TRUST_ATTRIBUTE_NON_TRANSITIVE=0x00000001,
NETR_TRUST_ATTRIBUTE_UPLEVEL_ONLY=0x00000002,
NETR_TRUST_ATTRIBUTE_QUARANTINED_DOMAIN=0x00000004,
NETR_TRUST_ATTRIBUTE_FOREST_TRANSITIVE=0x00000008,
NETR_TRUST_ATTRIBUTE_CROSS_ORGANIZATION=0x00000010,
NETR_TRUST_ATTRIBUTE_WITHIN_FOREST=0x00000020,
NETR_TRUST_ATTRIBUTE_TREAT_AS_EXTERNAL=0x00000040)
def communicate(td):
td.init_lsa_pipe(td.info['dc'])
@ -1314,7 +1411,8 @@ class TrustDomainJoins(object):
ld.retrieve(installutils.get_fqdn())
self.local_domain = ld
def populate_remote_domain(self, realm, realm_server=None, realm_admin=None, realm_passwd=None):
def populate_remote_domain(self, realm, realm_server=None,
realm_admin=None, realm_passwd=None):
def get_instance(self):
# Fetch data from foreign domain using password only
rd = TrustDomainInstance('')
@ -1330,7 +1428,8 @@ class TrustDomainJoins(object):
if realm_server is None:
rd.retrieve_anonymously(realm, discover_srv=True, search_pdc=True)
else:
rd.retrieve_anonymously(realm_server, discover_srv=False, search_pdc=True)
rd.retrieve_anonymously(realm_server,
discover_srv=False, search_pdc=True)
rd.read_only = True
if realm_admin and realm_passwd:
if 'name' in rd.info:
@ -1339,12 +1438,14 @@ class TrustDomainJoins(object):
# realm admin is in DOMAIN\user format
# strip DOMAIN part as we'll enforce the one discovered
realm_admin = names[-1]
auth_string = u"%s\%s%%%s" % (rd.info['name'], realm_admin, realm_passwd)
auth_string = u"%s\%s%%%s" \
% (rd.info['name'], realm_admin, realm_passwd)
td = get_instance(self)
td.creds.parse_string(auth_string)
td.creds.set_workstation(self.local_domain.hostname)
if realm_server is None:
# we must have rd.info['dns_hostname'] then, part of anonymous discovery
# we must have rd.info['dns_hostname'] then
# as it is part of the anonymous discovery
td.retrieve(rd.info['dns_hostname'])
else:
td.retrieve(realm_server)
@ -1358,8 +1459,8 @@ class TrustDomainJoins(object):
"""
Generate list of records for forest trust information about
our realm domains. Note that the list generated currently
includes only top level domains, no exclusion domains, and no TDO objects
as we handle the latter in a separate way
includes only top level domains, no exclusion domains, and
no TDO objects as we handle the latter in a separate way
"""
if self.local_domain.read_only:
return
@ -1367,10 +1468,15 @@ class TrustDomainJoins(object):
self.local_domain.ftinfo_records = []
realm_domains = self.api.Command.realmdomains_show()['result']
# Use realmdomains' modification timestamp to judge records last update time
entry = self.api.Backend.ldap2.get_entry(realm_domains['dn'], ['modifyTimestamp'])
# Use realmdomains' modification timestamp
# to judge records' last update time
entry = self.api.Backend.ldap2.get_entry(
realm_domains['dn'], ['modifyTimestamp'])
# Convert the timestamp to Windows 64-bit timestamp format
trust_timestamp = long(time.mktime(entry['modifytimestamp'][0].timetuple())*1e7+116444736000000000)
trust_timestamp = long(
time.mktime(
entry.single_value.get('modifytimestamp').timetuple()
)*1e7+116444736000000000)
for dom in realm_domains['associateddomain']:
ftinfo = dict()
@ -1379,7 +1485,8 @@ class TrustDomainJoins(object):
ftinfo['rec_type'] = lsa.LSA_FOREST_TRUST_TOP_LEVEL_NAME
self.local_domain.ftinfo_records.append(ftinfo)
def join_ad_full_credentials(self, realm, realm_server, realm_admin, realm_passwd, trust_type):
def join_ad_full_credentials(self, realm, realm_server, realm_admin,
realm_passwd, trust_type):
if not self.configured:
return None
@ -1392,24 +1499,33 @@ class TrustDomainJoins(object):
)
trust_external = bool(self.__allow_behavior & TRUST_JOIN_EXTERNAL)
if self.remote_domain.info['dns_domain'] != self.remote_domain.info['dns_forest']:
if self.remote_domain.info['dns_domain'] != \
self.remote_domain.info['dns_forest']:
if not trust_external:
raise errors.NotAForestRootError(forest=self.remote_domain.info['dns_forest'],
domain=self.remote_domain.info['dns_domain'])
raise errors.NotAForestRootError(
forest=self.remote_domain.info['dns_forest'],
domain=self.remote_domain.info['dns_domain'])
if not self.remote_domain.read_only:
trustdom_pass = samba.generate_random_password(128, 128)
self.get_realmdomains()
self.remote_domain.establish_trust(self.local_domain,
trustdom_pass, trust_type, trust_external)
trustdom_pass,
trust_type, trust_external)
self.local_domain.establish_trust(self.remote_domain,
trustdom_pass, trust_type, trust_external)
# if trust is inbound, we don't need to verify it because AD DC will respond
# with WERR_NO_SUCH_DOMAIN -- in only does verification for outbound trusts.
trustdom_pass,
trust_type, trust_external)
# if trust is inbound, we don't need to verify it because
# AD DC will respond with WERR_NO_SUCH_DOMAIN --
# it only does verification for outbound trusts.
result = True
if trust_type == TRUST_BIDIRECTIONAL:
result = self.remote_domain.verify_trust(self.local_domain)
return dict(local=self.local_domain, remote=self.remote_domain, verified=result)
return dict(
local=self.local_domain,
remote=self.remote_domain,
verified=result
)
return None
def join_ad_ipa_half(self, realm, realm_server, trustdom_passwd, trust_type):
@ -1420,11 +1536,18 @@ class TrustDomainJoins(object):
self.populate_remote_domain(realm, realm_server, realm_passwd=None)
trust_external = bool(self.__allow_behavior & TRUST_JOIN_EXTERNAL)
if self.remote_domain.info['dns_domain'] != self.remote_domain.info['dns_forest']:
if self.remote_domain.info['dns_domain'] != \
self.remote_domain.info['dns_forest']:
if not trust_external:
raise errors.NotAForestRootError(forest=self.remote_domain.info['dns_forest'],
domain=self.remote_domain.info['dns_domain'])
raise errors.NotAForestRootError(
forest=self.remote_domain.info['dns_forest'],
domain=self.remote_domain.info['dns_domain'])
self.local_domain.establish_trust(self.remote_domain,
trustdom_passwd, trust_type, trust_external)
return dict(local=self.local_domain, remote=self.remote_domain, verified=False)
trustdom_passwd,
trust_type, trust_external)
return dict(
local=self.local_domain,
remote=self.remote_domain,
verified=False
)