Fixed cognitive complexity issues reported by SonarQube.

This commit is contained in:
Pradip Parkale 2020-08-25 12:28:55 +05:30 committed by Akshay Joshi
parent e6bd085c15
commit 7f947f146c
28 changed files with 217 additions and 220 deletions

View File

@ -542,6 +542,86 @@ def _get_logout_url():
url_for('security.logout'), url_for(BROWSER_INDEX))
def _get_supported_browser():
"""
This function return supported browser.
:return: browser name, browser known, browser version
"""
browser = request.user_agent.browser
version = request.user_agent.version and int(
request.user_agent.version.split('.')[0])
browser_name = None
browser_known = True
if browser == 'chrome' and version < 72:
browser_name = 'Chrome'
elif browser == 'firefox' and version < 65:
browser_name = 'Firefox'
# comparing EdgeHTML engine version
elif browser == 'edge' and version < 18:
browser_name = 'Edge'
# browser version returned by edge browser is actual EdgeHTML
# engine version. Below code gets actual browser version using
# EdgeHTML version
engine_to_actual_browser_version = {
16: 41,
17: 42,
18: 44
}
version = engine_to_actual_browser_version.get(version, '< 44')
elif browser == 'safari' and version < 12:
browser_name = 'Safari'
elif browser == 'msie':
browser_name = 'Internet Explorer'
elif browser != 'chrome' and browser != 'firefox' and \
browser != 'edge' and browser != 'safari':
browser_name = browser
browser_known = False
return browser_name, browser_known, version
def check_browser_upgrade():
"""
This function is used to check the browser version.
:return:
"""
data = None
url = '%s?version=%s' % (config.UPGRADE_CHECK_URL, config.APP_VERSION)
current_app.logger.debug('Checking version data at: %s' % url)
try:
# Do not wait for more than 5 seconds.
# It stuck on rendering the browser.html, while working in the
# broken network.
if os.path.exists(config.CA_FILE):
response = urlopen(url, data, 5, cafile=config.CA_FILE)
else:
response = urlopen(url, data, 5)
current_app.logger.debug(
'Version check HTTP response code: %d' % response.getcode()
)
if response.getcode() == 200:
data = json.loads(response.read().decode('utf-8'))
current_app.logger.debug('Response data: %s' % data)
except Exception:
current_app.logger.exception('Exception when checking for update')
if data is not None and \
data[config.UPGRADE_CHECK_KEY]['version_int'] > \
config.APP_VERSION_INT:
msg = render_template(
MODULE_NAME + "/upgrade.html",
current_version=config.APP_VERSION,
upgrade_version=data[config.UPGRADE_CHECK_KEY]['version'],
product_name=config.APP_NAME,
download_url=data[config.UPGRADE_CHECK_KEY]['download_url']
)
flash(msg, 'warning')
@blueprint.route("/")
@pgCSRFProtect.exempt
@login_required
@ -563,36 +643,7 @@ def index():
# NOTE: If the checks here are updated, make sure the supported versions
# at https://www.pgadmin.org/faq/#11 are updated to match!
if config.CHECK_SUPPORTED_BROWSER:
browser = request.user_agent.browser
version = request.user_agent.version and int(
request.user_agent.version.split('.')[0])
browser_name = None
browser_known = True
if browser == 'chrome' and version < 72:
browser_name = 'Chrome'
elif browser == 'firefox' and version < 65:
browser_name = 'Firefox'
# comparing EdgeHTML engine version
elif browser == 'edge' and version < 18:
browser_name = 'Edge'
# browser version returned by edge browser is actual EdgeHTML
# engine version. Below code gets actual browser version using
# EdgeHTML version
engine_to_actual_browser_version = {
16: 41,
17: 42,
18: 44
}
version = engine_to_actual_browser_version.get(version, '< 44')
elif browser == 'safari' and version < 12:
browser_name = 'Safari'
elif browser == 'msie':
browser_name = 'Internet Explorer'
elif browser != 'chrome' and browser != 'firefox' and \
browser != 'edge' and browser != 'safari':
browser_name = browser
browser_known = False
browser_name, browser_known, version = _get_supported_browser()
if browser_name is not None:
msg = render_template(
@ -607,40 +658,7 @@ def index():
# Get the current version info from the website, and flash a message if
# the user is out of date, and the check is enabled.
if config.UPGRADE_CHECK_ENABLED:
data = None
url = '%s?version=%s' % (config.UPGRADE_CHECK_URL, config.APP_VERSION)
current_app.logger.debug('Checking version data at: %s' % url)
try:
# Do not wait for more than 5 seconds.
# It stuck on rendering the browser.html, while working in the
# broken network.
if os.path.exists(config.CA_FILE):
response = urlopen(url, data, 5, cafile=config.CA_FILE)
else:
response = urlopen(url, data, 5)
current_app.logger.debug(
'Version check HTTP response code: %d' % response.getcode()
)
if response.getcode() == 200:
data = json.loads(response.read().decode('utf-8'))
current_app.logger.debug('Response data: %s' % data)
except Exception:
current_app.logger.exception('Exception when checking for update')
if data is not None and \
data[config.UPGRADE_CHECK_KEY]['version_int'] > \
config.APP_VERSION_INT:
msg = render_template(
MODULE_NAME + "/upgrade.html",
current_version=config.APP_VERSION,
upgrade_version=data[config.UPGRADE_CHECK_KEY]['version'],
product_name=config.APP_NAME,
download_url=data[config.UPGRADE_CHECK_KEY]['download_url']
)
flash(msg, 'warning')
check_browser_upgrade()
auth_only_internal = False
auth_source = []

View File

@ -342,7 +342,7 @@ class ExtensionView(PGChildNodeView, SchemaDiffObjectCompare):
else:
data = {'ids': [eid]}
cascade = True if self.cmd == 'delete' else False
cascade = self._check_cascade_operation()
try:
for eid in data['ids']:

View File

@ -530,6 +530,32 @@ class UserMappingView(PGChildNodeView, SchemaDiffObjectCompare):
return cascade, data
def _fetch_specified_user_mapping_properties(self, umid):
"""
This function is used to fetch the specified user mapping.
:param umid:
:return:
"""
try:
sql = render_template("/".join([self.template_path,
'properties.sql']),
umid=umid, conn=self.conn)
status, res = self.conn.execute_dict(sql)
if not status:
return internal_server_error(errormsg=res)
if not res['rows']:
return make_json_response(
status=410,
success=0,
errormsg=gettext(
'The specified user mapping could not be found.\n'
)
)
return res
except Exception as e:
return internal_server_error(errormsg=str(e))
@check_precondition
def delete(self, gid, sid, did, fid, fsid, **kwargs):
"""
@ -573,22 +599,7 @@ class UserMappingView(PGChildNodeView, SchemaDiffObjectCompare):
'could not be found.\n'
)
)
sql = render_template("/".join([self.template_path,
self._PROPERTIES_SQL]),
umid=umid, conn=self.conn)
status, res = self.conn.execute_dict(sql)
if not status:
return internal_server_error(errormsg=res)
if not res['rows']:
return make_json_response(
status=410,
success=0,
errormsg=gettext(
'The specified user mapping could not be found.\n'
)
)
res = self._fetch_specified_user_mapping_properties(umid)
data = res['rows'][0]

View File

@ -531,11 +531,7 @@ class LanguageView(PGChildNodeView, SchemaDiffObjectCompare):
else:
data = {'ids': [lid]}
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
try:
for lid in data['ids']:

View File

@ -731,7 +731,7 @@ It may have been removed by another user.
"/".join([self.template_path,
self._SQL_PREFIX + self._DELETE_SQL]),
_=gettext, name=name, conn=self.conn,
cascade=True if self.cmd == 'delete' else False
cascade=self._check_cascade_operation()
)
status, res = self.conn.execute_scalar(SQL)
if not status:

View File

@ -524,7 +524,8 @@ class CollationView(PGChildNodeView, SchemaDiffObjectCompare):
else {'ids': [coid]}
# Below will decide if it's simple drop or drop with cascade call
cascade = True if self.cmd == 'delete' else False
cascade = self._check_cascade_operation()
try:
for coid in data['ids']:

View File

@ -627,11 +627,7 @@ AND relkind != 'c'))"""
else:
data = {'ids': [doid]}
if self.cmd == 'delete' or only_sql:
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation(only_sql)
for doid in data['ids']:
SQL = render_template("/".join([self.template_path,
@ -807,6 +803,26 @@ AND relkind != 'c'))"""
except Exception as e:
return internal_server_error(errormsg=str(e))
def check_domain_type(self, data, old_data, is_schema_diff):
"""
Check domain type
:return:
"""
# If fulltype or basetype or collname is changed while comparing
# two schemas then we need to drop domain and recreate it
if 'fulltype' in data or 'basetype' in data or 'collname' in data:
SQL = render_template(
"/".join([self.template_path, 'domain_schema_diff.sql']),
data=data, o_data=old_data)
else:
if is_schema_diff:
data['is_schema_diff'] = True
SQL = render_template(
"/".join([self.template_path, 'update.sql']),
data=data, o_data=old_data)
return SQL, data
def get_sql(self, gid, sid, data, scid, doid=None, is_schema_diff=False):
"""
Generates the SQL statements to create/update the Domain.
@ -847,19 +863,7 @@ AND relkind != 'c'))"""
old_data['constraints'] = con_data
# If fulltype or basetype or collname is changed while comparing
# two schemas then we need to drop domain and recreate it
if 'fulltype' in data or 'basetype' in data or 'collname' in data:
SQL = render_template(
"/".join([self.template_path, 'domain_schema_diff.sql']),
data=data, o_data=old_data)
else:
if is_schema_diff:
data['is_schema_diff'] = True
SQL = render_template(
"/".join([self.template_path, self._UPDATE_SQL]),
data=data, o_data=old_data)
SQL, data = self.check_domain_type(data, old_data, is_schema_diff)
return SQL.strip('\n'), data['name'] if 'name' in data else \
old_data['name']
else:

View File

@ -722,11 +722,7 @@ class ForeignTableView(PGChildNodeView, DataTypeReader,
else:
data = {'ids': [foid]}
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
try:
for foid in data['ids']:

View File

@ -554,11 +554,7 @@ class FtsConfigurationView(PGChildNodeView, SchemaDiffObjectCompare):
data = {'ids': [cfgid]}
# Below will decide if it's simple drop or drop with cascade call
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
try:
for cfgid in data['ids']:

View File

@ -551,11 +551,8 @@ class FtsDictionaryView(PGChildNodeView, SchemaDiffObjectCompare):
data = {'ids': [dcid]}
# Below will decide if it's simple drop or drop with cascade call
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
try:
for dcid in data['ids']:

View File

@ -497,11 +497,7 @@ class FtsParserView(PGChildNodeView, SchemaDiffObjectCompare):
data = {'ids': [pid]}
# Below will decide if it's simple drop or drop with cascade call
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
try:
for pid in data['ids']:

View File

@ -462,11 +462,7 @@ class FtsTemplateView(PGChildNodeView, SchemaDiffObjectCompare):
data = {'ids': [tid]}
# Below will decide if it's simple drop or drop with cascade call
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
for tid in data['ids']:
# Get name for template from tid

View File

@ -895,11 +895,7 @@ class FunctionView(PGChildNodeView, DataTypeReader, SchemaDiffObjectCompare):
else:
data = {'ids': [fnid]}
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
try:
for fnid in data['ids']:

View File

@ -438,11 +438,8 @@ class PackageView(PGChildNodeView, SchemaDiffObjectCompare):
data = {'ids': [pkgid]}
# Below will decide if it's simple drop or drop with cascade call
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
try:
for pkgid in data['ids']:

View File

@ -465,11 +465,8 @@ class SequenceView(PGChildNodeView, SchemaDiffObjectCompare):
data = {'ids': [seid]}
# Below will decide if it's simple drop or drop with cascade call
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
try:
for seid in data['ids']:

View File

@ -562,11 +562,7 @@ class CompoundTriggerView(PGChildNodeView, SchemaDiffObjectCompare):
data = {'ids': [trid]}
# Below will decide if it's simple drop or drop with cascade call
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
try:
for trid in data['ids']:

View File

@ -669,11 +669,7 @@ class ExclusionConstraintView(PGChildNodeView):
data = {'ids': [exid]}
# Below code will decide if it's simple drop or drop with cascade call
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
try:
for exid in data['ids']:
sql = render_template(

View File

@ -731,11 +731,8 @@ class ForeignKeyConstraintView(PGChildNodeView):
data = {'ids': [fkid]}
# Below code will decide if it's simple drop or drop with cascade call
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
try:
for fkid in data['ids']:
sql = render_template(

View File

@ -718,11 +718,7 @@ class IndexConstraintView(PGChildNodeView):
data = {'ids': [cid]}
# Below code will decide if it's simple drop or drop with cascade call
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
try:
for cid in data['ids']:
sql = render_template(

View File

@ -681,11 +681,8 @@ class IndexesView(PGChildNodeView, SchemaDiffObjectCompare):
data = {'ids': [idx]}
# Below will decide if it's simple drop or drop with cascade call
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
try:
for idx in data['ids']:

View File

@ -436,6 +436,22 @@ class RowSecurityView(PGChildNodeView):
except Exception as e:
return internal_server_error(errormsg=str(e))
@staticmethod
def get_policy_data(plid):
"""
return policy data
:param plid:
:return: policy id
"""
if plid is None:
data = request.form if request.form else json.loads(
request.data, encoding='utf-8'
)
else:
data = {'ids': [plid]}
return data
@check_precondition
def delete(self, gid, sid, did, scid, tid, **kwargs):
"""
@ -453,18 +469,9 @@ class RowSecurityView(PGChildNodeView):
only_sql = kwargs.get('only_sql', False)
# Below will deplide if it's simple drop or drop with cascade call
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
if plid is None:
data = request.form if request.form else json.loads(
request.data, encoding='utf-8'
)
else:
data = {'ids': [plid]}
data = self.get_policy_data(plid)
for plid in data['ids']:
try:

