Fixed cognitive complexity issues reported by SonarQube.

This commit is contained in:
Aditya Toshniwal 2020-08-11 15:13:35 +05:30 committed by Akshay Joshi
parent 082b968bbc
commit 8129df42da
5 changed files with 177 additions and 214 deletions

View File

@ -325,19 +325,9 @@ class JobScheduleView(PGChildNodeView):
sid: Server ID
jid: Job ID
"""
data = {}
if request.args:
for k, v in request.args.items():
try:
data[k] = json.loads(
v.decode('utf-8') if hasattr(v, 'decode') else v
)
except ValueError:
data[k] = v
else:
data = json.loads(request.data.decode())
# convert python list literal to postgres array literal.
format_schedule_data(data)
data = json.loads(request.data, encoding='utf-8')
# convert python list literal to postgres array literal.
format_schedule_data(data)
sql = render_template(
"/".join([self.template_path, self._CREATE_SQL]),

View File

@ -431,11 +431,9 @@ SELECT EXISTS(
data['jstconntype'] = row['jstconntype']
if row['jstconntype']:
if not ('jstdbname' in data):
data['jstdbname'] = row['jstdbname']
data['jstdbname'] = data.get('jstdbname', row['jstdbname'])
else:
if not ('jstconnstr' in data):
data['jstconnstr'] = row['jstconnstr']
data['jstconnstr'] = data.get('jstconnstr', row['jstconnstr'])
sql = render_template(
"/".join([self.template_path, self._UPDATE_SQL]),
@ -516,9 +514,7 @@ SELECT EXISTS(
sql = ''
for k, v in request.args.items():
try:
data[k] = json.loads(
v.decode('utf-8') if hasattr(v, 'decode') else v
)
data[k] = json.loads(v, 'utf-8')
except ValueError:
data[k] = v
@ -529,46 +525,49 @@ SELECT EXISTS(
data=data,
has_connstr=self.manager.db_info['pgAgent']['has_connstr']
)
else:
if (
self.manager.db_info['pgAgent']['has_connstr'] and
'jstconntype' not in data and
('jstdbname' in data or 'jstconnstr' in data)
):
sql = render_template(
"/".join([self.template_path, self._PROPERTIES_SQL]),
jstid=jstid,
jid=jid,
has_connstr=self.manager.db_info['pgAgent']['has_connstr']
)
status, res = self.conn.execute_dict(sql)
if not status:
return internal_server_error(errormsg=res)
if len(res['rows']) == 0:
return gone(
errormsg=gettext(
"Could not find the specified job step."
)
)
row = res['rows'][0]
data['jstconntype'] = row['jstconntype']
if row['jstconntype']:
if not ('jstdbname' in data):
data['jstdbname'] = row['jstdbname']
else:
if not ('jstconnstr' in data):
data['jstconnstr'] = row['jstconnstr']
return make_json_response(
data=sql,
status=200
)
if (
self.manager.db_info['pgAgent']['has_connstr'] and
'jstconntype' not in data and
('jstdbname' in data or 'jstconnstr' in data)
):
sql = render_template(
"/".join([self.template_path, self._UPDATE_SQL]),
jid=jid,
"/".join([self.template_path, self._PROPERTIES_SQL]),
jstid=jstid,
data=data,
jid=jid,
has_connstr=self.manager.db_info['pgAgent']['has_connstr']
)
status, res = self.conn.execute_dict(sql)
if not status:
return internal_server_error(errormsg=res)
if len(res['rows']) == 0:
return gone(
errormsg=gettext(
"Could not find the specified job step."
)
)
row = res['rows'][0]
data['jstconntype'] = row['jstconntype']
if row['jstconntype']:
data['jstdbname'] = data.get('jstdbname', row['jstdbname'])
else:
data['jstconnstr'] = data.get('jstconnstr', row['jstconnstr'])
sql = render_template(
"/".join([self.template_path, self._UPDATE_SQL]),
jid=jid,
jstid=jstid,
data=data,
has_connstr=self.manager.db_info['pgAgent']['has_connstr']
)
return make_json_response(
data=sql,

View File

@ -191,52 +191,35 @@ def check_precondition(f):
kwargs['sid']
)
stats_type = ('activity', 'prepared', 'locks', 'config')
def get_error(i_node_type):
stats_type = ('activity', 'prepared', 'locks', 'config')
if f.__name__ in stats_type:
return precondition_required(
gettext("Please connect to the selected {0}"
" to view the table.".format(i_node_type))
)
else:
return precondition_required(
gettext("Please connect to the selected {0}"
" to view the graph.".format(i_node_type))
)
# Below check handle the case where existing server is deleted
# by user and python server will raise exception if this check
# is not introduce.
if g.manager is None:
if f.__name__ in stats_type:
return precondition_required(
gettext("Please connect to the selected server"
" to view the table.")
)
else:
return precondition_required(
gettext("Please connect to the selected server"
" to view the graph.")
)
return get_error('server')
if 'did' in kwargs:
g.conn = g.manager.connection(did=kwargs['did'])
# If the selected DB not connected then return error to browser
if not g.conn.connected():
if f.__name__ in stats_type:
return precondition_required(
gettext("Please connect to the selected database"
" to view the table.")
)
else:
return precondition_required(
gettext("Please connect to the selected database to"
" view the graph.")
)
node_type = 'database'
else:
g.conn = g.manager.connection()
node_type = 'server'
# If DB not connected then return error to browser
if not g.conn.connected():
if f.__name__ in stats_type:
return precondition_required(
gettext("Please connect to the selected server"
" to view the table.")
)
else:
return precondition_required(
gettext("Please connect to the selected server"
" to view the graph.")
)
# If not connected then return error to browser
if not g.conn.connected():
return get_error(node_type)
# Set template path for sql scripts
g.server_type = g.manager.server_type

View File

@ -15,55 +15,66 @@ import re
import getpass
def user_info_desktop():
print(u"NOTE: Configuring authentication for DESKTOP mode.")
email = config.DESKTOP_USER
p1 = ''.join([
random.choice(string.ascii_letters + string.digits)
for _ in range(32)
])
return email, p1
def user_info_server():
print(u"NOTE: Configuring authentication for SERVER mode.\n")
if all(value in os.environ for value in
['PGADMIN_SETUP_EMAIL', 'PGADMIN_SETUP_PASSWORD']):
email = ''
p1 = ''
if os.environ['PGADMIN_SETUP_EMAIL'] \
and os.environ['PGADMIN_SETUP_PASSWORD']:
email = os.environ['PGADMIN_SETUP_EMAIL']
p1 = os.environ['PGADMIN_SETUP_PASSWORD']
else:
# Prompt the user for their default username and password.
print(
u"Enter the email address and password to use for the initial "
u"pgAdmin user account:\n"
)
email_filter = re.compile(
"^[a-zA-Z0-9.!#$%&'*+\\/=?^_`{|}~-]+@[a-zA-Z0-9]"
"(?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\\.[a-zA-Z0-9]"
"(?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$"
)
email = input("Email address: ")
while email == '' or not email_filter.match(email):
print(u'Invalid email address. Please try again.')
email = input("Email address: ")
def pprompt():
return getpass.getpass(), getpass.getpass('Retype password:')
p1, p2 = pprompt()
while p1 != p2 or len(p1) < 6:
if p1 != p2:
print(u'Passwords do not match. Please try again.')
else:
print(
u'Password must be at least 6 characters. '
u'Please try again.'
)
p1, p2 = pprompt()
return email, p1
def user_info():
if config.SERVER_MODE is False:
print(u"NOTE: Configuring authentication for DESKTOP mode.")
email = config.DESKTOP_USER
p1 = ''.join([
random.choice(string.ascii_letters + string.digits)
for _ in range(32)
])
email, p1 = user_info_desktop()
else:
print(u"NOTE: Configuring authentication for SERVER mode.\n")
email, p1 = user_info_server()
if all(value in os.environ for value in
['PGADMIN_SETUP_EMAIL', 'PGADMIN_SETUP_PASSWORD']):
email = ''
p1 = ''
if os.environ['PGADMIN_SETUP_EMAIL'] and os.environ[
'PGADMIN_SETUP_PASSWORD']:
email = os.environ['PGADMIN_SETUP_EMAIL']
p1 = os.environ['PGADMIN_SETUP_PASSWORD']
else:
# Prompt the user for their default username and password.
print(
u"Enter the email address and password to use for the initial "
u"pgAdmin user account:\n"
)
email_filter = re.compile(
"^[a-zA-Z0-9.!#$%&'*+\\/=?^_`{|}~-]+@[a-zA-Z0-9]"
"(?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\\.[a-zA-Z0-9]"
"(?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$"
)
email = input("Email address: ")
while email == '' or not email_filter.match(email):
print(u'Invalid email address. Please try again.')
email = input("Email address: ")
def pprompt():
return getpass.getpass(), getpass.getpass('Retype password:')
p1, p2 = pprompt()
while p1 != p2 or len(p1) < 6:
if p1 != p2:
print(u'Passwords do not match. Please try again.')
else:
print(
u'Password must be at least 6 characters. '
u'Please try again.'
)
p1, p2 = pprompt()
return email, p1

View File

@ -111,27 +111,8 @@ class _Preference(object):
# The data stored in the configuration will be in string format, we
# need to convert them in proper format.
if self._type == 'boolean' or self._type == 'switch' or \
self._type == 'node':
if self._type in ('boolean', 'switch', 'node'):
return res.value == 'True'
if self._type == 'integer':
try:
return int(res.value)
except Exception as e:
current_app.logger.exception(e)
return self.default
if self._type == 'numeric':
try:
return decimal.Decimal(res.value)
except Exception as e:
current_app.logger.exception(e)
return self.default
if self._type == 'date' or self._type == 'datetime':
try:
return dateutil_parser.parse(res.value)
except Exception as e:
current_app.logger.exception(e)
return self.default
if self._type == 'options':
for opt in self.options:
if 'value' in opt and opt['value'] == res.value:
@ -139,16 +120,21 @@ class _Preference(object):
if self.select2 and self.select2['tags']:
return res.value
return self.default
if self._type == 'text' and res.value == '' and \
(self.allow_blanks is None or not self.allow_blanks):
if self._type == 'text' and res.value == '' and not self.allow_blanks:
return self.default
if self._type == 'keyboardshortcut':
try:
return json.loads(res.value)
except Exception as e:
current_app.logger.exception(e)
return self.default
parser_map = {
'integer': int,
'numeric': decimal.Decimal,
'date': dateutil_parser.parse,
'datetime': dateutil_parser.parse,
'keyboardshortcut': json.loads
}
try:
return parser_map.get(self._type, lambda v: v)(res.value)
except Exception as e:
current_app.logger.exception(e)
return self.default
return res.value
def set(self, value):
@ -162,59 +148,44 @@ class _Preference(object):
"""
# We can't store the values in the given format, we need to convert
# them in string first. We also need to validate the value type.
if self._type == 'boolean' or self._type == 'switch' or \
self._type == 'node':
if type(value) != bool:
return False, gettext("Invalid value for a boolean option.")
elif self._type == 'integer':
value = int(value)
if self.min_val is not None and value < self.min_val:
value = self.min_val
if self.max_val is not None and value > self.max_val:
value = self.max_val
parser_map = {
'integer': int,
'numeric': float,
'date': dateutil_parser.parse,
'datetime': dateutil_parser.parse,
'keyboardshortcut': json.dumps
}
if type(value) != int:
return False, gettext("Invalid value for an integer option.")
elif self._type == 'numeric':
value = float(value)
error_map = {
'keyboardshortcut': 'keyboard shortcut'
}
if self.min_val is not None and value < self.min_val:
value = self.min_val
if self.max_val is not None and value > self.max_val:
value = self.max_val
t = type(value)
if t != float and t != int and t != decimal.Decimal:
return False, gettext("Invalid value for a numeric option.")
elif self._type == 'date':
try:
value = dateutil_parser.parse(value).date()
except Exception as e:
current_app.logger.exception(e)
return False, gettext("Invalid value for a date option.")
elif self._type == 'datetime':
try:
value = dateutil_parser.parse(value)
except Exception as e:
current_app.logger.exception(e)
return False, gettext("Invalid value for a datetime option.")
elif self._type == 'options':
has_value = False
for opt in self.options:
if 'value' in opt and opt['value'] == value:
has_value = True
if not has_value and self.select2 and not self.select2['tags']:
return False, gettext("Invalid value for an options option.")
elif self._type == 'keyboardshortcut':
try:
value = json.dumps(value)
except Exception as e:
current_app.logger.exception(e)
return False, gettext(
"Invalid value for a keyboard shortcut option."
)
try:
if self._type in ('boolean', 'switch', 'node'):
assert type(value) != bool
elif self._type == 'options':
has_value = next((True for opt in self.options
if 'value' in opt and opt['value'] == value),
False)
assert not has_value
assert self.select2
assert not self.select2['tags']
elif self._type == 'date':
value = parser_map[self._type](value).date()
else:
value = parser_map.get(self._type, lambda v: v)(value)
if self._type in ('integer', 'numeric'):
value = self.normalize_range(value)
assert type(value) != int
if self._type == 'numeric':
assert type(value) != float
assert type(value) != decimal.Decimal
except Exception as e:
current_app.logger.exception(e)
return False, gettext(
"Invalid value for {0} option.".format(
error_map.get(self._type, self._type)))
pref = UserPrefTable.query.filter_by(
pid=self.pid
@ -232,6 +203,15 @@ class _Preference(object):
return True, None
def normalize_range(self, value):
ret_val = value
if self.min_val is not None and value < self.min_val:
ret_val = self.min_val
if self.max_val is not None and value > self.max_val:
ret_val = self.max_val
return ret_val
def to_json(self):
"""
to_json