pylint: Fix used-before-assignment

> Emitted when a local variable is accessed before its assignment took
place. Assignments in try blocks are assumed not to have occurred when
evaluating associated except/finally blocks. Assignments in except
blocks are assumed not to have occurred when evaluating statements
outside the block, except when the associated try block contains a
return statement.

Fixes: https://pagure.io/freeipa/issue/9278
Signed-off-by: Stanislav Levin <slev@altlinux.org>
Reviewed-By: Stanislav Levin <slev@altlinux.org>
This commit is contained in:
Stanislav Levin 2022-12-16 19:23:32 +03:00 committed by Florence Blanc-Renaud
parent a8dd070992
commit 0e03315299
9 changed files with 39 additions and 46 deletions

View File

@ -39,8 +39,8 @@ logger.setLevel(logging.DEBUG)
@contextmanager @contextmanager
def use_api_as_principal(principal, keytab): def use_api_as_principal(principal, keytab):
with ipautil.private_ccache() as ccache_file: with ipautil.private_ccache() as ccache_file:
old_principal = getattr(context, "principal", None)
try: try:
old_principal = getattr(context, "principal", None)
name = gssapi.Name(principal, gssapi.NameType.kerberos_principal) name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
store = {"ccache": ccache_file, "client_keytab": keytab} store = {"ccache": ccache_file, "client_keytab": keytab}
gssapi.Credentials(name=name, usage="initiate", store=store) gssapi.Credentials(name=name, usage="initiate", store=store)
@ -63,9 +63,7 @@ def use_api_as_principal(principal, keytab):
finally: finally:
if api.Backend.rpcclient.isconnected(): if api.Backend.rpcclient.isconnected():
api.Backend.rpcclient.disconnect() api.Backend.rpcclient.disconnect()
# pylint: disable=used-before-assignment
setattr(context, "principal", old_principal) setattr(context, "principal", old_principal)
# pylint: enable=used-before-assignment
def parse_options(): def parse_options():

View File

@ -1082,6 +1082,7 @@ class LDAPClient:
def error_handler(self, arg_desc=None): def error_handler(self, arg_desc=None):
"""Context manager that handles LDAPErrors """Context manager that handles LDAPErrors
""" """
desc = None
try: try:
try: try:
yield yield
@ -1101,7 +1102,7 @@ class LDAPClient:
except ldap.TYPE_OR_VALUE_EXISTS: except ldap.TYPE_OR_VALUE_EXISTS:
# attribute type or attribute value already exists, usually only # attribute type or attribute value already exists, usually only
# occurs, when two machines try to write at the same time. # occurs, when two machines try to write at the same time.
raise errors.DuplicateEntry(message=desc) # pylint: disable=E0601 raise errors.DuplicateEntry(message=desc)
except ldap.CONSTRAINT_VIOLATION: except ldap.CONSTRAINT_VIOLATION:
# This error gets thrown by the uniqueness plugin # This error gets thrown by the uniqueness plugin
_msg = 'Another entry with the same attribute value already exists' _msg = 'Another entry with the same attribute value already exists'

View File

