Add support for asynchronous connections to the database server.

This commit is contained in:
Akshay Joshi
2016-03-02 13:29:15 +00:00
committed by Dave Page
parent b41066a6b6
commit e138ec53b6
2 changed files with 352 additions and 16 deletions

View File

@@ -7,11 +7,14 @@
# #
########################################################################## ##########################################################################
"""Implement the Base class for Driver and Connection"""
from abc import ABCMeta, abstractmethod, abstractproperty from abc import ABCMeta, abstractmethod, abstractproperty
from flask import session from flask import session
from .registry import DriverRegistry from .registry import DriverRegistry
import six import six
@six.add_metaclass(DriverRegistry) @six.add_metaclass(DriverRegistry)
class BaseDriver(object): class BaseDriver(object):
""" """
@@ -59,6 +62,7 @@ class BaseDriver(object):
def gc(self): def gc(self):
pass pass
@six.add_metaclass(ABCMeta) @six.add_metaclass(ABCMeta)
class BaseConnection(object): class BaseConnection(object):
""" """
@@ -67,7 +71,7 @@ class BaseConnection(object):
It is a base class for database connection. A different connection It is a base class for database connection. A different connection
drive must implement this to expose abstract methods for this server. drive must implement this to expose abstract methods for this server.
General idea is to create a wrapper around the actaul driver General idea is to create a wrapper around the actual driver
implementation. It will be instantiated by the driver factory implementation. It will be instantiated by the driver factory
basically. And, they should not be instantiated directly. basically. And, they should not be instantiated directly.
@@ -82,9 +86,15 @@ class BaseConnection(object):
- Implement this method to execute the given query and returns single - Implement this method to execute the given query and returns single
datum result. datum result.
* execute_async(query, params)
- Implement this method to execute the given query asynchronously and returns result.
* execute_void(query, params)
- Implement this method to execute the given query with no result.
* execute_2darray(query, params) * execute_2darray(query, params)
- Implement this method to execute the given query and returns the result - Implement this method to execute the given query and returns the result
as a 2 dimentional array. as a 2 dimensional array.
* execute_dict(query, params) * execute_dict(query, params)
- Implement this method to execute the given query and returns the result - Implement this method to execute the given query and returns the result
@@ -113,8 +123,25 @@ class BaseConnection(object):
NOTE: Please use BaseDriver.release_connection(...) for releasing the NOTE: Please use BaseDriver.release_connection(...) for releasing the
connection object for better memory management, and connection pool connection object for better memory management, and connection pool
management. management.
* _wait(conn)
- Implement this method to wait for asynchronous connection. This is a blocking call.
* _wait_timeout(conn, time)
- Implement this method to wait for asynchronous connection with timeout. This is a non blocking call.
* poll()
- Implement this method to poll the data of query running on asynchronous connection.
* cancel_transaction(conn_id, did=None)
- Implement this method to cancel the running transaction.
""" """
ASYNC_OK = 1
ASYNC_READ_TIMEOUT = 2
ASYNC_WRITE_TIMEOUT = 3
ASYNC_NOT_CONNECTED = 4
@abstractmethod @abstractmethod
def connect(self, **kwargs): def connect(self, **kwargs):
pass pass
@@ -123,6 +150,14 @@ class BaseConnection(object):
def execute_scalar(self, query, params=None): def execute_scalar(self, query, params=None):
pass pass
@abstractmethod
def execute_async(self, query, params=None):
pass
@abstractmethod
def execute_void(self, query, params=None):
pass
@abstractmethod @abstractmethod
def execute_2darray(self, query, params=None): def execute_2darray(self, query, params=None):
pass pass
@@ -150,3 +185,19 @@ class BaseConnection(object):
@abstractmethod @abstractmethod
def _release(self): def _release(self):
pass pass
@abstractmethod
def _wait(self, conn):
pass
@abstractmethod
def _wait_timeout(self, conn, time):
pass
@abstractmethod
def poll(self):
pass
@abstractmethod
def cancel_transaction(self, conn_id, did=None):
pass

View File

