##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2018, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################

"""
Implementation of Connection.
It is a wrapper around the actual psycopg2 driver and connection
object.
"""

import random
import select
import sys
import datetime
from collections import deque
import simplejson as json
import psycopg2
from flask import g, current_app
from flask_babelex import gettext
from flask_security import current_user
from pgadmin.utils.crypto import decrypt
from psycopg2.extensions import adapt, encodings

import config
from pgadmin.model import Server, User
from pgadmin.utils.exception import ConnectionLost
from pgadmin.utils import get_complete_file_path
from ..abstract import BaseDriver, BaseConnection
from .cursor import DictCursor
from .typecast import register_global_typecasters, \
    register_string_typecasters, register_binary_typecasters, \
    register_array_to_string_typecasters, ALL_JSON_TYPES


if sys.version_info < (3,):
    # Python2's built-in csv module does not handle unicode;
    # the backports.csv module (ported from the PY3 csv module) does.
    from backports import csv
    from StringIO import StringIO
    IS_PY2 = True
else:
    from io import StringIO
    import csv
    IS_PY2 = False

_ = gettext

# Register global type casters, which are applicable to all connections.
register_global_typecasters()

class Connection(BaseConnection):
    """
    class Connection(object)

        A wrapper class, which wraps the psycopg2 connection object, and
        delegates the execution to the actual connection object, when
        required.

    Methods:
    -------
    * connect(**kwargs)
      - Connect to the PostgreSQL/EDB Postgres Advanced Server using the
        psycopg2 driver

    * execute_scalar(query, params, formatted_exception_msg)
      - Execute the given query and return a single datum result

    * execute_async(query, params, formatted_exception_msg)
      - Execute the given query asynchronously and return the result.

    * execute_void(query, params, formatted_exception_msg)
      - Execute the given query with no result.

    * execute_2darray(query, params, formatted_exception_msg)
      - Execute the given query and return the result as a 2-dimensional
        array.

    * execute_dict(query, params, formatted_exception_msg)
      - Execute the given query and return the result as an array of dicts
        in (column name -> value) format.

    * connected()
      - Get the status of the connection.
        Returns True if connected, otherwise False.

    * reset()
      - Reconnect to the database server (if possible)

    * transaction_status()
      - Returns the current transaction status

    * ping()
      - Ping the server.

    * _release()
      - Release the psycopg2 connection object

    * _reconnect()
      - Attempt to reconnect to the database

    * _wait(conn)
      - This method is used to wait for an asynchronous connection. This is
        a blocking call.

    * _wait_timeout(conn)
      - This method is used to wait for an asynchronous connection with a
        timeout. This is a non-blocking call.

    * poll(formatted_exception_msg)
      - This method is used to poll the result of a query running on an
        asynchronous connection.

    * status_message()
      - Returns the status message returned by the last command executed on
        the server.

    * rows_affected()
      - Returns the number of rows affected by the last command executed on
        the server.

    * cancel_transaction(conn_id, did=None)
      - This method is used to cancel the transaction for the
        specified connection id and database id.

    * messages()
      - Returns the list of messages/notices sent by the PostgreSQL database
        server.

    * _formatted_exception_msg(exception_obj, formatted_msg)
      - This method is used to parse the psycopg2.Error object and returns
        the formatted error message if the flag is set to True, otherwise
        the plain error message.

    * check_notifies(required_polling)
      - Check for notification messages by polling the connection, or pick
        up any notifications already queued after an execute.

    * get_notifies()
      - Returns the list of notifications received from the database
        server.

    * pq_encrypt_password_conn()
      - Returns the encrypted password for database servers of version 10
        and above.
    """

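    # Illustrative usage sketch (comment only, not part of the class): a
    # Connection is normally obtained from a server's connection manager
    # rather than instantiated directly. The driver/manager lookups shown
    # below are assumptions for the example:
    #
    #   manager = get_driver(PG_DEFAULT_DRIVER).connection_manager(sid)
    #   conn = manager.connection(did=did)
    #   connected, errmsg = conn.connect()
    #   if connected:
    #       status, res = conn.execute_dict("SELECT 1 AS one")
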
    def __init__(self, manager, conn_id, db, auto_reconnect=True, async_=0,
                 use_binary_placeholder=False, array_to_string=False):
        assert (manager is not None)
        assert (conn_id is not None)

        self.conn_id = conn_id
        self.manager = manager
        self.db = db if db is not None else manager.db
        self.conn = None
        self.auto_reconnect = auto_reconnect
        self.async_ = async_
        self.__async_cursor = None
        self.__async_query_id = None
        self.__backend_pid = None
        self.execution_aborted = False
        self.row_count = 0
        self.__notices = None
        self.__notifies = None
        self.password = None
        # This flag indicates the connection status (connected/disconnected).
        self.wasConnected = False
        # This flag indicates the connection reconnecting status.
        self.reconnecting = False
        self.use_binary_placeholder = use_binary_placeholder
        self.array_to_string = array_to_string

        super(Connection, self).__init__()

    def as_dict(self):
        """
        Returns the dictionary object representing this object.
        """
        # If it is not auto-reconnectable, or has already been released,
        # then we will return None.
        if not self.auto_reconnect and not self.conn:
            return None

        res = dict()
        res['conn_id'] = self.conn_id
        res['database'] = self.db
        res['async_'] = self.async_
        res['wasConnected'] = self.wasConnected
        res['auto_reconnect'] = self.auto_reconnect
        res['use_binary_placeholder'] = self.use_binary_placeholder
        res['array_to_string'] = self.array_to_string

        return res

    def __repr__(self):
        return "PG Connection: {0} ({1}) -> {2} (ajax:{3})".format(
            self.conn_id, self.db,
            'Connected' if self.conn and not self.conn.closed else
            "Disconnected",
            self.async_
        )

    def __str__(self):
        return "PG Connection: {0} ({1}) -> {2} (ajax:{3})".format(
            self.conn_id, self.db,
            'Connected' if self.conn and not self.conn.closed else
            "Disconnected",
            self.async_
        )

    def connect(self, **kwargs):
        if self.conn:
            if self.conn.closed:
                self.conn = None
            else:
                return True, None

        pg_conn = None
        password = None
        passfile = None
        manager = self.manager

        encpass = kwargs['password'] if 'password' in kwargs else None
        passfile = kwargs['passfile'] if 'passfile' in kwargs else None
        tunnel_password = kwargs['tunnel_password'] if 'tunnel_password' in \
            kwargs else ''

        # Check whether an SSH tunnel needs to be created.
        if manager.use_ssh_tunnel == 1 and not manager.tunnel_created:
            status, error = manager.create_ssh_tunnel(tunnel_password)
            if not status:
                return False, error

        # Check whether the SSH tunnel is alive.
        if manager.use_ssh_tunnel == 1:
            manager.check_ssh_tunnel_alive()

        if encpass is None:
            encpass = self.password or getattr(manager, 'password', None)

        # Reset the existing connection password.
        if self.reconnecting is not False:
            self.password = None

        if encpass:
            # Fetch the logged-in user's details.
            user = User.query.filter_by(id=current_user.id).first()

            if user is None:
                return False, gettext("Unauthorized request.")

            try:
                password = decrypt(encpass, user.password)
                # Handle non-ASCII passwords (Python2).
                if hasattr(str, 'decode'):
                    password = password.decode('utf-8').encode('utf-8')
                # The password is in bytes; for Python3 we need it as a
                # string.
                elif isinstance(password, bytes):
                    password = password.decode()

            except Exception as e:
                manager.stop_ssh_tunnel()
                current_app.logger.exception(e)
                return False, \
                    _(
                        "Failed to decrypt the saved password.\nError: {0}"
                    ).format(str(e))

        # If no password credential is found, the connect request might have
        # come from the Query Tool, View Data grid, debugger or another tool.
        # In that case, check the connection manager for a pgpass file and
        # use it if present.
        if not password and not encpass and not passfile:
            passfile = manager.passfile if manager.passfile else None

        try:
            if hasattr(str, 'decode'):
                database = self.db.encode('utf-8')
                user = manager.user.encode('utf-8')
                conn_id = self.conn_id.encode('utf-8')
            else:
                database = self.db
                user = manager.user
                conn_id = self.conn_id

            import os
            os.environ['PGAPPNAME'] = '{0} - {1}'.format(
                config.APP_NAME, conn_id)

            pg_conn = psycopg2.connect(
                host=manager.local_bind_host if manager.use_ssh_tunnel
                else manager.host,
                hostaddr=manager.local_bind_host if manager.use_ssh_tunnel
                else manager.hostaddr,
                port=manager.local_bind_port if manager.use_ssh_tunnel
                else manager.port,
                database=database,
                user=user,
                password=password,
                async_=self.async_,
                passfile=get_complete_file_path(passfile),
                sslmode=manager.ssl_mode,
                sslcert=get_complete_file_path(manager.sslcert),
                sslkey=get_complete_file_path(manager.sslkey),
                sslrootcert=get_complete_file_path(manager.sslrootcert),
                sslcrl=get_complete_file_path(manager.sslcrl),
                sslcompression=True if manager.sslcompression else False,
                service=manager.service,
                connect_timeout=manager.connect_timeout
            )

            # If the connection is asynchronous then we will have to wait
            # until the connection is ready to use.
            if self.async_ == 1:
                self._wait(pg_conn)

        except psycopg2.Error as e:
            manager.stop_ssh_tunnel()
            if e.pgerror:
                msg = e.pgerror
            elif e.diag.message_detail:
                msg = e.diag.message_detail
            else:
                msg = str(e)
            current_app.logger.info(
                u"Failed to connect to the database server (#{server_id}) "
                u"for connection ({conn_id}) with the error message below"
                u":{msg}".format(
                    server_id=self.manager.sid,
                    conn_id=conn_id,
                    msg=msg.decode('utf-8') if hasattr(str, 'decode') else msg
                )
            )
            return False, msg

        # Overwrite the connection notices attribute to support
        # more than 50 notices at a time.
        pg_conn.notices = deque([], self.ASYNC_NOTICE_MAXLENGTH)

        self.conn = pg_conn
        self.wasConnected = True
        try:
            status, msg = self._initialize(conn_id, **kwargs)
        except Exception as e:
            manager.stop_ssh_tunnel()
            current_app.logger.exception(e)
            self.conn = None
            if not self.reconnecting:
                self.wasConnected = False
            raise e

        if status:
            manager._update_password(encpass)
        else:
            if not self.reconnecting:
                self.wasConnected = False

        return status, msg

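    # A minimal sketch of how connect() is typically driven (the keyword
    # values are assumptions for illustration; the encrypted password and
    # tunnel password normally come from the session/connection manager,
    # not from literals):
    #
    #   status, msg = conn.connect(
    #       password=encrypted_password,
    #       server_types=known_server_types
    #   )
    #   if not status:
    #       current_app.logger.error(msg)
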
    def _initialize(self, conn_id, **kwargs):
        self.execution_aborted = False
        self.__backend_pid = self.conn.get_backend_pid()

        setattr(g, "{0}#{1}".format(
            self.manager.sid,
            self.conn_id.encode('utf-8')
        ), None)

        status, cur = self.__cursor()
        formatted_exception_msg = self._formatted_exception_msg
        manager = self.manager

        def _execute(cur, query, params=None):
            try:
                self.__internal_blocking_execute(cur, query, params)
            except psycopg2.Error as pe:
                cur.close()
                return formatted_exception_msg(pe, False)
            return None

        # The autocommit flag does not work with asynchronous connections.
        # By default an asynchronous connection runs in autocommit mode.
        if self.async_ == 0:
            if 'autocommit' in kwargs and kwargs['autocommit'] is False:
                self.conn.autocommit = False
            else:
                self.conn.autocommit = True

        register_string_typecasters(self.conn)

        if self.array_to_string:
            register_array_to_string_typecasters(self.conn)

        # Register type casters for binary data only after registering the
        # array-to-string type casters.
        if self.use_binary_placeholder:
            register_binary_typecasters(self.conn)

        if self.conn.encoding in ('SQL_ASCII', 'SQLASCII',
                                  'MULE_INTERNAL', 'MULEINTERNAL'):
            status = _execute(cur, "SET DateStyle=ISO;"
                                   "SET client_min_messages=notice;"
                                   "SET bytea_output=escape;"
                                   "SET client_encoding='{0}';"
                                   .format(self.conn.encoding))
            self.python_encoding = 'raw_unicode_escape'
        else:
            status = _execute(cur, "SET DateStyle=ISO;"
                                   "SET client_min_messages=notice;"
                                   "SET bytea_output=escape;"
                                   "SET client_encoding='UNICODE';")
            self.python_encoding = 'utf-8'

        # Replace the Python encoding for both the original and the renamed
        # encodings; psycopg2 removes the underscore in conn.encoding.
        # Setting the encodings dict value only helps for SELECT statements,
        # because for parameterized DML the param values are converted based
        # on the Python encoding in psycopg2's internal encodings dict.
        for key, val in encodings.items():
            if key.replace('_', '') == self.conn.encoding:
                encodings[key] = self.python_encoding

        if status is not None:
            self.conn.close()
            self.conn = None

            return False, status

        if manager.role:
            status = _execute(cur, u"SET ROLE TO %s", [manager.role])

            if status is not None:
                self.conn.close()
                self.conn = None
                current_app.logger.error(
                    "Connected to the database server (#{server_id}) for "
                    "connection ({conn_id}), but failed to set up the role. "
                    "Error message: {msg}".format(
                        server_id=self.manager.sid,
                        conn_id=conn_id,
                        msg=status
                    )
                )
                return False, \
                    _(
                        "Failed to set up the role with error message:\n{0}"
                    ).format(status)

        if manager.ver is None:
            status = _execute(cur, "SELECT version()")

            if status is not None:
                self.conn.close()
                self.conn = None
                self.wasConnected = False
                current_app.logger.error(
                    "Failed to fetch the version information on the "
                    "established connection to the database server "
                    "(#{server_id}) for '{conn_id}' with the error "
                    "message: {msg}".format(
                        server_id=self.manager.sid,
                        conn_id=conn_id,
                        msg=status)
                )
                return False, status

            if cur.rowcount > 0:
                row = cur.fetchmany(1)[0]
                manager.ver = row['version']
                manager.sversion = self.conn.server_version

        status = _execute(cur, """
SELECT
    db.oid as did, db.datname, db.datallowconn,
    pg_encoding_to_char(db.encoding) AS serverencoding,
    has_database_privilege(db.oid, 'CREATE') as cancreate, datlastsysoid
FROM
    pg_database db
WHERE db.datname = current_database()""")

        if status is None:
            manager.db_info = manager.db_info or dict()
            if cur.rowcount > 0:
                res = cur.fetchmany(1)[0]
                manager.db_info[res['did']] = res.copy()

                # We do not have the database oid for the maintenance
                # database.
                if len(manager.db_info) == 1:
                    manager.did = res['did']

        status = _execute(cur, """
SELECT
    oid as id, rolname as name, rolsuper as is_superuser,
    rolcreaterole as can_create_role, rolcreatedb as can_create_db
FROM
    pg_catalog.pg_roles
WHERE
    rolname = current_user""")

        if status is None:
            manager.user_info = dict()
            if cur.rowcount > 0:
                manager.user_info = cur.fetchmany(1)[0]

        if 'password' in kwargs:
            manager.password = kwargs['password']

        server_types = None
        if 'server_types' in kwargs and isinstance(
                kwargs['server_types'], list):
            server_types = manager.server_types = kwargs['server_types']

        if server_types is None:
            from pgadmin.browser.server_groups.servers.types import ServerType
            server_types = ServerType.types()

        for st in server_types:
            if st.instanceOf(manager.ver):
                manager.server_type = st.stype
                manager.server_cls = st
                break

        manager.update_session()

        return True, None

    def __cursor(self, server_cursor=False):

        # Check whether the SSH tunnel is alive, if one is used by the
        # database server for the connection.
        if self.manager.use_ssh_tunnel == 1:
            self.manager.check_ssh_tunnel_alive()

        if self.wasConnected is False:
            raise ConnectionLost(
                self.manager.sid,
                self.db,
                None if self.conn_id[0:3] == u'DB:' else self.conn_id[5:]
            )
        cur = getattr(g, "{0}#{1}".format(
            self.manager.sid,
            self.conn_id.encode('utf-8')
        ), None)

        if self.connected() and cur and not cur.closed:
            if not server_cursor or (server_cursor and cur.name):
                return True, cur

        if not self.connected():
            errmsg = ""

            current_app.logger.warning(
                "Connection to database server (#{server_id}) for the "
                "connection - '{conn_id}' has been lost.".format(
                    server_id=self.manager.sid,
                    conn_id=self.conn_id
                )
            )

            if self.auto_reconnect and not self.reconnecting:
                self.__attempt_execution_reconnect(None)
            else:
                raise ConnectionLost(
                    self.manager.sid,
                    self.db,
                    None if self.conn_id[0:3] == u'DB:' else self.conn_id[5:]
                )

        try:
            if server_cursor:
                # Providing a name to the cursor will create a server-side
                # cursor.
                cursor_name = "CURSOR:{0}".format(self.conn_id)
                cur = self.conn.cursor(
                    name=cursor_name, cursor_factory=DictCursor
                )
            else:
                cur = self.conn.cursor(cursor_factory=DictCursor)
        except psycopg2.Error as pe:
            current_app.logger.exception(pe)
            errmsg = gettext(
                "Failed to create a cursor for the psycopg2 connection with "
                "the error message for the server#{1}:{2}:\n{0}"
            ).format(
                str(pe), self.manager.sid, self.db
            )

            current_app.logger.error(errmsg)
            if self.conn.closed:
                self.conn = None
                if self.auto_reconnect and not self.reconnecting:
                    current_app.logger.info(
                        gettext(
                            "Attempting to reconnect to the database server "
                            "(#{server_id}) for the connection - '{conn_id}'."
                        ).format(
                            server_id=self.manager.sid,
                            conn_id=self.conn_id
                        )
                    )
                    return self.__attempt_execution_reconnect(
                        self.__cursor, server_cursor
                    )
                else:
                    raise ConnectionLost(
                        self.manager.sid,
                        self.db,
                        None if self.conn_id[0:3] == u'DB:'
                        else self.conn_id[5:]
                    )

        setattr(
            g, "{0}#{1}".format(
                self.manager.sid, self.conn_id.encode('utf-8')
            ), cur
        )

        return True, cur

    def escape_params_sqlascii(self, params):
        # The data is unescaped using string_typecasters when selected.
        # We need to escape the data so that it does not fail when
        # it is encoded with python ascii;
        # unicode_escape helps in escaping and unescaping.
        if self.conn:
            if self.conn.encoding in ('SQL_ASCII', 'SQLASCII',
                                      'MULE_INTERNAL', 'MULEINTERNAL')\
                    and params is not None and type(params) == dict:
                params = dict(
                    (key, val.encode('unicode_escape')
                        .decode('raw_unicode_escape'))
                    for key, val in params.items()
                )
        return params

    def __internal_blocking_execute(self, cur, query, params):
        """
        This function executes the query using the cursor's execute
        function, but in the case of an asynchronous connection we need to
        wait for the transaction to complete. If self.async_ is 1 then this
        is a blocking call.

        Args:
            cur: Cursor object
            query: SQL query to run.
            params: Extra parameters
        """

        if sys.version_info < (3,):
            if type(query) == unicode:
                query = query.encode('utf-8')
        else:
            query = query.encode('utf-8')

        params = self.escape_params_sqlascii(params)
        cur.execute(query, params)
        if self.async_ == 1:
            self._wait(cur.connection)

    def execute_on_server_as_csv(self,
                                 query, params=None,
                                 formatted_exception_msg=False,
                                 records=2000):
        """
        Fetch the query result and generate CSV output.

        Args:
            query: SQL query to run
            params: Additional parameters
            formatted_exception_msg: if True, return the formatted exception
                message
            records: Number of initial records
        Returns:
            Generator response
        """
        status, cur = self.__cursor(server_cursor=True)
        self.row_count = 0

        if not status:
            return False, str(cur)
        query_id = random.randint(1, 9999999)

        if IS_PY2 and type(query) == unicode:
            query = query.encode('utf-8')

        current_app.logger.log(
            25,
            u"Execute (with server cursor) for server #{server_id} - "
            u"{conn_id} (Query-id: {query_id}):\n{query}".format(
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                query=query.decode('utf-8') if
                sys.version_info < (3,) else query,
                query_id=query_id
            )
        )
        try:
            self.__internal_blocking_execute(cur, query, params)
        except psycopg2.Error as pe:
            cur.close()
            errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
            current_app.logger.error(
                u"Failed to execute query (with server cursor) "
                u"for the server #{server_id} - {conn_id} "
                u"(query-id: {query_id}):\nerror message:{errmsg}".format(
                    server_id=self.manager.sid,
                    conn_id=self.conn_id,
                    query=query,
                    errmsg=errmsg,
                    query_id=query_id
                )
            )
            return False, errmsg

        def handle_json_data(json_columns, results):
            """
            [ This is only for Python2.x ]
            This function is useful to handle json data types.
            We dump json data as proper json instead of unicode values.

            Args:
                json_columns: Columns which contain json data
                results: Query result

            Returns:
                results
            """
            # Only if Python2 and there are columns with JSON type
            if IS_PY2 and len(json_columns) > 0:
                temp_results = []
                for row in results:
                    res = dict()
                    for k, v in row.items():
                        if k in json_columns:
                            res[k] = json.dumps(v)
                        else:
                            res[k] = v
                    temp_results.append(res)
                results = temp_results
            return results

        def convert_keys_to_unicode(results, conn_encoding):
            """
            [ This is only for Python2.x ]
            We need to convert all keys to unicode as psycopg2
            sends them as strings.

            Args:
                results: Query result set from psycopg2
                conn_encoding: Connection encoding

            Returns:
                Result set (with all the keys converted to unicode)
            """
            new_results = []
            for row in results:
                new_results.append(
                    dict([(k.decode(conn_encoding), v)
                          for k, v in row.items()])
                )
            return new_results

        def gen(quote='strings', quote_char="'", field_separator=','):

            results = cur.fetchmany(records)
            if not results:
                if not cur.closed:
                    cur.close()
                yield gettext('The query executed did not return any data.')
                return

            header = []
            json_columns = []
            conn_encoding = encodings[cur.connection.encoding]

            for c in cur.ordered_description():
                # This is to handle the case in which column name is non-ascii
                column_name = c.to_dict()['name']
                if IS_PY2:
                    column_name = column_name.decode(conn_encoding)
                header.append(column_name)
                if c.to_dict()['type_code'] in ALL_JSON_TYPES:
                    json_columns.append(column_name)

            if IS_PY2:
                results = convert_keys_to_unicode(results, conn_encoding)

            res_io = StringIO()

            if quote == 'strings':
                quote = csv.QUOTE_NONNUMERIC
            elif quote == 'all':
                quote = csv.QUOTE_ALL
            else:
                quote = csv.QUOTE_NONE

            if hasattr(str, 'decode'):
                # Decode the field_separator
                try:
                    field_separator = field_separator.decode('utf-8')
                except Exception as e:
                    current_app.logger.error(e)

                # Decode the quote_char
                try:
                    quote_char = quote_char.decode('utf-8')
                except Exception as e:
                    current_app.logger.error(e)

            csv_writer = csv.DictWriter(
                res_io, fieldnames=header, delimiter=field_separator,
                quoting=quote,
                quotechar=quote_char
            )

            csv_writer.writeheader()
            results = handle_json_data(json_columns, results)
            csv_writer.writerows(results)

            yield res_io.getvalue()

            while True:
                results = cur.fetchmany(records)

                if not results:
                    if not cur.closed:
                        cur.close()
                    break
                res_io = StringIO()

                csv_writer = csv.DictWriter(
                    res_io, fieldnames=header, delimiter=field_separator,
                    quoting=quote,
                    quotechar=quote_char
                )

                if IS_PY2:
                    results = convert_keys_to_unicode(results, conn_encoding)

                results = handle_json_data(json_columns, results)
                csv_writer.writerows(results)
                yield res_io.getvalue()

        return True, gen

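    # Sketch of consuming the generator returned above (the query string and
    # output stream are assumptions for illustration):
    #
    #   status, gen_func = conn.execute_on_server_as_csv(
    #       "SELECT * FROM pg_database", records=500
    #   )
    #   if status:
    #       for chunk in gen_func(quote='strings', field_separator=','):
    #           output_stream.write(chunk)
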
    def execute_scalar(self, query, params=None,
                       formatted_exception_msg=False):
        status, cur = self.__cursor()
        self.row_count = 0

        if not status:
            return False, str(cur)
        query_id = random.randint(1, 9999999)

        current_app.logger.log(
            25,
            u"Execute (scalar) for server #{server_id} - {conn_id} (Query-id: "
            u"{query_id}):\n{query}".format(
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                query=query,
                query_id=query_id
            )
        )
        try:
            self.__internal_blocking_execute(cur, query, params)
        except psycopg2.Error as pe:
            cur.close()
            if not self.connected():
                if self.auto_reconnect and not self.reconnecting:
                    return self.__attempt_execution_reconnect(
                        self.execute_scalar, query, params,
                        formatted_exception_msg
                    )
                raise ConnectionLost(
                    self.manager.sid,
                    self.db,
                    None if self.conn_id[0:3] == u'DB:' else self.conn_id[5:]
                )
            errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
            current_app.logger.error(
                u"Failed to execute query (execute_scalar) for the server "
                u"#{server_id} - {conn_id} (Query-id: {query_id}):\n"
                u"Error Message:{errmsg}".format(
                    server_id=self.manager.sid,
                    conn_id=self.conn_id,
                    query=query,
                    errmsg=errmsg,
                    query_id=query_id
                )
            )
            return False, errmsg

        self.row_count = cur.rowcount
        if cur.rowcount > 0:
            res = cur.fetchone()
            if len(res) > 0:
                return True, res[0]

        return True, None

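    # Sketch: execute_scalar() returns (status, value), where value is the
    # first column of the first row. For example:
    #
    #   status, version_string = conn.execute_scalar("SELECT version()")
    #   if status:
    #       current_app.logger.debug(version_string)
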
    def execute_async(self, query, params=None, formatted_exception_msg=True):
        """
        This function executes the given query asynchronously and returns
        the result.

        Args:
            query: SQL query to run.
            params: extra parameters to the function
            formatted_exception_msg: if True then the function returns the
                formatted exception message
        """

        if sys.version_info < (3,):
            if type(query) == unicode:
                query = query.encode('utf-8')
        else:
            query = query.encode('utf-8')

        # Convert the params based on python_encoding
        params = self.escape_params_sqlascii(params)

        self.__async_cursor = None
        status, cur = self.__cursor()

        if not status:
            return False, str(cur)
        query_id = random.randint(1, 9999999)

        current_app.logger.log(
            25,
            u"Execute (async) for server #{server_id} - {conn_id} (Query-id: "
            u"{query_id}):\n{query}".format(
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                query=query.decode('utf-8'),
                query_id=query_id
            )
        )

        try:
            self.__notices = []
            self.__notifies = []
            self.execution_aborted = False
            cur.execute(query, params)
            res = self._wait_timeout(cur.connection)
        except psycopg2.Error as pe:
            errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
            current_app.logger.error(
                u"Failed to execute query (execute_async) for the server "
                u"#{server_id} - {conn_id} (Query-id: {query_id}):\n"
                u"Error Message:{errmsg}".format(
                    server_id=self.manager.sid,
                    conn_id=self.conn_id,
                    query=query.decode('utf-8'),
                    errmsg=errmsg,
                    query_id=query_id
                )
            )

            # Check for asynchronous notifications.
            self.check_notifies()

            if self.is_disconnected(pe):
                raise ConnectionLost(
                    self.manager.sid,
                    self.db,
                    None if self.conn_id[0:3] == u'DB:' else self.conn_id[5:]
                )
            return False, errmsg

        self.__async_cursor = cur
        self.__async_query_id = query_id

        return True, res

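    # Sketch of the asynchronous flow (assumes the connection was created
    # with async_=1; error handling omitted). execute_async() only submits
    # the query, and the caller polls until the result is ready:
    #
    #   status, res = conn.execute_async("SELECT 42 AS answer")
    #   while res != conn.ASYNC_OK:
    #       res, _ = conn.poll(no_result=True)
    #   status, rows = conn.poll()
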
    def execute_void(self, query, params=None, formatted_exception_msg=False):
        """
        This function executes the given query with no result.

        Args:
            query: SQL query to run.
            params: extra parameters to the function
            formatted_exception_msg: if True then the function returns the
                formatted exception message
        """
        status, cur = self.__cursor()
        self.row_count = 0

        if not status:
            return False, str(cur)
        query_id = random.randint(1, 9999999)

        current_app.logger.log(
            25,
            u"Execute (void) for server #{server_id} - {conn_id} (Query-id: "
            u"{query_id}):\n{query}".format(
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                query=query,
                query_id=query_id
            )
        )

        try:
            self.__internal_blocking_execute(cur, query, params)
        except psycopg2.Error as pe:
            cur.close()
            if not self.connected():
                if self.auto_reconnect and not self.reconnecting:
                    return self.__attempt_execution_reconnect(
                        self.execute_void, query, params,
                        formatted_exception_msg
                    )
                raise ConnectionLost(
                    self.manager.sid,
                    self.db,
                    None if self.conn_id[0:3] == u'DB:' else self.conn_id[5:]
                )
            errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
            current_app.logger.error(
                u"Failed to execute query (execute_void) for the server "
                u"#{server_id} - {conn_id} (Query-id: {query_id}):\n"
                u"Error Message:{errmsg}".format(
                    server_id=self.manager.sid,
                    conn_id=self.conn_id,
                    query=query,
                    errmsg=errmsg,
                    query_id=query_id
                )
            )
            return False, errmsg

        self.row_count = cur.rowcount

        return True, None

    def __attempt_execution_reconnect(self, fn, *args, **kwargs):
        self.reconnecting = True
        setattr(g, "{0}#{1}".format(
            self.manager.sid,
            self.conn_id.encode('utf-8')
        ), None)
        try:
            status, res = self.connect()
            if status:
                if fn:
                    status, res = fn(*args, **kwargs)
                    self.reconnecting = False
                return status, res
        except Exception as e:
            current_app.logger.exception(e)
            self.reconnecting = False

            current_app.logger.warning(
                "Failed to reconnect to the database server "
                "(#{server_id})".format(
                    server_id=self.manager.sid,
                    conn_id=self.conn_id
                )
            )
        self.reconnecting = False
        raise ConnectionLost(
            self.manager.sid,
            self.db,
            None if self.conn_id[0:3] == u'DB:' else self.conn_id[5:]
        )

    def execute_2darray(self, query, params=None,
                        formatted_exception_msg=False):
        status, cur = self.__cursor()
        self.row_count = 0

        if not status:
            return False, str(cur)

        query_id = random.randint(1, 9999999)
        current_app.logger.log(
            25,
            u"Execute (2darray) for server #{server_id} - {conn_id} "
            u"(Query-id: {query_id}):\n{query}".format(
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                query=query,
                query_id=query_id
            )
        )
        try:
            self.__internal_blocking_execute(cur, query, params)
        except psycopg2.Error as pe:
            cur.close()
            if not self.connected():
                if self.auto_reconnect and \
                        not self.reconnecting:
                    return self.__attempt_execution_reconnect(
                        self.execute_2darray, query, params,
                        formatted_exception_msg
                    )
            errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
            current_app.logger.error(
                u"Failed to execute query (execute_2darray) for the server "
                u"#{server_id} - {conn_id} (Query-id: {query_id}):\n"
                u"Error Message:{errmsg}".format(
                    server_id=self.manager.sid,
                    conn_id=self.conn_id,
                    query=query,
                    errmsg=errmsg,
                    query_id=query_id
                )
            )
            return False, errmsg

        # Get Resultset Column Name, Type and size
        columns = cur.description and [
            desc.to_dict() for desc in cur.ordered_description()
        ] or []

        rows = []
        self.row_count = cur.rowcount
        if cur.rowcount > 0:
            for row in cur:
                rows.append(row)

        return True, {'columns': columns, 'rows': rows}

    def execute_dict(self, query, params=None, formatted_exception_msg=False):
        status, cur = self.__cursor()
        self.row_count = 0

        if not status:
            return False, str(cur)
        query_id = random.randint(1, 9999999)
        current_app.logger.log(
            25,
            u"Execute (dict) for server #{server_id} - {conn_id} (Query-id: "
            u"{query_id}):\n{query}".format(
                server_id=self.manager.sid,
                conn_id=self.conn_id,
                query=query,
                query_id=query_id
            )
        )
        try:
            self.__internal_blocking_execute(cur, query, params)
        except psycopg2.Error as pe:
            cur.close()
            if not self.connected():
                if self.auto_reconnect and not self.reconnecting:
                    return self.__attempt_execution_reconnect(
                        self.execute_dict, query, params,
                        formatted_exception_msg
                    )
                raise ConnectionLost(
                    self.manager.sid,
                    self.db,
                    None if self.conn_id[0:3] == u'DB:' else self.conn_id[5:]
                )
            errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
            current_app.logger.error(
                u"Failed to execute query (execute_dict) for the server "
                u"#{server_id} - {conn_id} (Query-id: {query_id}):\n"
                u"Error Message:{errmsg}".format(
                    server_id=self.manager.sid,
                    conn_id=self.conn_id,
                    query_id=query_id,
                    errmsg=errmsg
                )
            )
            return False, errmsg

        # Get Resultset Column Name, Type and size
        columns = cur.description and [
            desc.to_dict() for desc in cur.ordered_description()
        ] or []

        rows = []
        self.row_count = cur.rowcount
        if cur.rowcount > 0:
            for row in cur:
                rows.append(dict(row))

        return True, {'columns': columns, 'rows': rows}

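    # Sketch of the result shape shared by execute_dict() and
    # execute_2darray() (values are illustrative only):
    #
    #   status, res = conn.execute_dict("SELECT oid, datname FROM pg_database")
    #   # res == {
    #   #     'columns': [{'name': 'oid', ...}, {'name': 'datname', ...}],
    #   #     'rows': [{'oid': 1, 'datname': 'template1'}, ...]
    #   # }
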
    def async_fetchmany_2darray(self, records=2000,
                                formatted_exception_msg=False):
        """
        The user should poll and check if the status is ASYNC_OK before
        calling this function.
        Args:
            records: number of records to fetch; use -1 to fetch all.
            formatted_exception_msg:

        Returns:

        """
        cur = self.__async_cursor
        if not cur:
            return False, gettext(
                "Cursor could not be found for the async connection."
            )

        if self.conn.isexecuting():
            return False, gettext(
                "Asynchronous query execution/operation underway."
            )

        if self.row_count > 0:
            result = []
            # For a DDL operation, we may not have a result.
            #
            # Because there is no direct way to differentiate DML and
            # DDL operations, we need to rely on an exception to figure
            # that out at the moment.
            try:
                if records == -1:
                    res = cur.fetchall()
                else:
                    res = cur.fetchmany(records)
                for row in res:
                    new_row = []
                    for col in self.column_info:
                        new_row.append(row[col['name']])
                    result.append(new_row)
            except psycopg2.ProgrammingError as e:
                result = None
        else:
            # The user performed an operation which does not produce records
            # as a result, e.g. DDL operations.
            return True, None

        return True, result

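    # Sketch: after poll() reports ASYNC_OK, rows can be pulled in batches
    # (the consumer function is an assumption for illustration):
    #
    #   status, batch = conn.async_fetchmany_2darray(records=1000)
    #   while status and batch:
    #       process_batch(batch)
    #       status, batch = conn.async_fetchmany_2darray(records=1000)
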
    def connected(self):
        if self.conn:
            if not self.conn.closed:
                return True
            self.conn = None
        return False

    def reset(self):
        if self.conn:
            if self.conn.closed:
                self.conn = None
        pg_conn = None
        manager = self.manager

        password = getattr(manager, 'password', None)

        if password:
            # Fetch the logged-in user's details.
            user = User.query.filter_by(id=current_user.id).first()

            if user is None:
                return False, gettext("Unauthorized request.")

            password = decrypt(password, user.password).decode()

        try:
            pg_conn = psycopg2.connect(
                host=manager.local_bind_host if manager.use_ssh_tunnel
                else manager.host,
                hostaddr=manager.local_bind_host if manager.use_ssh_tunnel
                else manager.hostaddr,
                port=manager.local_bind_port if manager.use_ssh_tunnel
                else manager.port,
                database=self.db,
                user=manager.user,
                password=password,
                passfile=get_complete_file_path(manager.passfile),
                sslmode=manager.ssl_mode,
                sslcert=get_complete_file_path(manager.sslcert),
                sslkey=get_complete_file_path(manager.sslkey),
                sslrootcert=get_complete_file_path(manager.sslrootcert),
                sslcrl=get_complete_file_path(manager.sslcrl),
                sslcompression=True if manager.sslcompression else False,
                service=manager.service,
                connect_timeout=manager.connect_timeout
            )

        except psycopg2.Error as e:
            msg = e.pgerror if e.pgerror else e.message \
                if e.message else e.diag.message_detail \
                if e.diag.message_detail else str(e)

            current_app.logger.error(
                gettext(
                    """
Failed to reset the connection to the server due to the following error:
{0}"""
                ).format(msg)
            )
            return False, msg

        pg_conn.notices = deque([], self.ASYNC_NOTICE_MAXLENGTH)
        self.conn = pg_conn
        self.__backend_pid = pg_conn.get_backend_pid()

        return True, None

    def transaction_status(self):
        if self.conn:
            return self.conn.get_transaction_status()
        return None

    def ping(self):
        return self.execute_scalar('SELECT 1')

    def _release(self):
        if self.wasConnected:
            if self.conn:
                self.conn.close()
                self.conn = None
            self.password = None
            self.wasConnected = False

    def _wait(self, conn):
        """
        This function is used for an asynchronous connection; it calls the
        poll method in an infinite loop until poll returns
        psycopg2.extensions.POLL_OK. This is a blocking call.

        Args:
            conn: connection object
        """

        while 1:
            state = conn.poll()
            if state == psycopg2.extensions.POLL_OK:
                break
            elif state == psycopg2.extensions.POLL_WRITE:
                select.select([], [conn.fileno()], [])
            elif state == psycopg2.extensions.POLL_READ:
                select.select([conn.fileno()], [], [])
            else:
                raise psycopg2.OperationalError(
                    "poll() returned %s from _wait function" % state)

    def _wait_timeout(self, conn):
        """
        This function is used for an asynchronous connection; it calls the
        poll method and returns the status. If the state is
        psycopg2.extensions.POLL_WRITE or psycopg2.extensions.POLL_READ, the
        function waits for the given timeout. This is a non-blocking call.

        Args:
            conn: connection object
        """

        while 1:
            state = conn.poll()

            if state == psycopg2.extensions.POLL_OK:
                return self.ASYNC_OK
            elif state == psycopg2.extensions.POLL_WRITE:
                # Wait for the given time and then check the return status.
                # If three empty lists are returned then the time-out is
                # reached.
                timeout_status = select.select(
                    [], [conn.fileno()], [], self.ASYNC_TIMEOUT
                )
                if timeout_status == ([], [], []):
                    return self.ASYNC_WRITE_TIMEOUT
            elif state == psycopg2.extensions.POLL_READ:
                # Wait for the given time and then check the return status.
                # If three empty lists are returned then the time-out is
                # reached.
                timeout_status = select.select(
                    [conn.fileno()], [], [], self.ASYNC_TIMEOUT
                )
                if timeout_status == ([], [], []):
                    return self.ASYNC_READ_TIMEOUT
            else:
                raise psycopg2.OperationalError(
                    "poll() returned %s from _wait_timeout function" % state
                )

    def poll(self, formatted_exception_msg=False, no_result=False):
        """
        This function is a wrapper around the connection's poll function.
        It internally uses the _wait_timeout method to poll the
        result on the connection object. In case of success it
        returns the result of the query.

        Args:
            formatted_exception_msg: if True then the function returns the
                formatted exception message, otherwise the plain error
                string.
            no_result: If True then only the poll status will be returned.
        """

        cur = self.__async_cursor
        if not cur:
            return False, gettext(
                "Cursor could not be found for the async connection."
            )

        current_app.logger.log(
            25,
            "Polling result for (Query-id: {query_id})".format(
                query_id=self.__async_query_id
            )
        )

        is_error = False
        try:
            status = self._wait_timeout(self.conn)
        except psycopg2.Error as pe:
            if self.conn.closed:
                raise ConnectionLost(
                    self.manager.sid,
                    self.db,
                    self.conn_id[5:]
                )
            errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
            is_error = True

        if self.conn.notices and self.__notices is not None:
            self.__notices.extend(self.conn.notices)
            self.conn.notices.clear()

        # Check for asynchronous notifications.
        self.check_notifies()

        # We also need to fetch the notices before returning from this
        # function in case of any exception; to avoid code duplication we
        # return only after fetching the notices.
        if is_error:
            return False, errmsg

        result = None
        self.row_count = 0
        self.column_info = None

        if status == self.ASYNC_OK:

            # If the user has cancelled the transaction then change the
            # status.
            if self.execution_aborted:
                status = self.ASYNC_EXECUTION_ABORTED
                self.execution_aborted = False
                return status, result

            # Fetch the column information
            if cur.description is not None:
                self.column_info = [
                    desc.to_dict() for desc in cur.ordered_description()
                ]

                pos = 0
                for col in self.column_info:
                    col['pos'] = pos
                    pos += 1

            self.row_count = cur.rowcount
            if not no_result:
                if cur.rowcount > 0:
                    result = []
                    # For a DDL operation, we may not have a result.
                    #
                    # Because there is no direct way to differentiate DML
                    # and DDL operations, we need to rely on an exception to
                    # figure that out at the moment.
                    try:
                        for row in cur:
                            new_row = []
                            for col in self.column_info:
                                new_row.append(row[col['name']])
                            result.append(new_row)

                    except psycopg2.ProgrammingError:
                        result = None

        return status, result

    def status_message(self):
        """
        This function returns the status message returned by the last
        command executed on the server.
        """
        cur = self.__async_cursor
        if not cur:
            return gettext(
                "Cursor could not be found for the async connection."
            )

        current_app.logger.log(
            25,
            "Status message for (Query-id: {query_id})".format(
                query_id=self.__async_query_id
            )
        )

        return cur.statusmessage

    def rows_affected(self):
        """
        This function returns the number of rows affected by the last
        command executed on the server.
        """

        return self.row_count

    def get_column_info(self):
        """
        This function returns the list of columns for the last async SQL
        command executed on the server.
        """

        return self.column_info

    def cancel_transaction(self, conn_id, did=None):
        """
        This function is used to cancel the running transaction
        of the given connection id and database id using
        PostgreSQL's pg_cancel_backend.

        Args:
            conn_id: Connection id
            did: Database id (optional)
        """
        cancel_conn = self.manager.connection(did=did, conn_id=conn_id)
        query = """SELECT pg_cancel_backend({0});""".format(
            cancel_conn.__backend_pid)

        status = True
        msg = ''

        # If the backend PID is the same then create a new connection
        # to cancel the query and release it.
        if cancel_conn.__backend_pid == self.__backend_pid:
            password = getattr(self.manager, 'password', None)
            if password:
                # Fetch the logged-in user's details.
                user = User.query.filter_by(id=current_user.id).first()
                if user is None:
                    return False, gettext("Unauthorized request.")

                password = decrypt(password, user.password).decode()

            try:
                pg_conn = psycopg2.connect(
                    host=self.manager.local_bind_host if
                    self.manager.use_ssh_tunnel else self.manager.host,
                    hostaddr=self.manager.local_bind_host if
                    self.manager.use_ssh_tunnel else
                    self.manager.hostaddr,
                    port=self.manager.local_bind_port if
                    self.manager.use_ssh_tunnel else self.manager.port,
                    database=self.db,
                    user=self.manager.user,
                    password=password,
                    passfile=get_complete_file_path(self.manager.passfile),
                    sslmode=self.manager.ssl_mode,
                    sslcert=get_complete_file_path(self.manager.sslcert),
                    sslkey=get_complete_file_path(self.manager.sslkey),
                    sslrootcert=get_complete_file_path(
                        self.manager.sslrootcert
                    ),
                    sslcrl=get_complete_file_path(self.manager.sslcrl),
                    sslcompression=True if self.manager.sslcompression
                    else False,
                    service=self.manager.service,
                    connect_timeout=self.manager.connect_timeout
                )

                # Get the cursor and run the query.
                cur = pg_conn.cursor()
                cur.execute(query)

                # Close the connection.
                pg_conn.close()
                pg_conn = None

            except psycopg2.Error as e:
                status = False
                if e.pgerror:
                    msg = e.pgerror
                elif e.diag.message_detail:
                    msg = e.diag.message_detail
                else:
                    msg = str(e)
                return status, msg
        else:
            if self.connected():
                status, msg = self.execute_void(query)

                if status:
                    cancel_conn.execution_aborted = True
            else:
                status = False
                msg = gettext("Not connected to the database server.")

        return status, msg

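    # Sketch: cancelling a long-running query issued on another connection
    # of the same server (the conn_id/did values are assumptions):
    #
    #   status, msg = conn.cancel_transaction('CONN:12345', did=db_oid)
    #   if not status:
    #       current_app.logger.error(msg)
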
    def messages(self):
        """
        Returns the list of the messages/notices sent by the database server.
        """
        resp = []
        while self.__notices:
            resp.append(self.__notices.pop(0))

        for notify in self.__notifies:
            if notify.payload is not None and notify.payload != '':
                notify_msg = gettext(
                    "Asynchronous notification \"{0}\" with payload \"{1}\" "
                    "received from server process with PID {2}\n"
                ).format(notify.channel, notify.payload, notify.pid)

            else:
                notify_msg = gettext(
                    "Asynchronous notification \"{0}\" received from "
                    "server process with PID {1}\n"
                ).format(notify.channel, notify.pid)
            resp.append(notify_msg)

        return resp

    def decode_to_utf8(self, value):
        """
        This method decodes a value to utf-8.
        Args:
            value: String to be decoded

        Returns:
            Decoded string
        """
        is_error = False
        if hasattr(str, 'decode'):
            try:
                value = value.decode('utf-8')
            except UnicodeDecodeError:
                # Let's try with python's preferred encoding.
                # On Windows, lc_messages mostly has an environment-dependent
                # encoding like 'French_France.1252'.
                try:
                    import locale
                    pref_encoding = locale.getpreferredencoding()
                    value = value.decode(pref_encoding)\
                        .encode('utf-8')\
                        .decode('utf-8')
                except Exception:
                    is_error = True
            except Exception:
                is_error = True

        # If still not able to decode, fall back to ASCII and ignore errors.
        if is_error:
            value = value.decode('ascii', 'ignore')

        return value

    def _formatted_exception_msg(self, exception_obj, formatted_msg):
        """
        This method is used to parse the psycopg2.Error object and returns
        the formatted error message if the flag is set to True, otherwise
        the plain error message.

        Args:
            exception_obj: exception object
            formatted_msg: if True then the function returns the formatted
                exception message

        """
        if exception_obj.pgerror:
            errmsg = exception_obj.pgerror
        elif exception_obj.diag.message_detail:
            errmsg = exception_obj.diag.message_detail
        else:
            errmsg = str(exception_obj)
        # errmsg might contain an encoded value, let's decode it.
        errmsg = self.decode_to_utf8(errmsg)

        # If formatted_msg is False then return from the function.
        if not formatted_msg:
            return errmsg

        # Do not prepend if the error already starts with `ERROR:`, as most
        # PG-related errors start with `ERROR:`.
        if not errmsg.startswith(u'ERROR:'):
            errmsg = u'ERROR: ' + errmsg + u'\n\n'

        if exception_obj.diag.severity is not None \
                and exception_obj.diag.message_primary is not None:
            ex_diag_message = u"{0}: {1}".format(
                self.decode_to_utf8(exception_obj.diag.severity),
                self.decode_to_utf8(exception_obj.diag.message_primary)
            )
            # Only append it if the two messages are different.
            if errmsg and ex_diag_message and \
                ex_diag_message.strip().strip('\n').lower() not in \
                    errmsg.strip().strip('\n').lower():
                errmsg += ex_diag_message
        elif exception_obj.diag.message_primary is not None:
            message_primary = self.decode_to_utf8(
                exception_obj.diag.message_primary
            )
            if message_primary.lower() not in errmsg.lower():
                errmsg += message_primary

        if exception_obj.diag.sqlstate is not None:
            if not errmsg.endswith('\n'):
                errmsg += '\n'
            errmsg += gettext('SQL state: ')
            errmsg += self.decode_to_utf8(exception_obj.diag.sqlstate)

        if exception_obj.diag.message_detail is not None:
            if 'Detail:'.lower() not in errmsg.lower():
                if not errmsg.endswith('\n'):
                    errmsg += '\n'
                errmsg += gettext('Detail: ')
                errmsg += self.decode_to_utf8(
                    exception_obj.diag.message_detail
                )

        if exception_obj.diag.message_hint is not None:
            if 'Hint:'.lower() not in errmsg.lower():
                if not errmsg.endswith('\n'):
                    errmsg += '\n'
                errmsg += gettext('Hint: ')
                errmsg += self.decode_to_utf8(exception_obj.diag.message_hint)

        if exception_obj.diag.statement_position is not None:
            if 'Character:'.lower() not in errmsg.lower():
                if not errmsg.endswith('\n'):
                    errmsg += '\n'
                errmsg += gettext('Character: ')
                errmsg += self.decode_to_utf8(
                    exception_obj.diag.statement_position
                )

        if exception_obj.diag.context is not None:
            if 'Context:'.lower() not in errmsg.lower():
                if not errmsg.endswith('\n'):
                    errmsg += '\n'
                errmsg += gettext('Context: ')
                errmsg += self.decode_to_utf8(exception_obj.diag.context)

        return errmsg

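    # Illustrative shape of a message built by _formatted_exception_msg()
    # when formatted_msg is True (sample text, not an actual capture):
    #
    #   ERROR: relation "missing_table" does not exist
    #   SQL state: 42P01
    #   Character: 15
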
    #####
    # As per the issue reported on the psycopg2 GitHub repository (link
    # shared below), conn.closed is not reliable enough to identify a
    # disconnection from the database server, for some unknown reasons.
    #
    # (https://github.com/psycopg/psycopg2/issues/263)
    #
    # In order to resolve the issue, SQLAlchemy follows the logic below to
    # identify the disconnection. It relies on the exception message to
    # identify the error.
    #
    # Reference (MIT license):
    # https://github.com/zzzeek/sqlalchemy/blob/master/lib/sqlalchemy/dialects/postgresql/psycopg2.py
    #
    def is_disconnected(self, err):
        if not self.conn.closed:
            # Checks based on strings. In the case that .closed
            # didn't cut it, fall back onto these.
            str_e = str(err).partition("\n")[0]
            for msg in [
                # these error messages from libpq: interfaces/libpq/fe-misc.c
                # and interfaces/libpq/fe-secure.c.
                'terminating connection',
                'closed the connection',
                'connection not open',
                'could not receive data from server',
                'could not send data to server',
                # psycopg2 client errors, psycopg2/connection.h,
                # psycopg2/cursor.h
                'connection already closed',
                'cursor already closed',
                # not sure where this path is originally from, it may
                # be obsolete. It really says "losed", not "closed".
                'losed the connection unexpectedly',
                # these can occur in newer SSL
                'connection has been closed unexpectedly',
                'SSL SYSCALL error: Bad file descriptor',
                'SSL SYSCALL error: EOF detected',
            ]:
                idx = str_e.find(msg)
                if idx >= 0 and '"' not in str_e[:idx]:
                    return True

            return False
        return True

    def check_notifies(self, required_polling=False):
        """
        Check for notification messages by polling the connection, or pick
        up any notifications already queued after an execute.
        """
        if self.conn and required_polling:
            self.conn.poll()

        if self.conn and hasattr(self.conn, 'notifies') and \
                len(self.conn.notifies) > 0:
            self.__notifies.extend(self.conn.notifies)
            self.conn.notifies = []
        else:
            self.__notifies = []

    def get_notifies(self):
        """
        This function returns the list of notifications received from the
        database server.
        """
        notifies = None
        # Convert the list of Notify objects into a list of dicts.
        if self.__notifies is not None and len(self.__notifies) > 0:
            notifies = [{'recorded_time': str(datetime.datetime.now()),
                         'channel': notify.channel,
                         'payload': notify.payload,
                         'pid': notify.pid
                         } for notify in self.__notifies
                        ]
        return notifies

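    # Sketch of the LISTEN/NOTIFY round trip as exposed by this class (the
    # channel name is an assumption):
    #
    #   conn.execute_void("LISTEN my_channel")
    #   ...
    #   conn.check_notifies(required_polling=True)
    #   for n in conn.get_notifies() or []:
    #       current_app.logger.info("%s: %s", n['channel'], n['payload'])
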
    def pq_encrypt_password_conn(self, password, user):
        """
        This function returns the encrypted password for database servers
        of version 10 and above.
        :param password: password to be encrypted
        :param user: user of the database server
        :return: the encrypted password, or None
        """
        enc_password = None
        if psycopg2.__libpq_version__ >= 100000 and \
                hasattr(psycopg2.extensions, 'encrypt_password'):
            if self.connected():
                status, enc_algorithm = \
                    self.execute_scalar("SHOW password_encryption")
                if status:
                    enc_password = psycopg2.extensions.encrypt_password(
                        password=password, user=user, scope=self.conn,
                        algorithm=enc_algorithm
                    )
        elif psycopg2.__libpq_version__ < 100000:
            current_app.logger.warning(
                u"To encrypt passwords the required libpq version is "
                u"greater than or equal to 100000. Current libpq version "
                u"is {curr_ver}".format(
                    curr_ver=psycopg2.__libpq_version__
                )
            )
        elif not hasattr(psycopg2.extensions, 'encrypt_password'):
            current_app.logger.warning(
                u"The psycopg2.extensions module does not have the "
                u"'encrypt_password' method."
            )

        return enc_password
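    # Sketch: producing an encrypted password string before a password
    # change on PostgreSQL 10 or above (the role name is an assumption):
    #
    #   enc = conn.pq_encrypt_password_conn('new secret', 'some_role')
    #   if enc is not None:
    #       conn.execute_void("ALTER ROLE some_role PASSWORD %s", [enc])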