@ -388,13 +388,14 @@ class ADTRUSTInstance(service.Service):
# Abort if RID bases are too close # Abort if RID bases are too close
local_range = ranges_with_no_rid_base[0] local_range = ranges_with_no_rid_base[0]
size_value = local_range.single_value.get('ipaIDRangeSize')
try: try:
size = int(local_range.single_value.get('ipaIDRangeSize')) size = int(size_value)
except ValueError: except (ValueError, TypeError):
# pylint: disable=used-before-assignment raise RuntimeError(
raise RuntimeError('ipaIDRangeSize is set to a non-integer ' "ipaIDRangeSize is set to a non-integer value or is not set"
'value or is not set at all (got {val})' f" at all (got {size_value!r})"
.format(val=size)) ) from None
if abs(self.rid_base - self.secondary_rid_base) < size: if abs(self.rid_base - self.secondary_rid_base) < size:
self.print_msg("Primary and secondary RID base are too close. " self.print_msg("Primary and secondary RID base are too close. "

View File

@ -1559,10 +1559,8 @@ def default_ca_subject_dn(subject_base):
def validate_mask(): def validate_mask():
try: mask = os.umask(0)
mask = os.umask(0) os.umask(mask)
finally:
os.umask(mask) # pylint: disable=used-before-assignment
mask_str = None mask_str = None
if mask & 0b111101101 > 0: if mask & 0b111101101 > 0:
mask_str = "{:04o}".format(mask) mask_str = "{:04o}".format(mask)

View File

@ -436,8 +436,8 @@ class KrbInstance(service.Service):
krbtgt = "krbtgt/" + self.realm + "@" + self.realm krbtgt = "krbtgt/" + self.realm + "@" + self.realm
certpath = (paths.KDC_CERT, paths.KDC_KEY) certpath = (paths.KDC_CERT, paths.KDC_KEY)
prev_helper = None
try: try:
prev_helper = None
# on the first CA-ful master without '--no-pkinit', we issue the # on the first CA-ful master without '--no-pkinit', we issue the
# certificate by contacting Dogtag directly # certificate by contacting Dogtag directly
ca_instances = find_providing_servers( ca_instances = find_providing_servers(
@ -482,7 +482,6 @@ class KrbInstance(service.Service):
logger.error("Failed to initiate the request: %s", e) logger.error("Failed to initiate the request: %s", e)
return return
finally: finally:
# pylint: disable-next=used-before-assignment
if prev_helper is not None: if prev_helper is not None:
certmonger.modify_ca_helper(certmonger_ca, prev_helper) certmonger.modify_ca_helper(certmonger_ca, prev_helper)

View File

@ -44,8 +44,8 @@ from contextlib import contextmanager
@contextmanager @contextmanager
def use_keytab(principal, keytab): def use_keytab(principal, keytab):
with private_ccache() as ccache_file: with private_ccache() as ccache_file:
old_principal = getattr(context, 'principal', None)
try: try:
old_principal = getattr(context, 'principal', None)
name = gssapi.Name(principal, gssapi.NameType.kerberos_principal) name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
store = {'ccache': ccache_file, store = {'ccache': ccache_file,
'client_keytab': keytab} 'client_keytab': keytab}
@ -60,7 +60,6 @@ def use_keytab(principal, keytab):
'principal %s in %s: %s' % (principal, keytab, 'principal %s in %s: %s' % (principal, keytab,
str(e))) str(e)))
finally: finally:
# pylint: disable-next=used-before-assignment
setattr(context, 'principal', old_principal) setattr(context, 'principal', old_principal)

View File

@ -657,14 +657,15 @@ class TestTrust(BaseTestTrust):
try: try:
# stop sssd_be, needed to simulate a timeout in the extdom plugin. # stop sssd_be, needed to simulate a timeout in the extdom plugin.
stop_sssdbe = self.master.run_command('kill -STOP %s' % pid) self.master.run_command('kill -STOP %s' % pid)
client.run_command(test_id) try:
error = 'ldap_extended_operation result: No such object(32)' client.run_command(test_id)
sssd_log2 = client.get_file_contents(log_file)[logsize:] error = 'ldap_extended_operation result: No such object(32)'
assert error.encode() not in sssd_log2 sssd_log2 = client.get_file_contents(log_file)[logsize:]
finally: assert error.encode() not in sssd_log2
if stop_sssdbe.returncode == 0: # pylint: disable=E0601 finally:
self.master.run_command('kill -CONT %s' % pid) self.master.run_command('kill -CONT %s' % pid)
finally:
# reconnect and set back to default extdom plugin # reconnect and set back to default extdom plugin
conn = self.master.ldap_connect() conn = self.master.ldap_connect()
entry = conn.get_entry(extdom_dn) entry = conn.get_entry(extdom_dn)

View File

@ -49,7 +49,7 @@ class CAACLTracker(Tracker):
u'usercategory', u'hostcategory', u'ipacacategory', u'usercategory', u'hostcategory', u'ipacacategory',
u'servicecategory', u'ipaenabledflag', u'objectclass', u'servicecategory', u'ipaenabledflag', u'objectclass',
u'ipauniqueid'} u'ipauniqueid'}
update_keys = create_keys - {u'dn'} update_keys = retrieve_keys - {"dn"}
def __init__(self, name, ipacertprofile_category=None, user_category=None, def __init__(self, name, ipacertprofile_category=None, user_category=None,
service_category=None, host_category=None, service_category=None, host_category=None,
@ -158,7 +158,7 @@ class CAACLTracker(Tracker):
def make_update_command(self, updates): def make_update_command(self, updates):
return self.make_command('caacl_mod', self.name, **updates) return self.make_command('caacl_mod', self.name, **updates)
def update(self, updates, expected_updates=None, silent=False): def update(self, updates, expected_updates=None):
"""If removing a category, delete it from tracker as well""" """If removing a category, delete it from tracker as well"""
# filter out empty categories and track changes # filter out empty categories and track changes
@ -166,11 +166,7 @@ class CAACLTracker(Tracker):
for key, value in updates.items(): for key, value in updates.items():
if key in self.category_keys: if key in self.category_keys:
if not value: if not value:
try: del self.attrs[key]
del self.attrs[key]
except IndexError:
if silent:
pass
else: else:
# if there is a value, prepare the pair for update # if there is a value, prepare the pair for update
filtered_updates.update({key: value}) filtered_updates.update({key: value})
@ -185,14 +181,13 @@ class CAACLTracker(Tracker):
try: try:
result = command() result = command()
except errors.EmptyModlist: except errors.EmptyModlist:
if silent: pass
self.attrs.update(filtered_updates) else:
self.attrs.update(expected_updates) self.attrs.update(filtered_updates)
# pylint: disable=used-before-assignment self.attrs.update(expected_updates)
self.check_update(result, self.check_update(result,
extra_keys=set(self.update_keys) | extra_keys=set(self.update_keys) |
set(expected_updates.keys())) set(expected_updates.keys()))
# pylint: enable=used-before-assignment
def check_update(self, result, extra_keys=()): def check_update(self, result, extra_keys=()):
assert_deepequal(dict( assert_deepequal(dict(

View File

@ -136,13 +136,13 @@ def fuzzy_set_optional_oc(s, oc):
== set(y.lower() for y in s if y != oc)) == set(y.lower() for y in s if y != oc))
server_available = None
try: try:
if not api.Backend.rpcclient.isconnected(): if not api.Backend.rpcclient.isconnected():
api.Backend.rpcclient.connect() api.Backend.rpcclient.connect()
res = api.Command['user_show'](u'notfound') api.Command["user_show"]("notfound")
except errors.NetworkError: server_available = True
server_available = False except (errors.NetworkError, IOError):
except IOError:
server_available = False server_available = False
except errors.NotFound: except errors.NotFound:
server_available = True server_available = True
@ -202,7 +202,7 @@ class XMLRPC_test:
""" """
@pytest.fixture(autouse=True, scope="class") @pytest.fixture(autouse=True, scope="class")
def xmlrpc_setup(self, request): def xmlrpc_setup(self, request):
if not server_available: # pylint: disable=used-before-assignment if not server_available:
pytest.skip('%r: Server not available: %r' % pytest.skip('%r: Server not available: %r' %
(request.cls.__module__, (request.cls.__module__,
api.env.xmlrpc_uri)) api.env.xmlrpc_uri))
@ -345,6 +345,7 @@ class Declarative(XMLRPC_test):
def check_exception(self, nice, cmd, args, options, expected): def check_exception(self, nice, cmd, args, options, expected):
klass = expected.__class__ klass = expected.__class__
expected_name = klass.__name__ expected_name = klass.__name__
got = None
try: try:
output = api.Command[cmd](*args, **options) output = api.Command[cmd](*args, **options)
except Exception as e: except Exception as e:
@ -353,7 +354,7 @@ class Declarative(XMLRPC_test):
raise AssertionError( raise AssertionError(
EXPECTED % (cmd, expected_name, args, options, output) EXPECTED % (cmd, expected_name, args, options, output)
) )
if not isinstance(got, klass): # pylint: disable=used-before-assignment if not isinstance(got, klass):
raise AssertionError( raise AssertionError(
UNEXPECTED % (cmd, expected_name, args, options, UNEXPECTED % (cmd, expected_name, args, options,
expected_name, expected, expected_name, expected,