@@ -7,6 +7,8 @@
# #
########################################################################## ##########################################################################
"""Implementation of Connection, ServerManager and Driver classes"""
from datetime import datetime from datetime import datetime
import psycopg2 import psycopg2
@@ -21,12 +23,15 @@ from ..abstract import BaseDriver, BaseConnection
from pgadmin.settings.settings_model import Server, User from pgadmin.settings.settings_model import Server, User
from pgadmin.utils.crypto import decrypt from pgadmin.utils.crypto import decrypt
import random import random
import select
from .keywords import ScanKeyword from .keywords import ScanKeyword
_ = gettext _ = gettext
ASYNC_WAIT_TIMEOUT = 0.1 # in seconds or 100 milliseconds
class Connection(BaseConnection): class Connection(BaseConnection):
""" """
@@ -43,8 +48,14 @@ class Connection(BaseConnection):
* execute_scalar(query, params) * execute_scalar(query, params)
- Execute the given query and returns single datum result - Execute the given query and returns single datum result
* execute_async(query, params)
- Execute the given query asynchronously and returns result.
* execute_void(query, params)
- Execute the given query with no result.
* execute_2darray(query, params) * execute_2darray(query, params)
- Execute the given query and returns the result as a 2 dimentional - Execute the given query and returns the result as a 2 dimensional
array. array.
* execute_dict(query, params) * execute_dict(query, params)
@@ -59,15 +70,28 @@ class Connection(BaseConnection):
- Reconnect the database server (if possible) - Reconnect the database server (if possible)
* transaction_status() * transaction_status()
- Trasaction Status - Transaction Status
* ping() * ping()
- Ping the server. - Ping the server.
* _release() * _release()
- Release the connection object of psycopg2 - Release the connection object of psycopg2
* _wait(conn)
- This method is used to wait for asynchronous connection. This is a blocking call.
* _wait_timeout(conn, time)
- This method is used to wait for asynchronous connection with timeout. This is a non blocking call.
* poll()
- This method is used to poll the data of query running on asynchronous connection.
* cancel_transaction(conn_id, did=None)
- This method is used to cancel the transaction for the
specified connection id and database id.
""" """
def __init__(self, manager, conn_id, db, auto_reconnect=True): def __init__(self, manager, conn_id, db, auto_reconnect=True, async=0):
assert(manager is not None) assert(manager is not None)
assert(conn_id is not None) assert(conn_id is not None)
@@ -76,6 +100,10 @@ class Connection(BaseConnection):
self.db = db if db is not None else manager.db self.db = db if db is not None else manager.db
self.conn = None self.conn = None
self.auto_reconnect = auto_reconnect self.auto_reconnect = auto_reconnect
self.async = async
self.__async_cursor = None
self.__async_query_id = None
self.__backend_pid = None
super(Connection, self).__init__() super(Connection, self).__init__()
@@ -123,9 +151,15 @@ class Connection(BaseConnection):
port=mgr.port, port=mgr.port,
database=self.db, database=self.db,
user=mgr.user, user=mgr.user,
password=password password=password,
async=self.async
) )
# If connection is asynchronous then we will have to wait
# until the connection is ready to use.
if self.async == 1:
self._wait(pg_conn)
except psycopg2.Error as e: except psycopg2.Error as e:
if e.pgerror: if e.pgerror:
msg = e.pgerror msg = e.pgerror
@@ -144,10 +178,15 @@ Failed to connect to the database server(#{server_id}) for connection ({conn_id}
return False, msg return False, msg
pg_conn.autocommit = True
self.conn = pg_conn self.conn = pg_conn
self.__backend_pid = pg_conn.get_backend_pid()
# autocommit and client encoding not worked with asynchronous connection
# By default asynchronous connection runs in autocommit mode
if self.async == 0:
self.conn.autocommit = True
self.conn.set_client_encoding("UNICODE")
self.conn.set_client_encoding("UNICODE")
status, res = self.execute_scalar(""" status, res = self.execute_scalar("""
SET DateStyle=ISO; SET DateStyle=ISO;
SET client_min_messages=notice; SET client_min_messages=notice;
@@ -248,7 +287,7 @@ WHERE
errmsg = "" errmsg = ""
current_app.logger.warning(""" current_app.logger.warning("""
Connection te database server (#{server_id}) for the connection - '{conn_id}' has been lost. Connection to database server (#{server_id}) for the connection - '{conn_id}' has been lost.
""".format( """.format(
server_id=self.manager.sid, server_id=self.manager.sid,
conn_id=self.conn_id conn_id=self.conn_id
@@ -284,7 +323,7 @@ server#{1}:{2}:
if self.auto_reconnect: if self.auto_reconnect:
current_app.logger.debug(""" current_app.logger.debug("""
Attempting to reconnet to the database server (#{server_id}) for the connection - '{conn_id}'. Attempting to reconnect to the database server (#{server_id}) for the connection - '{conn_id}'.
""".format( """.format(
server_id=self.manager.sid, server_id=self.manager.sid,
conn_id=self.conn_id conn_id=self.conn_id
@@ -308,6 +347,22 @@ Attempt to reconnect it failed with the below error:
return True, cur return True, cur
def __internal_blocking_execute(self, cur, query, params):
"""
This function executes the query using cursor's execute function,
but in case of asynchronous connection we need to wait for the
transaction to be completed. If self.async is 1 then it is a
blocking call.
Args:
cur: Cursor object
query: SQL query to run.
params: Extra parameters
"""
cur.execute(query, params)
if self.async == 1:
self._wait(cur.connection)
def execute_scalar(self, query, params=None): def execute_scalar(self, query, params=None):
status, cur = self.__cursor() status, cur = self.__cursor()
@@ -324,7 +379,7 @@ Attempt to reconnect it failed with the below error:
) )
) )
try: try:
cur.execute(query, params) self.__internal_blocking_execute(cur, query, params)
except psycopg2.Error as pe: except psycopg2.Error as pe:
cur.close() cur.close()
errmsg = str(pe) errmsg = str(pe)
@@ -346,6 +401,97 @@ Attempt to reconnect it failed with the below error:
return True, None return True, None
def execute_async(self, query, params=None):
"""
This function executes the given query asynchronously and returns result.
Args:
query: SQL query to run.
params: extra parameters to the function
"""
status, cur = self.__cursor()
if not status:
return False, str(cur)
query_id = random.randint(1, 9999999)
current_app.logger.log(25, """
Execute (async) for server #{server_id} - {conn_id} (Query-id: {query_id}):\n{query}
""".format(
server_id=self.manager.sid,
conn_id=self.conn_id,
query=query,
query_id=query_id
)
)
try:
cur.execute(query, params)
res = self._wait_timeout(cur.connection, ASYNC_WAIT_TIMEOUT)
except psycopg2.Error as pe:
errmsg = str(pe)
current_app.logger.error("""
Failed to execute query (execute_async) for the server #{server_id} - {conn_id}
(Query-id: {query_id}):\nError Message:{errmsg}
""".format(
server_id=self.manager.sid,
conn_id=self.conn_id,
query=query,
errmsg=errmsg,
query_id=query_id
)
)
return False, errmsg
self.__async_cursor = cur
self.__async_query_id = query_id
return True, res
def execute_void(self, query, params=None):
"""
This function executes the given query with no result.
Args:
query: SQL query to run.
params: extra parameters to the function
"""
status, cur = self.__cursor()
if not status:
return False, str(cur)
query_id = random.randint(1, 9999999)
current_app.logger.log(25, """
Execute (void) for server #{server_id} - {conn_id} (Query-id: {query_id}):\n{query}
""".format(
server_id=self.manager.sid,
conn_id=self.conn_id,
query=query,
query_id=query_id
)
)
try:
self.__internal_blocking_execute(cur, query, params)
except psycopg2.Error as pe:
cur.close()
errmsg = str(pe)
current_app.logger.error("""
Failed to execute query (execute_void) for the server #{server_id} - {conn_id}
(Query-id: {query_id}):\nError Message:{errmsg}
""".format(
server_id=self.manager.sid,
conn_id=self.conn_id,
query=query,
errmsg=errmsg,
query_id=query_id
)
)
return False, errmsg
return True, None
def execute_2darray(self, query, params=None): def execute_2darray(self, query, params=None):
status, cur = self.__cursor() status, cur = self.__cursor()
@@ -362,7 +508,7 @@ Attempt to reconnect it failed with the below error:
) )
) )
try: try:
cur.execute(query, params) self.__internal_blocking_execute(cur, query, params)
except psycopg2.Error as pe: except psycopg2.Error as pe:
cur.close() cur.close()
errmsg = str(pe) errmsg = str(pe)
@@ -405,7 +551,7 @@ Attempt to reconnect it failed with the below error:
) )
) )
try: try:
cur.execute(query, params) self.__internal_blocking_execute(cur, query, params)
except psycopg2.Error as pe: except psycopg2.Error as pe:
cur.close() cur.close()
errmsg = str(pe) errmsg = str(pe)
@@ -455,7 +601,7 @@ Attempt to reconnect it failed with the below error:
if user is None: if user is None:
return False, gettext("Unauthorized Request.") return False, gettext("Unauthorized Request.")
password = decrypt(password, user.password) password = decrypt(password, user.password).decode()
try: try:
pg_conn = psycopg2.connect( pg_conn = psycopg2.connect(
@@ -481,6 +627,7 @@ Failed to reset the connection of the server due to following error:
return False, msg return False, msg
self.conn = pg_conn self.conn = pg_conn
self.__backend_pid = pg_conn.get_backend_pid()
return True, None return True, None
@@ -497,6 +644,143 @@ Failed to reset the connection of the server due to following error:
self.conn.close() self.conn.close()
self.conn = None self.conn = None
def _wait(self, conn):
"""
This function is used for the asynchronous connection,
it will call poll method in a infinite loop till poll
returns psycopg2.extensions.POLL_OK. This is a blocking
call.
Args:
conn: connection object
"""
while 1:
state = conn.poll()
if state == psycopg2.extensions.POLL_OK:
break
elif state == psycopg2.extensions.POLL_WRITE:
select.select([], [conn.fileno()], [])
elif state == psycopg2.extensions.POLL_READ:
select.select([conn.fileno()], [], [])
else:
raise psycopg2.OperationalError("poll() returned %s from _wait function" % state)
def _wait_timeout(self, conn, time):
"""
This function is used for the asynchronous connection,
it will call poll method and return the status. If state is
psycopg2.extensions.POLL_WRITE and psycopg2.extensions.POLL_READ
function will wait for the given timeout.This is not a blocking call.
Args:
conn: connection object
time: wait time
"""
state = conn.poll()
if state == psycopg2.extensions.POLL_OK:
return self.ASYNC_OK
elif state == psycopg2.extensions.POLL_WRITE:
select.select([], [conn.fileno()], [], time)
return self.ASYNC_WRITE_TIMEOUT
elif state == psycopg2.extensions.POLL_READ:
select.select([conn.fileno()], [], [], time)
return self.ASYNC_READ_TIMEOUT
else:
raise psycopg2.OperationalError("poll() returned %s from _wait_timeout function" % state)
def poll(self):
"""
This function is a wrapper around connection's poll function.
It internally uses the _wait_timeout method to poll the
result on the connection object. In case of success it
returns the result of the query.
"""
cur = self.__async_cursor
if not cur:
return False, gettext("Cursor could not found for the aysnc connection."), None
current_app.logger.log(25, """
Polling result for (Query-id: {query_id})""".format(query_id=self.__async_query_id))
status = self._wait_timeout(self.conn, ASYNC_WAIT_TIMEOUT)
if status == self.ASYNC_OK:
if cur.rowcount > 0:
# Fetch the column information
colinfo = [desc for desc in cur.description]
result = []
# Fetch the data rows.
for row in cur:
result.append(dict(row))
self.__async_cursor = None
return status, result, colinfo
return status, None, None
def cancel_transaction(self, conn_id, did=None):
"""
This function is used to cancel the running transaction
of the given connection id and database id using
PostgreSQL's pg_cancel_backend.
Args:
conn_id: Connection id
did: Database id (optional)
"""
cancel_conn = self.manager.connection(did=did, conn_id=conn_id)
query = """SELECT pg_cancel_backend({0});""".format(cancel_conn.__backend_pid)
status = True
msg = ''
# if backend pid is same then create a new connection
# to cancel the query and release it.
if cancel_conn.__backend_pid == self.__backend_pid:
password = getattr(self.manager, 'password', None)
if password:
# Fetch Logged in User Details.
user = User.query.filter_by(id=current_user.id).first()
if user is None:
return False, gettext("Unauthorized Request.")
password = decrypt(password, user.password).decode()
try:
pg_conn = psycopg2.connect(
host=self.manager.host,
port=self.manager.port,
database=self.db,
user=self.manager.user,
password=password
)
# Get the cursor and run the query
cur = pg_conn.cursor()
cur.execute(query)
# Close the connection
pg_conn.close()
pg_conn = None
except psycopg2.Error as e:
status = False
if e.pgerror:
msg = e.pgerror
elif e.diag.message_detail:
msg = e.diag.message_detail
else:
msg = str(e)
return status, msg
else:
if self.connected():
status, msg = self.execute_void(query)
else:
status = False
msg = gettext("Not connected to the database server.")
return status, msg
class ServerManager(object): class ServerManager(object):
""" """
@@ -588,7 +872,7 @@ WHERE db.oid = {0}""".format(did))
if did not in self.db_info: if did not in self.db_info:
raise Exception(gettext( raise Exception(gettext(
"Coudn't find the database!" "Couldn't find the database!"
)) ))
if database is None: if database is None:
@@ -602,8 +886,9 @@ WHERE db.oid = {0}""".format(did))
if my_id in self.connections: if my_id in self.connections:
return self.connections[my_id] return self.connections[my_id]
else: else:
async = 1 if conn_id is not None else 0
self.connections[my_id] = Connection( self.connections[my_id] = Connection(
self, my_id, database, auto_reconnect self, my_id, database, auto_reconnect, async
) )
return self.connections[my_id] return self.connections[my_id]