Fixed cognitive complexity issues reported by SonarQube.

This commit is contained in:
Nikhil Mohite 2020-07-29 18:29:04 +05:30 committed by Akshay Joshi
parent 099fea15ae
commit 89fa85d650
8 changed files with 317 additions and 198 deletions

View File

@ -736,8 +736,7 @@ class ForeignTableView(PGChildNodeView, DataTypeReader,
status, res = self.conn.execute_2darray(SQL)
if not status:
return internal_server_error(errormsg=res)
if not res['rows']:
elif not res['rows']:
return make_json_response(
success=0,
errormsg=gettext(

View File

@ -63,6 +63,51 @@ def get_parent(conn, tid, template_path=None):
return schema, table
def _check_primary_column(data):
"""
To check if column is primary key
:param data: Data.
"""
if 'attnum' in data and 'indkey' in data:
# Current column
attnum = str(data['attnum'])
# Single/List of primary key column(s)
indkey = str(data['indkey'])
# We will check if column is in primary column(s)
if attnum in indkey.split(" "):
data['is_pk'] = True
data['is_primary_key'] = True
else:
data['is_pk'] = False
data['is_primary_key'] = False
def _fetch_inherited_tables(tid, data, fetch_inherited_tables, template_path,
conn):
"""
This function will check for fetch inherited tables, and return inherited
tables.
:param tid: Table Id.
:param data: Data.
:param fetch_inherited_tables: flag to fetch inherited tables.
:param template_path: Template path.
:param conn: Connection.
"""
if fetch_inherited_tables:
SQL = render_template("/".join(
[template_path, 'get_inherited_tables.sql']), tid=tid)
status, inh_res = conn.execute_dict(SQL)
if not status:
return True, internal_server_error(errormsg=inh_res)
for row in inh_res['rows']:
if row['attrname'] == data['name']:
data['is_inherited'] = True
data['tbls_inherited'] = row['inhrelname']
return False, ''
@get_template_path
def column_formatter(conn, tid, clid, data, edit_types_list=None,
fetch_inherited_tables=True, template_path=None):
@ -80,35 +125,17 @@ def column_formatter(conn, tid, clid, data, edit_types_list=None,
"""
# To check if column is primary key
if 'attnum' in data and 'indkey' in data:
# Current column
attnum = str(data['attnum'])
# Single/List of primary key column(s)
indkey = str(data['indkey'])
# We will check if column is in primary column(s)
if attnum in indkey.split(" "):
data['is_pk'] = True
data['is_primary_key'] = True
else:
data['is_pk'] = False
data['is_primary_key'] = False
_check_primary_column(data)
# Fetch length and precision
data = fetch_length_precision(data)
# We need to fetch inherited tables for each table
if fetch_inherited_tables:
SQL = render_template("/".join(
[template_path, 'get_inherited_tables.sql']), tid=tid)
status, inh_res = conn.execute_dict(SQL)
if not status:
return internal_server_error(errormsg=inh_res)
for row in inh_res['rows']:
if row['attrname'] == data['name']:
data['is_inherited'] = True
data['tbls_inherited'] = row['inhrelname']
is_error, errmsg = _fetch_inherited_tables(
tid, data, fetch_inherited_tables, template_path, conn)
if is_error:
return errmsg
# We need to format variables according to client js collection
if 'attoptions' in data and data['attoptions'] is not None:

View File

@ -583,8 +583,7 @@ class CompoundTriggerView(PGChildNodeView, SchemaDiffObjectCompare):
status, res = self.conn.execute_dict(SQL)
if not status:
return internal_server_error(errormsg=res)
if not res['rows']:
elif not res['rows']:
return make_json_response(
success=0,
errormsg=gettext(
@ -901,19 +900,19 @@ class CompoundTriggerView(PGChildNodeView, SchemaDiffObjectCompare):
sql = sql.strip('\n').strip(' ')
else:
if drop_sql:
SQL = self.delete(gid=gid, sid=sid, did=did,
sql = self.delete(gid=gid, sid=sid, did=did,
scid=scid, tid=tid,
trid=oid, only_sql=True)
else:
SQL = render_template("/".join([self.template_path,
sql = render_template("/".join([self.template_path,
self._PROPERTIES_SQL]),
tid=tid, trid=oid,
datlastsysoid=self.datlastsysoid)
status, res = self.conn.execute_dict(SQL)
status, res = self.conn.execute_dict(sql)
if not status:
return internal_server_error(errormsg=res)
if len(res['rows']) == 0:
elif len(res['rows']) == 0:
return gone(gettext("Could not find the compound "
"trigger in the table."))
@ -929,10 +928,22 @@ class CompoundTriggerView(PGChildNodeView, SchemaDiffObjectCompare):
data = trigger_definition(data)
sql = self._check_and_add_compound_trigger(tid, data,
diff_schema)
return sql
def _check_and_add_compound_trigger(self, tid, data, diff_schema):
"""
This get compound trigger and check for disable.
:param tid: Table Id.
:param data: Data.
:param diff_schema: schema diff check.
"""
if diff_schema:
data['schema'] = diff_schema
SQL, name = compound_trigger_utils.get_sql(self.conn,
sql, name = compound_trigger_utils.get_sql(self.conn,
data,
tid,
None,
@ -941,13 +952,12 @@ class CompoundTriggerView(PGChildNodeView, SchemaDiffObjectCompare):
# If compound trigger is disbaled then add sql
# code for the same
if not data['is_enable_trigger']:
SQL += '\n\n'
SQL += render_template("/".join([
sql += '\n\n'
sql += render_template("/".join([
self.template_path,
'enable_disable_trigger.sql']),
data=data, conn=self.conn)
return SQL
return sql
@check_precondition
def fetch_objects_to_compare(self, sid, did, scid, tid, oid=None):

View File

@ -106,8 +106,7 @@ def get_sql(conn, data, tid, trid, datlastsysoid, template_path=None):
status, res = conn.execute_dict(sql)
if not status:
raise Exception(res)
if len(res['rows']) == 0:
elif len(res['rows']) == 0:
raise ObjectGone(
_('Could not find the compound trigger in the table.'))

View File

@ -446,6 +446,56 @@ class CheckConstraintView(PGChildNodeView):
status=200
)
@staticmethod
def _get_req_data():
"""
Get all required data from request data attribute.
:return: Request data and Error if any.
"""
data = request.form if request.form else json.loads(
request.data, encoding='utf-8'
)
for k, v in data.items():
try:
# comments should be taken as is because if user enters a
# json comment it is parsed by loads which should not happen
if k in ('comment',):
data[k] = v
else:
data[k] = json.loads(v, encoding='utf-8')
except (ValueError, TypeError, KeyError):
data[k] = v
required_args = ['consrc']
for arg in required_args:
if arg not in data or data[arg] == '':
return True, make_json_response(
status=400,
success=0,
errormsg=_(
"Could not find the required parameter ({})."
).format(arg),
), data
return False, '', data
@staticmethod
def _check_valid_icon(res):
"""
Check and return icon value and is valid value.
:param res: Response data.
:return: icon value and valid flag.
"""
if "convalidated" in res['rows'][0] and res['rows'][0]["convalidated"]:
icon = "icon-check_constraint_bad"
valid = False
else:
icon = "icon-check_constraint"
valid = True
return icon, valid
@check_precondition
def create(self, gid, sid, did, scid, tid, cid=None):
"""
@ -462,32 +512,9 @@ class CheckConstraintView(PGChildNodeView):
Returns:
"""
required_args = ['consrc']
data = request.form if request.form else json.loads(
request.data, encoding='utf-8'
)
for k, v in data.items():
try:
# comments should be taken as is because if user enters a
# json comment it is parsed by loads which should not happen
if k in ('comment',):
data[k] = v
else:
data[k] = json.loads(v, encoding='utf-8')
except (ValueError, TypeError, KeyError):
data[k] = v
for arg in required_args:
if arg not in data or data[arg] == '':
return make_json_response(
status=400,
success=0,
errormsg=_(
"Could not find the required parameter ({})."
).format(arg)
)
is_error, errmsg, data = CheckConstraintView._get_req_data()
if is_error:
return errmsg
data['schema'] = self.schema
data['table'] = self.table
@ -497,20 +524,20 @@ class CheckConstraintView(PGChildNodeView):
try:
if 'name' not in data or data['name'] == "":
SQL = "BEGIN;"
sql = "BEGIN;"
# Start transaction.
status, res = self.conn.execute_scalar(SQL)
status, res = self.conn.execute_scalar(sql)
if not status:
self.end_transaction()
return internal_server_error(errormsg=res)
# The below SQL will execute CREATE DDL only
SQL = render_template(
sql = render_template(
"/".join([self.template_path, self._CREATE_SQL]),
data=data
)
status, msg = self.conn.execute_scalar(SQL)
status, msg = self.conn.execute_scalar(sql)
if not status:
self.end_transaction()
return internal_server_error(errormsg=msg)
@ -542,13 +569,7 @@ class CheckConstraintView(PGChildNodeView):
self.end_transaction()
return internal_server_error(errormsg=res)
if "convalidated" in res['rows'][0] and \
res['rows'][0]["convalidated"]:
icon = "icon-check_constraint_bad"
valid = False
else:
icon = "icon-check_constraint"
valid = True
icon, valid = CheckConstraintView._check_valid_icon(res)
return jsonify(
node=self.blueprint.generate_browser_node(

View File

@ -81,6 +81,29 @@ def get_check_constraints(conn, tid, cid=None, template_path=None):
return True, result['rows']
def _check_delete_constraint(constraint, data, template_path, conn, sql):
"""
This function if user deleted any constraint.
:param constraint: Constraint list in data.
:param data: Data.
:param template_path: sql template path for delete.
:param conn: connection.
:param sql: list for append delete constraint sql.
"""
if 'deleted' in constraint:
for c in constraint['deleted']:
c['schema'] = data['schema']
c['nspname'] = data['schema']
c['table'] = data['name']
# Sql for drop
sql.append(
render_template("/".join(
[template_path, 'delete.sql']),
data=c, conn=conn).strip("\n")
)
@get_template_path
def get_check_constraint_sql(conn, tid, data, template_path=None):
"""
@ -98,18 +121,7 @@ def get_check_constraint_sql(conn, tid, data, template_path=None):
if 'check_constraint' in data:
constraint = data['check_constraint']
# If constraint(s) is/are deleted
if 'deleted' in constraint:
for c in constraint['deleted']:
c['schema'] = data['schema']
c['nspname'] = data['schema']
c['table'] = data['name']
# Sql for drop
sql.append(
render_template("/".join(
[template_path, 'delete.sql']),
data=c, conn=conn).strip("\n")
)
_check_delete_constraint(constraint, data, template_path, conn, sql)
if 'changed' in constraint:
for c in constraint['changed']:

View File

@ -479,6 +479,45 @@ class ForeignKeyConstraintView(PGChildNodeView):
))
return res
@staticmethod
def _get_reqes_data():
"""
Get data from request.
return: Data.
"""
data = request.form if request.form else json.loads(
request.data, encoding='utf-8'
)
for k, v in data.items():
try:
# comments should be taken as is because if user enters a
# json comment it is parsed by loads which should not happen
if k in ('comment',):
data[k] = v
else:
data[k] = json.loads(v, encoding='utf-8')
except (ValueError, TypeError, KeyError):
data[k] = v
return data
@staticmethod
def _check_for_req_data(data):
required_args = ['columns']
for arg in required_args:
if arg not in data or \
(isinstance(data[arg], list) and len(data[arg]) < 1):
return True, make_json_response(
status=400,
success=0,
errormsg=gettext(
"Could not find required parameter ({})."
).format(arg)
)
return False, ''
@check_precondition
def create(self, gid, sid, did, scid, tid, fkid=None):
"""
@ -495,33 +534,12 @@ class ForeignKeyConstraintView(PGChildNodeView):
Returns:
"""
required_args = ['columns']
data = ForeignKeyConstraintView._get_reqes_data()
data = request.form if request.form else json.loads(
request.data, encoding='utf-8'
)
for k, v in data.items():
try:
# comments should be taken as is because if user enters a
# json comment it is parsed by loads which should not happen
if k in ('comment',):
data[k] = v
else:
data[k] = json.loads(v, encoding='utf-8')
except (ValueError, TypeError, KeyError):
data[k] = v
for arg in required_args:
if arg not in data or \
(isinstance(data[arg], list) and len(data[arg]) < 1):
return make_json_response(
status=400,
success=0,
errormsg=gettext(
"Could not find required parameter ({})."
).format(arg)
)
is_arg_error, errmsg = ForeignKeyConstraintView._check_for_req_data(
data)
if is_arg_error:
return errmsg
data['schema'] = self.schema
data['table'] = self.table
@ -533,25 +551,25 @@ class ForeignKeyConstraintView(PGChildNodeView):
data['remote_table'] = table
if 'name' not in data or data['name'] == "":
SQL = render_template(
sql = render_template(
"/".join([self.template_path, 'begin.sql']))
# Start transaction.
status, res = self.conn.execute_scalar(SQL)
status, res = self.conn.execute_scalar(sql)
if not status:
self.end_transaction()
return internal_server_error(errormsg=res)
# The below SQL will execute CREATE DDL only
SQL = render_template(
sql = render_template(
"/".join([self.template_path, self._CREATE_SQL]),
data=data, conn=self.conn
)
status, res = self.conn.execute_scalar(SQL)
status, res = self.conn.execute_scalar(sql)
if not status:
self.end_transaction()
return internal_server_error(errormsg=res)
if 'name' not in data or data['name'] == "":
elif 'name' not in data or data['name'] == "":
sql = render_template(
"/".join([self.template_path,
'get_oid_with_transaction.sql']),
@ -576,24 +594,9 @@ class ForeignKeyConstraintView(PGChildNodeView):
self.end_transaction()
return internal_server_error(errormsg=res)
if res['rows'][0]["convalidated"]:
icon = "icon-foreign_key_no_validate"
valid = False
else:
icon = "icon-foreign_key"
valid = True
if data['autoindex']:
sql = render_template(
"/".join([self.template_path, 'create_index.sql']),
data=data, conn=self.conn)
sql = sql.strip('\n').strip(' ')
if sql != '':
status, idx_res = self.conn.execute_scalar(sql)
if not status:
self.end_transaction()
return internal_server_error(errormsg=idx_res)
is_error, errmsg, icon, valid = self._create_index(data, res)
if is_error:
return errmsg
return jsonify(
node=self.blueprint.generate_browser_node(
@ -613,6 +616,36 @@ class ForeignKeyConstraintView(PGChildNodeView):
errormsg=e
)
def _create_index(self, data, res):
"""
Create index for foreign key.
data: Data.
res: Response form transaction.
Return: if error in create index return error, else return icon
and valid status
"""
if res['rows'][0]["convalidated"]:
icon = "icon-foreign_key_no_validate"
valid = False
else:
icon = "icon-foreign_key"
valid = True
if data['autoindex']:
sql = render_template(
"/".join([self.template_path, 'create_index.sql']),
data=data, conn=self.conn)
sql = sql.strip('\n').strip(' ')
if sql != '':
status, idx_res = self.conn.execute_scalar(sql)
if not status:
self.end_transaction()
return True, internal_server_error(
errormsg=idx_res), icon, valid
return False, '', icon, valid
@check_precondition
def update(self, gid, sid, did, scid, tid, fkid=None):
"""

View File

@ -490,6 +490,64 @@ class IndexConstraintView(PGChildNodeView):
))
return res
@staticmethod
def _get_req_data():
"""
Get data from request.
return: data.
"""
data = request.form if request.form else json.loads(
request.data, encoding='utf-8'
)
for k, v in data.items():
try:
# comments should be taken as is because if user enters a
# json comment it is parsed by loads which should not happen
if k in ('comment',):
data[k] = v
else:
data[k] = json.loads(v, encoding='utf-8')
except (ValueError, TypeError, KeyError):
data[k] = v
return data
@staticmethod
def _check_required_args(data):
required_args = [
[u'columns', u'index'] # Either of one should be there.
]
def is_key_list(key, data):
return isinstance(data[key], list) and len(data[param]) > 0
for arg in required_args:
if isinstance(arg, list):
for param in arg:
if param in data and (param != 'columns' or
is_key_list(param, data)):
break
else:
return True, make_json_response(
status=400,
success=0,
errormsg=_(
"Could not find at least one required "
"parameter ({}).".format(str(param)))
)
elif arg not in data:
return True, make_json_response(
status=400,
success=0,
errormsg=_(
"Could not find the required parameter ({})."
).format(arg)
)
return False, ''
@check_precondition
def create(self, gid, sid, did, scid, tid, cid=None):
"""
@ -506,76 +564,36 @@ class IndexConstraintView(PGChildNodeView):
Returns:
"""
required_args = [
[u'columns', u'index'] # Either of one should be there.
]
data = IndexConstraintView._get_req_data()
data = request.form if request.form else json.loads(
request.data, encoding='utf-8'
)
is_error, errmsg = IndexConstraintView._check_required_args(data)
for k, v in data.items():
try:
# comments should be taken as is because if user enters a
# json comment it is parsed by loads which should not happen
if k in ('comment',):
data[k] = v
else:
data[k] = json.loads(v, encoding='utf-8')
except (ValueError, TypeError, KeyError):
data[k] = v
def is_key_list(key, data):
return isinstance(data[key], list) and len(data[param]) > 0
for arg in required_args:
if isinstance(arg, list):
for param in arg:
if param in data and (param != 'columns' or
is_key_list(param, data)):
break
else:
return make_json_response(
status=400,
success=0,
errormsg=_(
"Could not find at least one required "
"parameter ({}).".format(str(param)))
)
elif arg not in data:
return make_json_response(
status=400,
success=0,
errormsg=_(
"Could not find the required parameter ({})."
).format(arg)
)
if is_error:
return errmsg
data['schema'] = self.schema
data['table'] = self.table
try:
if 'name' not in data or data['name'] == "":
SQL = render_template(
sql = render_template(
"/".join([self.template_path, 'begin.sql']))
# Start transaction.
status, res = self.conn.execute_scalar(SQL)
status, res = self.conn.execute_scalar(sql)
if not status:
self.end_transaction()
return internal_server_error(errormsg=res)
# The below SQL will execute CREATE DDL only
SQL = render_template(
sql = render_template(
"/".join([self.template_path, self._CREATE_SQL]),
data=data, conn=self.conn,
constraint_name=self.constraint_name
)
status, msg = self.conn.execute_scalar(SQL)
status, msg = self.conn.execute_scalar(sql)
if not status:
self.end_transaction()
return internal_server_error(errormsg=msg)
if 'name' not in data or data['name'] == "":
elif 'name' not in data or data['name'] == "":
sql = render_template(
"/".join([self.template_path,
'get_oid_with_transaction.sql'],