mirror of
https://github.com/pgadmin-org/pgadmin4.git
synced 2025-02-25 18:55:31 -06:00
Fixed cognitive complexity issues reported by SonarQube.
This commit is contained in:
committed by
Akshay Joshi
parent
f278490540
commit
72f0e87367
@@ -346,68 +346,29 @@ class Connection(BaseConnection):
|
||||
|
||||
return status, msg
|
||||
|
||||
def _initialize(self, conn_id, **kwargs):
|
||||
self.execution_aborted = False
|
||||
self.__backend_pid = self.conn.get_backend_pid()
|
||||
|
||||
setattr(g, "{0}#{1}".format(
|
||||
self.manager.sid,
|
||||
self.conn_id.encode('utf-8')
|
||||
), None)
|
||||
|
||||
status, cur = self.__cursor()
|
||||
formatted_exception_msg = self._formatted_exception_msg
|
||||
manager = self.manager
|
||||
|
||||
def _execute(cur, query, params=None):
|
||||
try:
|
||||
self.__internal_blocking_execute(cur, query, params)
|
||||
except psycopg2.Error as pe:
|
||||
cur.close()
|
||||
return formatted_exception_msg(pe, False)
|
||||
return None
|
||||
|
||||
# autocommit flag does not work with asynchronous connections.
|
||||
# By default asynchronous connection runs in autocommit mode.
|
||||
def _set_auto_commit(self, kwargs):
|
||||
"""
|
||||
autocommit flag does not work with asynchronous connections.
|
||||
By default asynchronous connection runs in autocommit mode.
|
||||
:param kwargs:
|
||||
:return:
|
||||
"""
|
||||
if self.async_ == 0:
|
||||
if 'autocommit' in kwargs and kwargs['autocommit'] is False:
|
||||
self.conn.autocommit = False
|
||||
else:
|
||||
self.conn.autocommit = True
|
||||
|
||||
register_string_typecasters(self.conn)
|
||||
|
||||
if self.array_to_string:
|
||||
register_array_to_string_typecasters(self.conn)
|
||||
|
||||
# Register type casters for binary data only after registering array to
|
||||
# string type casters.
|
||||
if self.use_binary_placeholder:
|
||||
register_binary_typecasters(self.conn)
|
||||
|
||||
postgres_encoding, self.python_encoding, typecast_encoding = \
|
||||
get_encoding(self.conn.encoding)
|
||||
|
||||
# Note that we use 'UPDATE pg_settings' for setting bytea_output as a
|
||||
# convenience hack for those running on old, unsupported versions of
|
||||
# PostgreSQL 'cos we're nice like that.
|
||||
status = _execute(
|
||||
cur,
|
||||
"SET DateStyle=ISO; "
|
||||
"SET client_min_messages=notice; "
|
||||
"SELECT set_config('bytea_output','hex',false) FROM pg_settings"
|
||||
" WHERE name = 'bytea_output'; "
|
||||
"SET client_encoding='{0}';".format(postgres_encoding)
|
||||
)
|
||||
|
||||
if status is not None:
|
||||
self.conn.close()
|
||||
self.conn = None
|
||||
|
||||
return False, status
|
||||
|
||||
def _set_role(self, manager, cur, conn_id):
|
||||
"""
|
||||
Set role
|
||||
:param manager:
|
||||
:param cur:
|
||||
:param conn_id:
|
||||
:return:
|
||||
"""
|
||||
if manager.role:
|
||||
status = _execute(cur, "SET ROLE TO %s", [manager.role])
|
||||
status = self._execute(cur, "SET ROLE TO %s", [manager.role])
|
||||
|
||||
if status is not None:
|
||||
self.conn.close()
|
||||
@@ -425,9 +386,71 @@ class Connection(BaseConnection):
|
||||
_(
|
||||
"Failed to setup the role with error message:\n{0}"
|
||||
).format(status)
|
||||
return False, ''
|
||||
|
||||
def _execute(self, cur, query, params=None):
|
||||
formatted_exception_msg = self._formatted_exception_msg
|
||||
try:
|
||||
self.__internal_blocking_execute(cur, query, params)
|
||||
except psycopg2.Error as pe:
|
||||
cur.close()
|
||||
return formatted_exception_msg(pe, False)
|
||||
return None
|
||||
|
||||
def _initialize(self, conn_id, **kwargs):
|
||||
self.execution_aborted = False
|
||||
self.__backend_pid = self.conn.get_backend_pid()
|
||||
|
||||
setattr(g, "{0}#{1}".format(
|
||||
self.manager.sid,
|
||||
self.conn_id.encode('utf-8')
|
||||
), None)
|
||||
|
||||
status, cur = self.__cursor()
|
||||
|
||||
manager = self.manager
|
||||
|
||||
# autocommit flag does not work with asynchronous connections.
|
||||
# By default asynchronous connection runs in autocommit mode.
|
||||
self._set_auto_commit(kwargs)
|
||||
|
||||
register_string_typecasters(self.conn)
|
||||
|
||||
if self.array_to_string:
|
||||
register_array_to_string_typecasters(self.conn)
|
||||
|
||||
# Register type casters for binary data only after registering array to
|
||||
# string type casters.
|
||||
if self.use_binary_placeholder:
|
||||
register_binary_typecasters(self.conn)
|
||||
|
||||
postgres_encoding, self.python_encoding, typecast_encoding = \
|
||||
get_encoding(self.conn.encoding)
|
||||
|
||||
# Note that we use 'UPDATE pg_settings' for setting bytea_output as a
|
||||
# convenience hack for those running on old, unsupported versions of
|
||||
# PostgreSQL 'cos we're nice like that.
|
||||
status = self._execute(
|
||||
cur,
|
||||
"SET DateStyle=ISO; "
|
||||
"SET client_min_messages=notice; "
|
||||
"SELECT set_config('bytea_output','hex',false) FROM pg_settings"
|
||||
" WHERE name = 'bytea_output'; "
|
||||
"SET client_encoding='{0}';".format(postgres_encoding)
|
||||
)
|
||||
|
||||
if status is not None:
|
||||
self.conn.close()
|
||||
self.conn = None
|
||||
|
||||
return False, status
|
||||
|
||||
is_error, errmsg = self._set_role(manager, cur, conn_id)
|
||||
if is_error:
|
||||
return False, errmsg
|
||||
|
||||
# Check database version every time on reconnection
|
||||
status = _execute(cur, "SELECT version()")
|
||||
status = self._execute(cur, "SELECT version()")
|
||||
|
||||
if status is not None:
|
||||
self.conn.close()
|
||||
@@ -449,7 +472,7 @@ class Connection(BaseConnection):
|
||||
manager.ver = row['version']
|
||||
manager.sversion = self.conn.server_version
|
||||
|
||||
status = _execute(cur, """
|
||||
status = self._execute(cur, """
|
||||
SELECT
|
||||
db.oid as did, db.datname, db.datallowconn,
|
||||
pg_encoding_to_char(db.encoding) AS serverencoding,
|
||||
@@ -468,10 +491,26 @@ WHERE db.datname = current_database()""")
|
||||
if len(manager.db_info) == 1:
|
||||
manager.did = res['did']
|
||||
|
||||
status = _execute(cur, """
|
||||
self._set_user_info(cur, manager)
|
||||
|
||||
self._set_server_type_and_password(kwargs, manager)
|
||||
|
||||
manager.update_session()
|
||||
|
||||
return True, None
|
||||
|
||||
def _set_user_info(self, cur, manager):
|
||||
"""
|
||||
Set user info.
|
||||
:param cur:
|
||||
:param manager:
|
||||
:return:
|
||||
"""
|
||||
status = self._execute(cur, """
|
||||
SELECT
|
||||
oid as id, rolname as name, rolsuper as is_superuser,
|
||||
CASE WHEN rolsuper THEN true ELSE rolcreaterole END as can_create_role,
|
||||
CASE WHEN rolsuper THEN true ELSE rolcreaterole END as
|
||||
can_create_role,
|
||||
CASE WHEN rolsuper THEN true ELSE rolcreatedb END as can_create_db
|
||||
FROM
|
||||
pg_catalog.pg_roles
|
||||
@@ -483,6 +522,13 @@ WHERE
|
||||
if cur.rowcount > 0:
|
||||
manager.user_info = cur.fetchmany(1)[0]
|
||||
|
||||
def _set_server_type_and_password(self, kwargs, manager):
|
||||
"""
|
||||
Set server type
|
||||
:param kwargs:
|
||||
:param manager:
|
||||
:return:
|
||||
"""
|
||||
if 'password' in kwargs:
|
||||
manager.password = kwargs['password']
|
||||
|
||||
@@ -501,10 +547,6 @@ WHERE
|
||||
manager.server_cls = st
|
||||
break
|
||||
|
||||
manager.update_session()
|
||||
|
||||
return True, None
|
||||
|
||||
def __cursor(self, server_cursor=False):
|
||||
|
||||
if not get_crypt_key()[0]:
|
||||
@@ -1188,26 +1230,36 @@ WHERE
|
||||
self.conn = None
|
||||
return False
|
||||
|
||||
def _decrypt_password(self, manager):
|
||||
"""
|
||||
Decrypt password
|
||||
:param manager: Manager for get password.
|
||||
:return:
|
||||
"""
|
||||
password = getattr(manager, 'password', None)
|
||||
if password:
|
||||
# Fetch Logged in User Details.
|
||||
user = User.query.filter_by(id=current_user.id).first()
|
||||
|
||||
if user is None:
|
||||
return False, gettext("Unauthorized request."), password
|
||||
|
||||
crypt_key_present, crypt_key = get_crypt_key()
|
||||
if not crypt_key_present:
|
||||
return False, crypt_key, password
|
||||
|
||||
password = decrypt(password, crypt_key).decode()
|
||||
return True, '', password
|
||||
|
||||
def reset(self):
|
||||
if self.conn and self.conn.closed:
|
||||
self.conn = None
|
||||
pg_conn = None
|
||||
manager = self.manager
|
||||
|
||||
password = getattr(manager, 'password', None)
|
||||
|
||||
if password:
|
||||
# Fetch Logged in User Details.
|
||||
user = User.query.filter_by(id=current_user.id).first()
|
||||
|
||||
if user is None:
|
||||
return False, gettext("Unauthorized request.")
|
||||
|
||||
crypt_key_present, crypt_key = get_crypt_key()
|
||||
if not crypt_key_present:
|
||||
return False, crypt_key
|
||||
|
||||
password = decrypt(password, crypt_key).decode()
|
||||
is_return, return_value, password = self._decrypt_password(manager)
|
||||
if is_return:
|
||||
return False, return_value
|
||||
|
||||
try:
|
||||
pg_conn = psycopg2.connect(
|
||||
|
||||
@@ -112,6 +112,20 @@ class ServerManager(object):
|
||||
|
||||
self.connections = dict()
|
||||
|
||||
def _set_password(self, res):
|
||||
"""
|
||||
Set password for server manager object.
|
||||
:param res: response dict.
|
||||
:return:
|
||||
"""
|
||||
if hasattr(self, 'password') and self.password:
|
||||
if hasattr(self.password, 'decode'):
|
||||
res['password'] = self.password.decode('utf-8')
|
||||
else:
|
||||
res['password'] = str(self.password)
|
||||
else:
|
||||
res['password'] = self.password
|
||||
|
||||
def as_dict(self):
|
||||
"""
|
||||
Returns a dictionary object representing the server manager.
|
||||
@@ -123,13 +137,8 @@ class ServerManager(object):
|
||||
res['sid'] = self.sid
|
||||
res['ver'] = self.ver
|
||||
res['sversion'] = self.sversion
|
||||
if hasattr(self, 'password') and self.password:
|
||||
if hasattr(self.password, 'decode'):
|
||||
res['password'] = self.password.decode('utf-8')
|
||||
else:
|
||||
res['password'] = str(self.password)
|
||||
else:
|
||||
res['password'] = self.password
|
||||
|
||||
self._set_password(res)
|
||||
|
||||
if self.use_ssh_tunnel:
|
||||
if hasattr(self, 'tunnel_password') and self.tunnel_password:
|
||||
@@ -244,6 +253,76 @@ WHERE db.oid = {0}""".format(did))
|
||||
|
||||
return self.connections[my_id]
|
||||
|
||||
@staticmethod
|
||||
def _get_password_to_conn(data, masterpass_processed):
|
||||
"""
|
||||
Get password for connect to server with simple and ssh connection.
|
||||
:param data: Data.
|
||||
:param masterpass_processed:
|
||||
:return:
|
||||
"""
|
||||
# The data variable is a copy so is not automatically synced
|
||||
# update here
|
||||
if masterpass_processed and 'password' in data:
|
||||
data['password'] = None
|
||||
if masterpass_processed and 'tunnel_password' in data:
|
||||
data['tunnel_password'] = None
|
||||
|
||||
def _get_server_type(self):
|
||||
"""
|
||||
Get server type and server cls.
|
||||
:return:
|
||||
"""
|
||||
from pgadmin.browser.server_groups.servers.types import ServerType
|
||||
|
||||
if self.ver and not self.server_type:
|
||||
for st in ServerType.types():
|
||||
if st.instance_of(self.ver):
|
||||
self.server_type = st.stype
|
||||
self.server_cls = st
|
||||
break
|
||||
|
||||
def _check_and_reconnect_server(self, conn, conn_info, data):
|
||||
"""
|
||||
Check and try to reconnect the server if server previously connected
|
||||
and auto_reconnect is true.
|
||||
:param conn:
|
||||
:type conn:
|
||||
:param conn_info:
|
||||
:type conn_info:
|
||||
:param data:
|
||||
:type data:
|
||||
:return:
|
||||
:rtype:
|
||||
"""
|
||||
from pgadmin.browser.server_groups.servers.types import ServerType
|
||||
if conn_info['wasConnected'] and conn_info['auto_reconnect']:
|
||||
try:
|
||||
# Check SSH Tunnel needs to be created
|
||||
if self.use_ssh_tunnel == 1 and \
|
||||
not self.tunnel_created:
|
||||
status, error = self.create_ssh_tunnel(
|
||||
data['tunnel_password'])
|
||||
|
||||
# Check SSH Tunnel is alive or not.
|
||||
self.check_ssh_tunnel_alive()
|
||||
|
||||
conn.connect(
|
||||
password=data['password'],
|
||||
server_types=ServerType.types()
|
||||
)
|
||||
# This will also update wasConnected flag in
|
||||
# connection so no need to update the flag manually.
|
||||
except CryptKeyMissing:
|
||||
# maintain the status as this will help to restore once
|
||||
# the key is available
|
||||
conn.wasConnected = conn_info['wasConnected']
|
||||
conn.auto_reconnect = conn_info['auto_reconnect']
|
||||
except Exception as e:
|
||||
current_app.logger.exception(e)
|
||||
self.connections.pop(conn_info['conn_id'])
|
||||
raise
|
||||
|
||||
def _restore(self, data):
|
||||
"""
|
||||
Helps restoring to reconnect the auto-connect connections smoothly on
|
||||
@@ -253,21 +332,9 @@ WHERE db.oid = {0}""".format(did))
|
||||
# restarted. As we need server version to resolve sql template paths.
|
||||
masterpass_processed = process_masterpass_disabled()
|
||||
|
||||
# The data variable is a copy so is not automatically synced
|
||||
# update here
|
||||
if masterpass_processed and 'password' in data:
|
||||
data['password'] = None
|
||||
if masterpass_processed and 'tunnel_password' in data:
|
||||
data['tunnel_password'] = None
|
||||
|
||||
from pgadmin.browser.server_groups.servers.types import ServerType
|
||||
|
||||
if self.ver and not self.server_type:
|
||||
for st in ServerType.types():
|
||||
if st.instance_of(self.ver):
|
||||
self.server_type = st.stype
|
||||
self.server_cls = st
|
||||
break
|
||||
ServerManager._get_password_to_conn(data, masterpass_processed)
|
||||
# Get server type.
|
||||
self._get_server_type()
|
||||
|
||||
# We need to know about the existing server variant supports during
|
||||
# first connection for identifications.
|
||||
@@ -297,34 +364,8 @@ WHERE db.oid = {0}""".format(did))
|
||||
array_to_string=conn_info['array_to_string']
|
||||
)
|
||||
|
||||
# only try to reconnect if connection was connected previously
|
||||
# and auto_reconnect is true.
|
||||
if conn_info['wasConnected'] and conn_info['auto_reconnect']:
|
||||
try:
|
||||
# Check SSH Tunnel needs to be created
|
||||
if self.use_ssh_tunnel == 1 and \
|
||||
not self.tunnel_created:
|
||||
status, error = self.create_ssh_tunnel(
|
||||
data['tunnel_password'])
|
||||
|
||||
# Check SSH Tunnel is alive or not.
|
||||
self.check_ssh_tunnel_alive()
|
||||
|
||||
conn.connect(
|
||||
password=data['password'],
|
||||
server_types=ServerType.types()
|
||||
)
|
||||
# This will also update wasConnected flag in
|
||||
# connection so no need to update the flag manually.
|
||||
except CryptKeyMissing:
|
||||
# maintain the status as this will help to restore once
|
||||
# the key is available
|
||||
conn.wasConnected = conn_info['wasConnected']
|
||||
conn.auto_reconnect = conn_info['auto_reconnect']
|
||||
except Exception as e:
|
||||
current_app.logger.exception(e)
|
||||
self.connections.pop(conn_info['conn_id'])
|
||||
raise
|
||||
# only try to reconnect
|
||||
self._check_and_reconnect_server(conn, conn_info, data)
|
||||
|
||||
def _restore_connections(self):
|
||||
for conn_id in self.connections:
|
||||
@@ -358,26 +399,51 @@ WHERE db.oid = {0}""".format(did))
|
||||
current_app.logger.exception(e)
|
||||
raise
|
||||
|
||||
def release(self, database=None, conn_id=None, did=None):
|
||||
# Stop the SSH tunnel if release() function calls without
|
||||
# any parameter.
|
||||
def _stop_ssh_tunnel(self, did, database, conn_id):
|
||||
"""
|
||||
Stop ssh tunnel connection if function call without any parameter.
|
||||
:param did: Database Id.
|
||||
:param database: Database.
|
||||
:param conn_id: COnnection Id.
|
||||
:return:
|
||||
"""
|
||||
if database is None and conn_id is None and did is None:
|
||||
self.stop_ssh_tunnel()
|
||||
|
||||
def _check_db_info(self, did, conn_id, database):
|
||||
"""
|
||||
Check did is not none and it is resent in db_info.
|
||||
:param did: Database Id.
|
||||
:param conn_id: Connection Id.
|
||||
:return:
|
||||
"""
|
||||
if database is None and conn_id is None and did is None:
|
||||
self.stop_ssh_tunnel()
|
||||
|
||||
my_id = None
|
||||
if did is not None:
|
||||
if did in self.db_info and 'datname' in self.db_info[did]:
|
||||
database = self.db_info[did]['datname']
|
||||
if database is None:
|
||||
return False
|
||||
return True, False, my_id
|
||||
else:
|
||||
return False
|
||||
return True, False, my_id
|
||||
|
||||
my_id = None
|
||||
if conn_id is not None:
|
||||
my_id = 'CONN:{0}'.format(conn_id)
|
||||
elif database is not None:
|
||||
my_id = 'DB:{0}'.format(database)
|
||||
|
||||
return False, True, my_id
|
||||
|
||||
def release(self, database=None, conn_id=None, did=None):
|
||||
# Stop the SSH tunnel if release() function calls without
|
||||
# any parameter.
|
||||
is_return, return_value, my_id = self._check_db_info(did, conn_id,
|
||||
database)
|
||||
if is_return:
|
||||
return return_value
|
||||
|
||||
if my_id is not None:
|
||||
if my_id in self.connections:
|
||||
self.connections[my_id]._release()
|
||||
|
||||
Reference in New Issue
Block a user