View File

@ -407,7 +407,8 @@ class RuleView(PGChildNodeView, SchemaDiffObjectCompare):
data = {'ids': [rid]}
# Below will decide if it's simple drop or drop with cascade call
cascade = True if self.cmd == 'delete' else False
cascade = self._check_cascade_operation()
try:
for rid in data['ids']:

View File

@ -596,11 +596,8 @@ class TriggerView(PGChildNodeView, SchemaDiffObjectCompare):
data = {'ids': [trid]}
# Below will decide if it's simple drop or drop with cascade call
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
try:
for trid in data['ids']:

View File

@ -1684,11 +1684,7 @@ class BaseTableView(PGChildNodeView, BasePartitionTable):
def get_delete_sql(self, res):
# Below will decide if it's simple drop or drop with cascade call
if self.cmd == 'delete':
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
data = res['rows'][0]

View File

@ -1106,12 +1106,7 @@ class TypeView(PGChildNodeView, DataTypeReader, SchemaDiffObjectCompare):
else:
data = {'ids': [tid]}
# Below will decide if it's simple drop or drop with cascade call
if self.cmd == 'delete' or only_sql:
# This is a cascade operation
cascade = True
else:
cascade = False
cascade = self._check_cascade_operation()
return data, cascade

View File

@ -647,7 +647,8 @@ class ViewNode(PGChildNodeView, VacuumSettings, SchemaDiffObjectCompare):
data = {'ids': [vid]}
# Below will decide if it's simple drop or drop with cascade call
cascade = True if self.cmd == 'delete' else False
cascade = self._check_cascade_operation()
try:
for vid in data['ids']:

