Fixed cognitive complexity issues reported by SonarQube.

This commit is contained in:
Nikhil Mohite
2020-08-10 16:30:07 +05:30
committed by Akshay Joshi
parent 7a0bfecfc5
commit 9d006d0ec5
4 changed files with 246 additions and 141 deletions

View File

@@ -663,13 +663,6 @@ class TypeView(PGChildNodeView, DataTypeReader, SchemaDiffObjectCompare):
return internal_server_error(errormsg=res) return internal_server_error(errormsg=res)
for row in rset['rows']: for row in rset['rows']:
# Attaching properties for precession
# & length validation for current type
precision = False
length = False
min_val = 0
max_val = 0
# Check against PGOID for specific type # Check against PGOID for specific type
if row['elemoid']: if row['elemoid']:
if row['elemoid'] in (1560, 1561, 1562, 1563, 1042, 1043, if row['elemoid'] in (1560, 1561, 1562, 1563, 1042, 1043,
@@ -684,19 +677,8 @@ class TypeView(PGChildNodeView, DataTypeReader, SchemaDiffObjectCompare):
typeval = ' ' typeval = ' '
# Logic to set precision & length/min/max values # Logic to set precision & length/min/max values
if typeval == 'P': precision, length, min_val,\
precision = True max_val = TypeView.set_precision_and_len_val(typeval)
if precision or typeval in ('L', 'D'):
length = True
min_val = 0 if typeval == 'D' else 1
if precision:
max_val = 1000
elif min_val:
# Max of integer value
max_val = 2147483647
else:
max_val = 10
res.append( res.append(
{'label': row['typname'], 'value': row['typname'], {'label': row['typname'], 'value': row['typname'],
@@ -713,6 +695,35 @@ class TypeView(PGChildNodeView, DataTypeReader, SchemaDiffObjectCompare):
except Exception as e: except Exception as e:
return internal_server_error(errormsg=str(e)) return internal_server_error(errormsg=str(e))
@staticmethod
def set_precision_and_len_val(typeval):
    """
    Derive precision/length flags and min/max bounds for a type value.

    :param typeval: one-letter type category ('P' = precision,
        'L' = length, 'D' = zero-based length; anything else means
        no precision/length validation).
    :return: tuple (precision, length, min_val, max_val).
    """
    has_precision = typeval == 'P'
    has_length = has_precision or typeval in ('L', 'D')

    if not has_length:
        # No precision/length validation applies to this type.
        return has_precision, False, 0, 0

    min_val = 0 if typeval == 'D' else 1
    if has_precision:
        max_val = 1000
    elif min_val:
        # Max of a signed 32-bit integer value.
        max_val = 2147483647
    else:
        max_val = 10
    return has_precision, has_length, min_val, max_val
@check_precondition @check_precondition
def get_subtypes(self, gid, sid, did, scid, tid=None): def get_subtypes(self, gid, sid, did, scid, tid=None):
""" """
@@ -866,11 +877,11 @@ class TypeView(PGChildNodeView, DataTypeReader, SchemaDiffObjectCompare):
try: try:
# The SQL generated below will populate Input/Output/Send/ # The SQL generated below will populate Input/Output/Send/
# Receive/Analyze/TypModeIN/TypModOUT combo box # Receive/Analyze/TypModeIN/TypModOUT combo box
SQL = render_template("/".join([self.template_path, sql = render_template("/".join([self.template_path,
'get_external_functions.sql']), 'get_external_functions.sql']),
extfunc=True) extfunc=True)
if SQL: if sql:
status, rset = self.conn.execute_2darray(SQL) status, rset = self.conn.execute_2darray(sql)
if not status: if not status:
return internal_server_error(errormsg=res) return internal_server_error(errormsg=res)
@@ -880,11 +891,11 @@ class TypeView(PGChildNodeView, DataTypeReader, SchemaDiffObjectCompare):
'cbtype': 'all'}) 'cbtype': 'all'})
# The SQL generated below will populate TypModeIN combo box # The SQL generated below will populate TypModeIN combo box
SQL = render_template("/".join([self.template_path, sql = render_template("/".join([self.template_path,
'get_external_functions.sql']), 'get_external_functions.sql']),
typemodin=True) typemodin=True)
if SQL: if sql:
status, rset = self.conn.execute_2darray(SQL) status, rset = self.conn.execute_2darray(sql)
if not status: if not status:
return internal_server_error(errormsg=res) return internal_server_error(errormsg=res)
@@ -894,18 +905,7 @@ class TypeView(PGChildNodeView, DataTypeReader, SchemaDiffObjectCompare):
'cbtype': 'typmodin'}) 'cbtype': 'typmodin'})
# The SQL generated below will populate TypModeIN combo box # The SQL generated below will populate TypModeIN combo box
SQL = render_template("/".join([self.template_path, self._get_data_for_type_modein(res)
'get_external_functions.sql']),
typemodout=True)
if SQL:
status, rset = self.conn.execute_2darray(SQL)
if not status:
return internal_server_error(errormsg=res)
for row in rset['rows']:
res.append(
{'label': row['func'], 'value': row['func'],
'cbtype': 'typmodout'})
return make_json_response( return make_json_response(
data=res, data=res,
@@ -915,6 +915,68 @@ class TypeView(PGChildNodeView, DataTypeReader, SchemaDiffObjectCompare):
except Exception as e: except Exception as e:
return internal_server_error(errormsg=str(e)) return internal_server_error(errormsg=str(e))
def _get_data_for_type_modein(self, res):
    """
    Populate res with the external functions used for the type
    modifier combo box, fetched via get_external_functions.sql with
    typemodout=True.

    NOTE(review): the name says "modein" but the query is rendered
    with typemodout=True and rows are tagged 'typmodout' — presumably
    intentional (copied comment), but worth confirming.

    NOTE(review): on query failure this returns an
    internal_server_error response, yet the caller invokes this
    method without checking the return value, so the error is
    silently dropped — confirm and propagate at the call site.

    :param res: list of combo-box entries, appended to in place.
    :return: internal_server_error response on failure, else None.
    """
    sql = render_template("/".join([self.template_path,
                                    'get_external_functions.sql']),
                          typemodout=True)
    if sql:
        status, rset = self.conn.execute_2darray(sql)
        if not status:
            return internal_server_error(errormsg=res)
        for row in rset['rows']:
            # Each function is both the label and the value; 'cbtype'
            # routes the entry to the right combo box on the client.
            res.append(
                {'label': row['func'], 'value': row['func'],
                 'cbtype': 'typmodout'})
@staticmethod
def _checks_for_create_type(data):
    """
    Validate the request data for creating a type.

    :param data: request data with the new type's properties.
    :return: (is_error, response) tuple; when is_error is True the
        second element is the JSON error response to return to the
        client, otherwise it is an empty string.
    """
    required_args = {
        'name': 'Name',
        'typtype': 'Type'
    }
    # Ensure all mandatory parameters are present.
    for arg in required_args:
        if arg not in data:
            return True, make_json_response(
                status=410,
                success=0,
                errormsg=gettext(
                    "Could not find the required parameter ({})."
                ).format(arg)
            )

    # Check the type kind explicitly via 'typtype' rather than the
    # loop variable above, so a type merely *named* 'r' or 'b' does
    # not trigger the kind-specific validations by accident.
    # If type is range then check if subtype is defined or not.
    if data['typtype'] == 'r' and \
            ('typname' not in data or data['typname'] is None):
        return True, make_json_response(
            status=410,
            success=0,
            errormsg=gettext(
                'Subtype must be defined for range types.'
            )
        )
    # If type is external then check if input/output conversion
    # functions are defined.
    if data['typtype'] == 'b' and (
            'typinput' not in data or
            'typoutput' not in data or
            data['typinput'] is None or
            data['typoutput'] is None):
        return True, make_json_response(
            status=410,
            success=0,
            errormsg=gettext(
                'External types require both input and output '
                'conversion functions.'
            )
        )
    return False, ''
@check_precondition @check_precondition
def create(self, gid, sid, did, scid): def create(self, gid, sid, did, scid):
""" """
@@ -930,46 +992,10 @@ class TypeView(PGChildNodeView, DataTypeReader, SchemaDiffObjectCompare):
data = request.form if request.form else json.loads( data = request.form if request.form else json.loads(
request.data, encoding='utf-8' request.data, encoding='utf-8'
) )
required_args = {
'name': 'Name',
'typtype': 'Type'
}
for arg in required_args: is_error, errmsg = TypeView._checks_for_create_type(data)
if arg not in data: if is_error:
return make_json_response( return errmsg
status=410,
success=0,
errormsg=gettext(
"Could not find the required parameter ({})."
).format(arg)
)
# Additional checks goes here
# If type is range then check if subtype is defined or not
if data and data[arg] == 'r' and \
('typname' not in data or data['typname'] is None):
return make_json_response(
status=410,
success=0,
errormsg=gettext(
'Subtype must be defined for range types.'
)
)
# If type is external then check if input/output
# conversion function is defined
if data and data[arg] == 'b' and (
'typinput' not in data or
'typoutput' not in data or
data['typinput'] is None or
data['typoutput'] is None):
return make_json_response(
status=410,
success=0,
errormsg=gettext(
'External types require both input and output '
'conversion functions.'
)
)
# To format privileges coming from client # To format privileges coming from client
if 'typacl' in data and data['typacl'] is not None: if 'typacl' in data and data['typacl'] is not None:

View File

@@ -113,35 +113,7 @@ def preferences(module=None, preference=None):
def label(p): def label(p):
return gettext(p['label']) return gettext(p['label'])
for m in pref: _group_pref_by_categories(pref, res, label)
if len(m['categories']):
om = {
"id": m['id'],
"label": gettext(m['label']),
"inode": True,
"open": True,
"branch": []
}
for c in m['categories']:
for p in c['preferences']:
if 'label' in p and p['label'] is not None:
p['label'] = gettext(p['label'])
if 'help_str' in p and p['help_str'] is not None:
p['help_str'] = gettext(p['help_str'])
oc = {
"id": c['id'],
"mid": m['id'],
"label": gettext(c['label']),
"inode": False,
"open": False,
"preferences": sorted(c['preferences'], key=label)
}
(om['branch']).append(oc)
om['branch'] = sorted(om['branch'], key=label)
res.append(om)
return ajax_response( return ajax_response(
response=sorted(res, key=label), response=sorted(res, key=label),
@@ -149,6 +121,56 @@ def preferences(module=None, preference=None):
) )
def _group_pref_by_categories(pref, res, label):
    """
    Group the preference modules by their categories.

    :param pref: list of preference modules.
    :param res: list the grouped module nodes are appended to.
    :param label: key function used for sorting by translated label.
    :return: None
    """
    for module in pref:
        # Modules without any category contribute nothing to the tree.
        if len(module['categories']):
            _iterate_categories(module, label, res)
def _iterate_categories(pref_d, label, res):
    """
    Build the tree node for one preference module and append it to res.

    :param pref_d: preference module data (must have categories).
    :param label: key function returning the translated label.
    :param res: list the finished module node is appended to.
    :return: None
    """
    module_node = {
        "id": pref_d['id'],
        "label": gettext(pref_d['label']),
        "inode": True,
        "open": True,
        "branch": []
    }
    for category in pref_d['categories']:
        # Translate every preference's label/help text in place.
        for pref in category['preferences']:
            if pref.get('label') is not None:
                pref['label'] = gettext(pref['label'])
            if pref.get('help_str') is not None:
                pref['help_str'] = gettext(pref['help_str'])
        category_node = {
            "id": category['id'],
            "mid": pref_d['id'],
            "label": gettext(category['label']),
            "inode": False,
            "open": False,
            "preferences": sorted(category['preferences'], key=label)
        }
        module_node['branch'].append(category_node)
    module_node['branch'] = sorted(module_node['branch'], key=label)
    res.append(module_node)
@blueprint.route("/get_all", methods=["GET"], endpoint='get_all') @blueprint.route("/get_all", methods=["GET"], endpoint='get_all')
@login_required @login_required
def preferences_s(): def preferences_s():

View File

@@ -200,6 +200,54 @@ def filename_with_file_manager_path(_file, _present=False):
return fs_short_path(_file) return fs_short_path(_file)
def _get_ignored_column_list(data, driver, conn):
"""
Get list of ignored columns for import/export.
:param data: Data.
:param driver: PG Driver.
:param conn: Connection.
:return: return ignored column list.
"""
icols = None
if data['icolumns']:
ignore_cols = data['icolumns']
# format the ignore column list required as per copy command
# requirement
if ignore_cols and len(ignore_cols) > 0:
icols = ", ".join([
driver.qtIdent(conn, col)
for col in ignore_cols])
return icols
def _get_required_column_list(data, driver, conn):
"""
Get list of required columns for import/export.
:param data: Data.
:param driver: PG Driver.
:param conn: Connection.
:return: return required column list.
"""
cols = None
# format the column import/export list required as per copy command
# requirement
if data['columns']:
columns = data['columns']
if columns and len(columns) > 0:
for col in columns:
if cols:
cols += ', '
else:
cols = '('
cols += driver.qtIdent(conn, col)
cols += ')'
return cols
@blueprint.route('/job/<int:sid>', methods=['POST'], endpoint="create_job") @blueprint.route('/job/<int:sid>', methods=['POST'], endpoint="create_job")
@login_required @login_required
def create_import_export_job(sid): def create_import_export_job(sid):
@@ -263,31 +311,9 @@ def create_import_export_job(sid):
else: else:
return bad_request(errormsg=_('Please specify a valid file')) return bad_request(errormsg=_('Please specify a valid file'))
cols = None # Get required and ignored column list
icols = None icols = _get_ignored_column_list(data, driver, conn)
cols = _get_required_column_list(data, driver, conn)
if data['icolumns']:
ignore_cols = data['icolumns']
# format the ignore column list required as per copy command
# requirement
if ignore_cols and len(ignore_cols) > 0:
icols = ", ".join([
driver.qtIdent(conn, col)
for col in ignore_cols])
# format the column import/export list required as per copy command
# requirement
if data['columns']:
columns = data['columns']
if columns and len(columns) > 0:
for col in columns:
if cols:
cols += ', '
else:
cols = '('
cols += driver.qtIdent(conn, col)
cols += ')'
# Create the COPY FROM/TO from template # Create the COPY FROM/TO from template
query = render_template( query = render_template(

View File

@@ -581,16 +581,47 @@ def compare_list_by_ignoring_keys(source_list, target_list, added, updated,
target_with_ignored_keys = copy.deepcopy(tmp_target) target_with_ignored_keys = copy.deepcopy(tmp_target)
# Remove ignore keys from source and target before comparison # Remove ignore keys from source and target before comparison
for ig_key in ignore_keys: _remove_keys(ignore_keys, source_with_ignored_keys,
if ig_key in source_with_ignored_keys: target_with_ignored_keys)
del source_with_ignored_keys[ig_key]
if ig_key in target_with_ignored_keys:
del target_with_ignored_keys[ig_key]
if source_with_ignored_keys != target_with_ignored_keys: _compare_source_and_target(source_with_ignored_keys,
updated.append(source_list) target_with_ignored_keys, source_list,
target_list.remove(tmp_target) target_list, updated, tmp_target)
elif source_with_ignored_keys == target_with_ignored_keys:
target_list.remove(tmp_target)
else: else:
added.append(source_list) added.append(source_list)
def _remove_keys(ignore_keys, source_with_ignored_keys,
target_with_ignored_keys):
"""
Remove non required keys form both source and target object.
:param ignore_keys: ignore keys list.
:param source_with_ignored_keys: source keys list.
:param target_with_ignored_keys: target keys list.
:return: None
"""
for ig_key in ignore_keys:
if ig_key in source_with_ignored_keys:
del source_with_ignored_keys[ig_key]
if ig_key in target_with_ignored_keys:
del target_with_ignored_keys[ig_key]
def _compare_source_and_target(source_with_ignored_keys,
target_with_ignored_keys, source_list,
target_list, updated, tmp_target):
"""
Compare source and target keys
:param source_with_ignored_keys:
:param target_with_ignored_keys:
:param source_list:
:param target_list:
:param updated:
:param tmp_target:
:return:
"""
if source_with_ignored_keys != target_with_ignored_keys:
updated.append(source_list)
target_list.remove(tmp_target)
elif source_with_ignored_keys == target_with_ignored_keys:
target_list.remove(tmp_target)