LGTM: Fix multiple use before assignment

- Move assignment before try/finally block
- Add raise to indicate control flow change
- Add default value

https://pagure.io/freeipa/issue/7344

Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Fraser Tweedale <ftweedal@redhat.com>
This commit is contained in:
Christian Heimes 2018-01-03 11:09:41 +01:00
parent dc599e0797
commit 73ee9ff40e
9 changed files with 45 additions and 25 deletions

View File

@ -191,14 +191,16 @@ class IPACustodiaTester(object):
pkey = JWK(**dictkeys[usage_id])
local_pubkey = json_decode(pkey.export_public())
except Exception:
self.error("Failed to load and parse local JWK.", fatal=True)
raise self.error(
"Failed to load and parse local JWK.", fatal=True
)
else:
self.info("Loaded key for usage '{}' from '{}'.".format(
usage, IPA_CUSTODIA_KEYFILE
))
if pkey.key_id != self.host_spn:
self.error(
raise self.error(
"KID '{}' != host service principal name '{}' "
"(usage: {})".format(pkey.key_id, self.host_spn, usage),
fatal=True
@ -215,8 +217,11 @@ class IPACustodiaTester(object):
try:
host_pubkey = json_decode(find_key(self.host_spn, usage_id))
except Exception:
self.error("Fetching host keys {} (usage: {}) failed.".format(
self.host_spn, usage), fatal=True)
raise self.error(
"Fetching host keys {} (usage: {}) failed.".format(
self.host_spn, usage),
fatal=True
)
else:
self.info("Checked host LDAP keys '{}' for usage {}.".format(
self.host_spn, usage
@ -225,8 +230,10 @@ class IPACustodiaTester(object):
if host_pubkey != local_pubkey:
self.debug("LDAP: '{}'".format(host_pubkey))
self.debug("Local: '{}'".format(local_pubkey))
self.error(
"Host key in LDAP does not match local key.", fatal=True)
raise self.error(
"Host key in LDAP does not match local key.",
fatal=True
)
else:
self.info(
"Local key for usage '{}' matches key in LDAP.".format(usage)
@ -235,8 +242,11 @@ class IPACustodiaTester(object):
try:
server_pubkey = json_decode(find_key(self.server_spn, usage_id))
except Exception:
self.error("Fetching server keys {} (usage: {}) failed.".format(
self.server_spn, usage), fatal=True)
raise self.error(
"Fetching server keys {} (usage: {}) failed.".format(
self.server_spn, usage),
fatal=True
)
else:
self.info("Checked server LDAP keys '{}' for usage {}.".format(
self.server_spn, usage

View File

@ -1275,9 +1275,11 @@ def update_dns(server, hostname, options):
ips = get_local_ipaddresses()
except CalledProcessError as e:
logger.error("Cannot update DNS records. %s", e)
logger.debug("Unable to get local IP addresses.")
ips = None
if options.all_ip_addresses:
if ips is None:
raise RuntimeError("Unable to get local IP addresses.")
update_ips = ips
elif options.ip_addresses:
update_ips = []
@ -1777,8 +1779,8 @@ def get_ca_certs(fstore, options, server, basedn, realm):
override)
else:
# Auth with user credentials
url = ldap_url()
try:
url = ldap_url()
ca_certs = get_ca_certs_from_ldap(server, basedn, realm)
validate_new_ca_certs(existing_ca_certs, ca_certs, interactive)
except errors.FileError as e:
@ -1821,7 +1823,7 @@ def get_ca_certs(fstore, options, server, basedn, realm):
if ca_certs is None and existing_ca_certs is None:
raise errors.InternalError(u"expected CA cert file '%s' to "
u"exist, but it's absent" % (ca_file))
u"exist, but it's absent" % ca_file)
if ca_certs is not None:
try:
@ -2427,9 +2429,10 @@ def _install(options):
if not options.on_master:
nolog = tuple()
# First test out the kerberos configuration
fd, krb_name = tempfile.mkstemp()
os.close(fd)
ccache_dir = tempfile.mkdtemp(prefix='krbcc')
try:
(krb_fd, krb_name) = tempfile.mkstemp()
os.close(krb_fd)
configure_krb5_conf(
cli_realm=cli_realm,
cli_domain=cli_domain,
@ -2442,7 +2445,6 @@ def _install(options):
configure_sssd=options.sssd,
force=options.force)
env['KRB5_CONFIG'] = krb_name
ccache_dir = tempfile.mkdtemp(prefix='krbcc')
ccache_name = os.path.join(ccache_dir, 'ccache')
join_args = [paths.SBIN_IPA_JOIN,
"-s", cli_server[0],
@ -2799,7 +2801,7 @@ def _install(options):
nscd = services.knownservices.nscd
if nscd.is_installed():
save_state(nscd, statestore)
nscd_service_action = None
try:
if options.sssd:
nscd_service_action = 'stop'

View File

@ -123,14 +123,12 @@ class CACertManage(admintool.AdminTool):
try:
if command == 'renew':
rc = self.renew()
return self.renew()
elif command == 'install':
rc = self.install()
return self.install()
finally:
api.Backend.ldap2.disconnect()
return rc
def ldap_connect(self):
password = self.options.password
if not password:

View File

@ -197,6 +197,8 @@ class ReplicaPrepare(admintool.AdminTool):
def ask_for_options(self):
options = self.options
super(ReplicaPrepare, self).ask_for_options()
http_ca_cert = None
dirsrv_ca_cert = None
# get the directory manager password
self.dirman_password = options.password

View File

@ -306,6 +306,8 @@ def install_check(installer):
external_cert_file = installer._external_cert_file
external_ca_file = installer._external_ca_file
http_ca_cert = installer._ca_cert
dirsrv_ca_cert = None
pkinit_ca_cert = None
tasks.check_ipv6_stack_enabled()
tasks.check_selinux_status()

View File

@ -1021,10 +1021,13 @@ def promote_check(installer):
http_pkcs12_file = None
http_pkcs12_info = None
http_ca_cert = None
dirsrv_pkcs12_file = None
dirsrv_pkcs12_info = None
dirsrv_ca_cert = None
pkinit_pkcs12_file = None
pkinit_pkcs12_info = None
pkinit_ca_cert = None
if options.http_cert_files:
if options.http_pin is None:

View File

@ -1808,7 +1808,8 @@ class trustdomain_enable(LDAPQuery):
trust_dn = self.obj.get_dn(keys[0], trust_type=u'ad')
trust_entry = ldap.get_entry(trust_dn)
except errors.NotFound:
self.api.Object[self.obj.parent_object].handle_not_found(keys[0])
self.api.Object[self.obj.parent_object].handle_not_found(
keys[0])
dn = self.obj.get_dn(keys[0], keys[1], trust_type=u'ad')
try:
@ -1849,13 +1850,14 @@ class trustdomain_disable(LDAPQuery):
trust_dn = self.obj.get_dn(keys[0], trust_type=u'ad')
trust_entry = ldap.get_entry(trust_dn)
except errors.NotFound:
self.api.Object[self.obj.parent_object].handle_not_found(keys[0])
self.api.Object[self.obj.parent_object].handle_not_found(
keys[0])
dn = self.obj.get_dn(keys[0], keys[1], trust_type=u'ad')
try:
entry = ldap.get_entry(dn)
sid = entry.single_value.get('ipanttrusteddomainsid', None)
if not (sid in trust_entry['ipantsidblacklistincoming']):
if sid not in trust_entry['ipantsidblacklistincoming']:
trust_entry['ipantsidblacklistincoming'].append(sid)
ldap.update_entry(trust_entry)
else:

View File

@ -204,9 +204,9 @@ class PEMFileHandler(DBMAPHandler):
v = json_decode(value)
data = b64decode(v['pkcs12 data'])
password = v['export password']
fd, tmpdata = tempfile.mkstemp(dir=paths.TMP)
os.close(fd)
try:
_fd, tmpdata = tempfile.mkstemp(dir=paths.TMP)
os.close(_fd)
with open(tmpdata, 'wb') as f:
f.write(data)

View File

@ -600,8 +600,9 @@ def modify_sssd_conf(host, domain, mod_dict, provider='ipa',
:param provider_subtype: backend subtype (e.g. id or sudo), will be added
to the domain config if not present
"""
fd, temp_config_file = tempfile.mkstemp()
os.close(fd)
try:
temp_config_file = tempfile.mkstemp()[1]
current_config = host.transport.get_file_contents(paths.SSSD_CONF)
with open(temp_config_file, 'wb') as f: