dcerpc: refactor assess_dcerpc_exception

assess_dcerpc_exception was used in multiple places with a pre-step
which was rather common. Move this to one spot.

This also fixes pylint warning about unbalanced unpacking.

https://pagure.io/freeipa/issue/6874

Reviewed-By: Alexander Bokovoy <abokovoy@redhat.com>
This commit is contained in:
Stanislav Laznicka 2017-08-22 13:19:05 +02:00
parent b57f87c9a0
commit 216d37b7f0

View File

@ -162,11 +162,20 @@ class TrustTopologyConflictSolved(Exception):
"""
pass
def assess_dcerpc_exception(num=None, message=None):
def assess_dcerpc_error(error):
"""
Takes error returned by Samba bindings and converts it into
an IPA error class.
"""
if isinstance(error, RuntimeError):
error_tuple = error.args
else:
error_tuple = error
if len(error_tuple) != 2:
raise RuntimeError("Unable to parse error: {err!r}".format(err=error))
num, message = error_tuple
if num and num in dcerpc_error_codes:
return dcerpc_error_codes[num]
if message and message in dcerpc_error_messages:
@ -812,8 +821,7 @@ class DomainValidator(object):
# Both methods should not fail at the same time
if finddc_error and len(info['gc']) == 0:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
raise assess_dcerpc_error(finddc_error)
self._info[domain] = info
return info
@ -848,8 +856,7 @@ class TrustDomainInstance(object):
result = lsa.lsarpc(binding, self.parm, self.creds)
return result
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
raise assess_dcerpc_error(e)
def init_lsa_pipe(self, remote_host):
"""
@ -921,8 +928,7 @@ class TrustDomainInstance(object):
else:
result = netrc.finddc(address=remote_host, flags=flags)
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
raise assess_dcerpc_error(e)
if not result:
return False
@ -983,8 +989,7 @@ class TrustDomainInstance(object):
result = self._pipe.QueryInfoPolicy2(self._policy_handle,
lsa.LSA_POLICY_INFO_DNS)
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
raise assess_dcerpc_error(e)
self.info['name'] = unicode(result.name.string)
self.info['dns_domain'] = unicode(result.dns_domain.string)
@ -997,8 +1002,7 @@ class TrustDomainInstance(object):
result = self._pipe.QueryInfoPolicy2(self._policy_handle,
lsa.LSA_POLICY_INFO_ROLE)
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
raise assess_dcerpc_error(e)
self.info['is_pdc'] = (result.role == lsa.LSA_ROLE_PRIMARY)
@ -1282,8 +1286,7 @@ class TrustDomainInstance(object):
info, self.auth_info,
security.SEC_STD_DELETE)
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
raise assess_dcerpc_error(e)
# We should use proper trustdom handle in order to modify the
# trust settings. Samba insists this has to be done with LSA
@ -1349,8 +1352,7 @@ class TrustDomainInstance(object):
data=data)
return result
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
raise assess_dcerpc_error(e)
result = retrieve_netlogon_info_2(None, self,
netlogon.NETLOGON_CONTROL_TC_VERIFY,
@ -1391,7 +1393,7 @@ class TrustDomainInstance(object):
raise errors.ACIError(info=error_message)
raise assess_dcerpc_exception(*result.pdc_connection_status)
raise assess_dcerpc_error(result.pdc_connection_status)
return True
@ -1430,8 +1432,7 @@ def fetch_domains(api, mydomain, trustdomain, creds=None, server=None):
result = netrc.finddc(domain=trustdomain,
flags=nbt.NBT_SERVER_LDAP | nbt.NBT_SERVER_DS)
except RuntimeError as e:
num, message = e.args # pylint: disable=unpacking-non-sequence
raise assess_dcerpc_exception(num=num, message=message)
raise assess_dcerpc_error(e)
td.info['dc'] = unicode(result.pdc_dns_name)
td.info['name'] = unicode(result.dns_domain)