View File

@ -711,6 +711,19 @@ class PGChildNodeView(NodeView):
return dependency
def _check_cascade_operation(self, only_sql=None):
"""
Check cascade operation.
:param only_sql:
:return:
"""
if self.cmd == 'delete' or only_sql:
# This is a cascade operation
cascade = True
else:
cascade = False
return cascade
def not_found_error_msg(self, custom_label=None):
return gettext("Could not find the specified {}.".format(
custom_label if custom_label else self.node_label).lower())

View File

@ -2132,10 +2132,31 @@ def poll_result(trans_id):
)
def release_connection(manager, dbg_obj):
"""This function is used to release connection."""
conn = manager.connection(
did=dbg_obj['database_id'],
conn_id=dbg_obj['conn_id'])
if conn.connected():
conn.cancel_transaction(
dbg_obj['conn_id'],
dbg_obj['database_id'])
manager.release(conn_id=dbg_obj['conn_id'])
if 'exe_conn_id' in dbg_obj:
conn = manager.connection(
did=dbg_obj['database_id'],
conn_id=dbg_obj['exe_conn_id'])
if conn.connected():
conn.cancel_transaction(
dbg_obj['exe_conn_id'],
dbg_obj['database_id'])
manager.release(conn_id=dbg_obj['exe_conn_id'])
def close_debugger_session(_trans_id, close_all=False):
"""
This function is used to cancel the debugger transaction and
release the connection.
This function is used to cancel the debugger transaction.
:param trans_id: Transaction id
:return:
@ -2156,24 +2177,7 @@ def close_debugger_session(_trans_id, close_all=False):
connection_manager(dbg_obj['server_id'])
if manager is not None:
conn = manager.connection(
did=dbg_obj['database_id'],
conn_id=dbg_obj['conn_id'])
if conn.connected():
conn.cancel_transaction(
dbg_obj['conn_id'],
dbg_obj['database_id'])
manager.release(conn_id=dbg_obj['conn_id'])
if 'exe_conn_id' in dbg_obj:
conn = manager.connection(
did=dbg_obj['database_id'],
conn_id=dbg_obj['exe_conn_id'])
if conn.connected():
conn.cancel_transaction(
dbg_obj['exe_conn_id'],
dbg_obj['database_id'])
manager.release(conn_id=dbg_obj['exe_conn_id'])
release_connection(manager, dbg_obj)
de_inst.clear()
except Exception: