Fixed cognitive complexity reported by SonarQube.

This commit is contained in:
Nikhil Mohite 2021-01-19 13:34:14 +05:30 committed by Akshay Joshi
parent 94d2ad9584
commit da9f9017a9
3 changed files with 241 additions and 189 deletions

View File

@ -131,22 +131,9 @@ class SchemaDiffTableCompare(SchemaDiffObjectCompare):
for source in source_cols:
if 'name' in source:
if isinstance(target_cols, list) and target_cols:
tmp = None
for item in target_cols:
if item['name'] == source['name']:
tmp = copy.deepcopy(item)
if tmp and source != tmp:
tmp_updated = copy.deepcopy(source)
# Preserve the column number
tmp_updated['attnum'] = tmp['attnum']
if item['typname'] not in tmp_updated['edit_types']:
tmp_updated['col_type_conversion'] = False
updated.append(tmp_updated)
target_cols.remove(tmp)
elif tmp and source == tmp:
target_cols.remove(tmp)
elif tmp is None:
added.append(source)
SchemaDiffTableCompare.compare_target_cols(source,
target_cols,
added, updated)
else:
added.append(source)
different['columns']['added'] = added
@ -157,6 +144,33 @@ class SchemaDiffTableCompare(SchemaDiffObjectCompare):
return different
@staticmethod
def compare_target_cols(source, target_cols, added, updated):
"""
Compare target col with source.
:param source:
:param target_cols:
:param added:
:param updated:
:return:
"""
tmp = None
for item in target_cols:
if item['name'] == source['name']:
tmp = copy.deepcopy(item)
if tmp and source != tmp:
tmp_updated = copy.deepcopy(source)
# Preserve the column number
tmp_updated['attnum'] = tmp['attnum']
if item['typname'] not in tmp_updated['edit_types']:
tmp_updated['col_type_conversion'] = False
updated.append(tmp_updated)
target_cols.remove(tmp)
elif tmp and source == tmp:
target_cols.remove(tmp)
elif tmp is None:
added.append(source)
@staticmethod
def table_constraint_comp(source_table, target_table):
"""

View File

@ -231,20 +231,11 @@ class BatchProcess(object):
db.session.add(j)
db.session.commit()
def start(self, cb=None):
def which(program, paths):
def is_exe(fpath):
return os.path.exists(fpath) and os.access(fpath, os.X_OK)
for path in paths:
if not os.path.isdir(path):
continue
exe_file = os.path.join(u_encode(path, fs_encoding), program)
if is_exe(exe_file):
return file_quote(exe_file)
return None
def check_start_end_time(self):
"""
Check start and end time to check process is still executing or not.
:return:
"""
if self.stime is not None:
if self.etime is None:
raise RuntimeError(_('The process has already been started.'))
@ -252,74 +243,20 @@ class BatchProcess(object):
_('The process has already finished and cannot be restarted.')
)
def start(self, cb=None):
self.check_start_end_time()
executor = file_quote(os.path.join(
os.path.dirname(u_encode(__file__)), 'process_executor.py'
))
paths = os.environ['PATH'].split(os.pathsep)
interpreter = None
current_app.logger.info(
"Process Executor: Operating System Path %s",
str(paths)
)
if os.name == 'nt':
paths.insert(0, os.path.join(u_encode(sys.prefix), 'Scripts'))
paths.insert(0, u_encode(sys.prefix))
interpreter = which('pythonw.exe', paths)
if interpreter is None:
interpreter = which('python.exe', paths)
current_app.logger.info(
"Process Executor: Interpreter value in path: %s",
str(interpreter)
)
if interpreter is None and current_app.PGADMIN_RUNTIME:
# We've faced an issue with Windows 2008 R2 (x86) regarding,
# not honouring the environment variables set under the Qt
# (e.g. runtime), and also setting PYTHONHOME same as
# sys.executable (i.e. pgAdmin4.exe).
#
# As we know, we're running it under the runtime, we can assume
# that 'venv' directory will be available outside of 'bin'
# directory.
#
# We would try out luck to find python executable based on that
# assumptions.
bin_path = os.path.dirname(sys.executable)
venv = os.path.realpath(
os.path.join(bin_path, '..\\venv')
)
interpreter = which('pythonw.exe', [venv])
if interpreter is None:
interpreter = which('python.exe', [venv])
current_app.logger.info(
"Process Executor: Interpreter value in virtual "
"environment: %s", str(interpreter)
)
if interpreter is not None:
# Our assumptions are proven right.
# Let's append the 'bin' directory to the PATH environment
# variable. And, also set PYTHONHOME environment variable
# to 'venv' directory.
os.environ['PATH'] = bin_path + ';' + os.environ['PATH']
os.environ['PYTHONHOME'] = venv
else:
# Let's not use sys.prefix in runtime.
# 'sys.prefix' is not identified on *nix systems for some unknown
# reason, while running under the runtime.
# We're already adding '<installation path>/pgAdmin 4/venv/bin'
# directory in the PATH environment variable. Hence - it will
# anyway be the redundant value in paths.
if not current_app.PGADMIN_RUNTIME:
paths.insert(0, os.path.join(u_encode(sys.prefix), 'bin'))
python_binary_name = 'python{0}'.format(sys.version_info[0])
interpreter = which(u_encode(python_binary_name), paths)
interpreter = self.get_interpreter(paths)
p = None
cmd = [
@ -368,34 +305,14 @@ class BatchProcess(object):
creationflags=(CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS)
)
else:
def preexec_function():
import signal
# Detaching from the parent process group
os.setpgrp()
# Explicitly ignoring signals in the child process
signal.signal(signal.SIGINT, signal.SIG_IGN)
# if in debug mode, wait for process to complete and
# get the stdout and stderr of popen.
if config.CONSOLE_LOG_LEVEL <= logging.DEBUG:
p = Popen(
cmd, close_fds=True, stdout=PIPE, stderr=PIPE, stdin=None,
preexec_fn=preexec_function, env=env
)
output, errors = p.communicate()
output = output.decode() \
if hasattr(output, 'decode') else output
errors = errors.decode() \
if hasattr(errors, 'decode') else errors
current_app.logger.debug(
'Process Watcher Out:{0}'.format(output))
current_app.logger.debug(
'Process Watcher Err:{0}'.format(errors))
output, error = self.get_process_output(cmd, env)
else:
p = Popen(
cmd, close_fds=True, stdout=None, stderr=None, stdin=None,
preexec_fn=preexec_function, env=env
preexec_fn=self.preexec_function, env=env
)
self.ecode = p.poll()
@ -423,60 +340,167 @@ class BatchProcess(object):
p.process_state = PROCESS_STARTED
db.session.commit()
def status(self, out=0, err=0):
import re
def get_process_output(self, cmd, env):
"""
:param cmd:
:param env:
:return:
"""
p = Popen(
cmd, close_fds=True, stdout=PIPE, stderr=PIPE, stdin=None,
preexec_fn=self.preexec_function, env=env
)
output, errors = p.communicate()
output = output.decode() \
if hasattr(output, 'decode') else output
errors = errors.decode() \
if hasattr(errors, 'decode') else errors
current_app.logger.debug(
'Process Watcher Out:{0}'.format(output))
current_app.logger.debug(
'Process Watcher Err:{0}'.format(errors))
return output, errors
def preexec_function(self):
import signal
# Detaching from the parent process group
os.setpgrp()
# Explicitly ignoring signals in the child process
signal.signal(signal.SIGINT, signal.SIG_IGN)
def get_interpreter(self, paths):
"""
Get interpreter.
:param paths:
:return:
"""
if os.name == 'nt':
paths.insert(0, os.path.join(u_encode(sys.prefix), 'Scripts'))
paths.insert(0, u_encode(sys.prefix))
interpreter = self.which('pythonw.exe', paths)
if interpreter is None:
interpreter = self.which('python.exe', paths)
current_app.logger.info(
"Process Executor: Interpreter value in path: %s",
str(interpreter)
)
if interpreter is None and current_app.PGADMIN_RUNTIME:
# We've faced an issue with Windows 2008 R2 (x86) regarding,
# not honouring the environment variables set under the Qt
# (e.g. runtime), and also setting PYTHONHOME same as
# sys.executable (i.e. pgAdmin4.exe).
#
# As we know, we're running it under the runtime, we can assume
# that 'venv' directory will be available outside of 'bin'
# directory.
#
# We would try out luck to find python executable based on that
# assumptions.
bin_path = os.path.dirname(sys.executable)
venv = os.path.realpath(
os.path.join(bin_path, '..\\venv')
)
interpreter = self.which('pythonw.exe', [venv])
if interpreter is None:
interpreter = self.which('python.exe', [venv])
current_app.logger.info(
"Process Executor: Interpreter value in virtual "
"environment: %s", str(interpreter)
)
if interpreter is not None:
# Our assumptions are proven right.
# Let's append the 'bin' directory to the PATH environment
# variable. And, also set PYTHONHOME environment variable
# to 'venv' directory.
os.environ['PATH'] = bin_path + ';' + os.environ['PATH']
os.environ['PYTHONHOME'] = venv
else:
# Let's not use sys.prefix in runtime.
# 'sys.prefix' is not identified on *nix systems for some unknown
# reason, while running under the runtime.
# We're already adding '<installation path>/pgAdmin 4/venv/bin'
# directory in the PATH environment variable. Hence - it will
# anyway be the redundant value in paths.
if not current_app.PGADMIN_RUNTIME:
paths.insert(0, os.path.join(u_encode(sys.prefix), 'bin'))
python_binary_name = 'python{0}'.format(sys.version_info[0])
interpreter = self.which(u_encode(python_binary_name), paths)
return interpreter
def which(self, program, paths):
def is_exe(fpath):
return os.path.exists(fpath) and os.access(fpath, os.X_OK)
for path in paths:
if not os.path.isdir(path):
continue
exe_file = os.path.join(u_encode(path, fs_encoding), program)
if is_exe(exe_file):
return file_quote(exe_file)
return None
def read_log(self, logfile, log, pos, ctime, ecode=None, enc='utf-8'):
import re
completed = True
idx = 0
c = re.compile(r"(\d+),(.*$)")
if not os.path.isfile(logfile):
return 0, False
with open(logfile, 'rb') as f:
eofs = os.fstat(f.fileno()).st_size
f.seek(pos, 0)
if pos == eofs and ecode is None:
completed = False
while pos < eofs:
idx += 1
line = f.readline()
line = line.decode(enc, 'replace')
r = c.split(line)
if len(r) < 3:
# ignore this line
pos = f.tell()
continue
if r[1] > ctime:
completed = False
break
log.append([r[1], r[2]])
pos = f.tell()
if idx >= 1024:
completed = False
break
if pos == eofs:
if ecode is None:
completed = False
break
return pos, completed
def status(self, out=0, err=0):
ctime = get_current_time(format='%y%m%d%H%M%S%f')
stdout = []
stderr = []
out_completed = err_completed = False
process_output = (out != -1 and err != -1)
enc = sys.getdefaultencoding()
if enc == 'ascii':
enc = 'utf-8'
def read_log(logfile, log, pos, ctime, ecode=None):
completed = True
idx = 0
c = re.compile(r"(\d+),(.*$)")
if not os.path.isfile(logfile):
return 0, False
with open(logfile, 'rb') as f:
eofs = os.fstat(f.fileno()).st_size
f.seek(pos, 0)
if pos == eofs and ecode is None:
completed = False
while pos < eofs:
idx += 1
line = f.readline()
line = line.decode(enc, 'replace')
r = c.split(line)
if len(r) < 3:
# ignore this line
pos = f.tell()
continue
if r[1] > ctime:
completed = False
break
log.append([r[1], r[2]])
pos = f.tell()
if idx >= 1024:
completed = False
break
if pos == eofs:
if ecode is None:
completed = False
break
return pos, completed
j = Process.query.filter_by(
pid=self.id, user_id=current_user.id
).first()
enc = sys.getdefaultencoding()
if enc == 'ascii':
enc = 'utf-8'
execution_time = None
@ -495,11 +519,11 @@ class BatchProcess(object):
execution_time = BatchProcess.total_seconds(etime - stime)
if process_output:
out, out_completed = read_log(
self.stdout, stdout, out, ctime, self.ecode
out, out_completed = self.read_log(
self.stdout, stdout, out, ctime, self.ecode, enc
)
err, err_completed = read_log(
self.stderr, stderr, err, ctime, self.ecode
err, err_completed = self.read_log(
self.stderr, stderr, err, ctime, self.ecode, enc
)
else:
out_completed = err_completed = False

View File

@ -418,18 +418,18 @@ def are_lists_identical(source_list, target_list, ignore_keys):
if source_list is None or target_list is None or \
len(source_list) != len(target_list):
return False
else:
for index in range(len(source_list)):
# Check the type of the value if it is an dictionary then
# call are_dictionaries_identical() function.
if isinstance(source_list[index], dict):
if not are_dictionaries_identical(source_list[index],
target_list[index],
ignore_keys):
return False
else:
if source_list[index] != target_list[index]:
return False
for index in range(len(source_list)):
# Check the type of the value if it is an dictionary then
# call are_dictionaries_identical() function.
if isinstance(source_list[index], dict):
if not are_dictionaries_identical(source_list[index],
target_list[index],
ignore_keys):
return False
else:
if source_list[index] != target_list[index]:
return False
return True
@ -459,15 +459,15 @@ def are_dictionaries_identical(source_dict, target_dict, ignore_keys):
current_app.logger.debug("Schema Diff: Number of keys are different "
"in source and target")
return False
else:
# If number of keys are same but key is not present in target then
# return False
for key in src_only:
if key not in tar_only:
current_app.logger.debug(
"Schema Diff: Number of keys are same but key is not"
" present in target")
return False
# If number of keys are same but key is not present in target then
# return False
for key in src_only:
if key not in tar_only:
current_app.logger.debug(
"Schema Diff: Number of keys are same but key is not"
" present in target")
return False
for key in source_dict.keys():
# Continue if key is available in ignore_keys
@ -491,17 +491,9 @@ def are_dictionaries_identical(source_dict, target_dict, ignore_keys):
else:
source_value = source_dict[key]
target_value = target_dict[key]
# If ignore_whitespaces is True then check the source_value and
# target_value if of type string. If the values is of type string
# then using translate function ignore all the whitespaces.
if ignore_whitespaces:
if isinstance(source_value, str):
source_value = source_value.translate(
str.maketrans('', '', string.whitespace))
if isinstance(target_value, str):
target_value = target_value.translate(
str.maketrans('', '', string.whitespace))
# Check if ignore whitespaces or not.
source_value, target_value = check_for_ignore_whitespaces(
ignore_whitespaces, source_value, target_value)
# We need a proper solution as sometimes we observe that
# source_value is '' and target_value is None or vice versa
@ -522,6 +514,28 @@ def are_dictionaries_identical(source_dict, target_dict, ignore_keys):
return True
def check_for_ignore_whitespaces(ignore_whitespaces, source_value,
target_value):
"""
If ignore_whitespaces is True then check the source_value and
target_value if of type string. If the values is of type string
then using translate function ignore all the whitespaces.
:param ignore_whitespaces: flag to check ignore whitespace.
:param source_value: source schema diff value
:param target_value: target schema diff value
:return: return source and target values.
"""
if ignore_whitespaces:
if isinstance(source_value, str):
source_value = source_value.translate(
str.maketrans('', '', string.whitespace))
if isinstance(target_value, str):
target_value = target_value.translate(
str.maketrans('', '', string.whitespace))
return source_value, target_value
def directory_diff(source_dict, target_dict, ignore_keys=[], difference=None):
"""
This function is used to recursively compare two dictionaries and