diff --git a/web/pgadmin/utils/driver/abstract.py b/web/pgadmin/utils/driver/abstract.py index d6eb5f94a..81cdcb17f 100644 --- a/web/pgadmin/utils/driver/abstract.py +++ b/web/pgadmin/utils/driver/abstract.py @@ -7,11 +7,14 @@ # ########################################################################## +"""Implement the Base class for Driver and Connection""" + from abc import ABCMeta, abstractmethod, abstractproperty from flask import session from .registry import DriverRegistry import six + @six.add_metaclass(DriverRegistry) class BaseDriver(object): """ @@ -59,6 +62,7 @@ class BaseDriver(object): def gc(self): pass + @six.add_metaclass(ABCMeta) class BaseConnection(object): """ @@ -67,7 +71,7 @@ class BaseConnection(object): It is a base class for database connection. A different connection drive must implement this to expose abstract methods for this server. - General idea is to create a wrapper around the actaul driver + General idea is to create a wrapper around the actual driver implementation. It will be instantiated by the driver factory basically. And, they should not be instantiated directly. @@ -82,9 +86,15 @@ class BaseConnection(object): - Implement this method to execute the given query and returns single datum result. + * execute_async(query, params) + - Implement this method to execute the given query asynchronously and returns result. + + * execute_void(query, params) + - Implement this method to execute the given query with no result. + * execute_2darray(query, params) - Implement this method to execute the given query and returns the result - as a 2 dimentional array. + as a 2 dimensional array. * execute_dict(query, params) - Implement this method to execute the given query and returns the result @@ -113,8 +123,25 @@ class BaseConnection(object): NOTE: Please use BaseDriver.release_connection(...) for releasing the connection object for better memory management, and connection pool management. + + * _wait(conn) + - Implement this method to wait for asynchronous connection. This is a blocking call. + + * _wait_timeout(conn, time) + - Implement this method to wait for asynchronous connection with timeout. This is a non blocking call. + + * poll() + - Implement this method to poll the data of query running on asynchronous connection. + + * cancel_transaction(conn_id, did=None) + - Implement this method to cancel the running transaction. """ + ASYNC_OK = 1 + ASYNC_READ_TIMEOUT = 2 + ASYNC_WRITE_TIMEOUT = 3 + ASYNC_NOT_CONNECTED = 4 + @abstractmethod def connect(self, **kwargs): pass @@ -123,6 +150,14 @@ class BaseConnection(object): def execute_scalar(self, query, params=None): pass + @abstractmethod + def execute_async(self, query, params=None): + pass + + @abstractmethod + def execute_void(self, query, params=None): + pass + @abstractmethod def execute_2darray(self, query, params=None): pass @@ -150,3 +185,19 @@ class BaseConnection(object): @abstractmethod def _release(self): pass + + @abstractmethod + def _wait(self, conn): + pass + + @abstractmethod + def _wait_timeout(self, conn, time): + pass + + @abstractmethod + def poll(self): + pass + + @abstractmethod + def cancel_transaction(self, conn_id, did=None): + pass diff --git a/web/pgadmin/utils/driver/psycopg2/__init__.py b/web/pgadmin/utils/driver/psycopg2/__init__.py index 3badf3fb8..a2790a8f8 100644 --- a/web/pgadmin/utils/driver/psycopg2/__init__.py +++ b/web/pgadmin/utils/driver/psycopg2/__init__.py @@ -7,6 +7,8 @@ # ########################################################################## +"""Implementation of Connection, ServerManager and Driver classes""" + from datetime import datetime import psycopg2 @@ -21,12 +23,15 @@ from ..abstract import BaseDriver, BaseConnection from pgadmin.settings.settings_model import Server, User from pgadmin.utils.crypto import decrypt import random +import select from .keywords import ScanKeyword _ = gettext +ASYNC_WAIT_TIMEOUT = 0.1 # in seconds or 100 milliseconds + class Connection(BaseConnection): """ @@ -43,8 +48,14 @@ class Connection(BaseConnection): * execute_scalar(query, params) - Execute the given query and returns single datum result + * execute_async(query, params) + - Execute the given query asynchronously and returns result. + + * execute_void(query, params) + - Execute the given query with no result. + * execute_2darray(query, params) - - Execute the given query and returns the result as a 2 dimentional + - Execute the given query and returns the result as a 2 dimensional array. * execute_dict(query, params) @@ -59,15 +70,28 @@ class Connection(BaseConnection): - Reconnect the database server (if possible) * transaction_status() - - Trasaction Status + - Transaction Status * ping() - Ping the server. * _release() - Release the connection object of psycopg2 + + * _wait(conn) + - This method is used to wait for asynchronous connection. This is a blocking call. + + * _wait_timeout(conn, time) + - This method is used to wait for asynchronous connection with timeout. This is a non blocking call. + + * poll() + - This method is used to poll the data of query running on asynchronous connection. + + * cancel_transaction(conn_id, did=None) + - This method is used to cancel the transaction for the + specified connection id and database id. """ - def __init__(self, manager, conn_id, db, auto_reconnect=True): + def __init__(self, manager, conn_id, db, auto_reconnect=True, async=0): assert(manager is not None) assert(conn_id is not None) @@ -76,6 +100,10 @@ class Connection(BaseConnection): self.db = db if db is not None else manager.db self.conn = None self.auto_reconnect = auto_reconnect + self.async = async + self.__async_cursor = None + self.__async_query_id = None + self.__backend_pid = None super(Connection, self).__init__() @@ -123,9 +151,15 @@ class Connection(BaseConnection): port=mgr.port, database=self.db, user=mgr.user, - password=password + password=password, + async=self.async ) + # If connection is asynchronous then we will have to wait + # until the connection is ready to use. + if self.async == 1: + self._wait(pg_conn) + except psycopg2.Error as e: if e.pgerror: msg = e.pgerror @@ -144,10 +178,15 @@ Failed to connect to the database server(#{server_id}) for connection ({conn_id} return False, msg - pg_conn.autocommit = True self.conn = pg_conn + self.__backend_pid = pg_conn.get_backend_pid() + + # autocommit and client encoding not worked with asynchronous connection + # By default asynchronous connection runs in autocommit mode + if self.async == 0: + self.conn.autocommit = True + self.conn.set_client_encoding("UNICODE") - self.conn.set_client_encoding("UNICODE") status, res = self.execute_scalar(""" SET DateStyle=ISO; SET client_min_messages=notice; @@ -248,7 +287,7 @@ WHERE errmsg = "" current_app.logger.warning(""" -Connection te database server (#{server_id}) for the connection - '{conn_id}' has been lost. +Connection to database server (#{server_id}) for the connection - '{conn_id}' has been lost. """.format( server_id=self.manager.sid, conn_id=self.conn_id @@ -284,7 +323,7 @@ server#{1}:{2}: if self.auto_reconnect: current_app.logger.debug(""" -Attempting to reconnet to the database server (#{server_id}) for the connection - '{conn_id}'. +Attempting to reconnect to the database server (#{server_id}) for the connection - '{conn_id}'. """.format( server_id=self.manager.sid, conn_id=self.conn_id @@ -308,6 +347,22 @@ Attempt to reconnect it failed with the below error: return True, cur + def __internal_blocking_execute(self, cur, query, params): + """ + This function executes the query using cursor's execute function, + but in case of asynchronous connection we need to wait for the + transaction to be completed. If self.async is 1 then it is a + blocking call. + + Args: + cur: Cursor object + query: SQL query to run. + params: Extra parameters + """ + cur.execute(query, params) + if self.async == 1: + self._wait(cur.connection) + def execute_scalar(self, query, params=None): status, cur = self.__cursor() @@ -324,7 +379,7 @@ Attempt to reconnect it failed with the below error: ) ) try: - cur.execute(query, params) + self.__internal_blocking_execute(cur, query, params) except psycopg2.Error as pe: cur.close() errmsg = str(pe) @@ -346,6 +401,97 @@ Attempt to reconnect it failed with the below error: return True, None + def execute_async(self, query, params=None): + """ + This function executes the given query asynchronously and returns result. + + Args: + query: SQL query to run. + params: extra parameters to the function + """ + status, cur = self.__cursor() + + if not status: + return False, str(cur) + query_id = random.randint(1, 9999999) + + current_app.logger.log(25, """ +Execute (async) for server #{server_id} - {conn_id} (Query-id: {query_id}):\n{query} +""".format( + server_id=self.manager.sid, + conn_id=self.conn_id, + query=query, + query_id=query_id + ) + ) + + try: + cur.execute(query, params) + res = self._wait_timeout(cur.connection, ASYNC_WAIT_TIMEOUT) + except psycopg2.Error as pe: + errmsg = str(pe) + current_app.logger.error(""" +Failed to execute query (execute_async) for the server #{server_id} - {conn_id} +(Query-id: {query_id}):\nError Message:{errmsg} +""".format( + server_id=self.manager.sid, + conn_id=self.conn_id, + query=query, + errmsg=errmsg, + query_id=query_id + ) + ) + return False, errmsg + + self.__async_cursor = cur + self.__async_query_id = query_id + + return True, res + + def execute_void(self, query, params=None): + """ + This function executes the given query with no result. + + Args: + query: SQL query to run. + params: extra parameters to the function + """ + status, cur = self.__cursor() + + if not status: + return False, str(cur) + query_id = random.randint(1, 9999999) + + current_app.logger.log(25, """ +Execute (void) for server #{server_id} - {conn_id} (Query-id: {query_id}):\n{query} +""".format( + server_id=self.manager.sid, + conn_id=self.conn_id, + query=query, + query_id=query_id + ) + ) + + try: + self.__internal_blocking_execute(cur, query, params) + except psycopg2.Error as pe: + cur.close() + errmsg = str(pe) + current_app.logger.error(""" +Failed to execute query (execute_void) for the server #{server_id} - {conn_id} +(Query-id: {query_id}):\nError Message:{errmsg} +""".format( + server_id=self.manager.sid, + conn_id=self.conn_id, + query=query, + errmsg=errmsg, + query_id=query_id + ) + ) + return False, errmsg + + return True, None + def execute_2darray(self, query, params=None): status, cur = self.__cursor() @@ -362,7 +508,7 @@ Attempt to reconnect it failed with the below error: ) ) try: - cur.execute(query, params) + self.__internal_blocking_execute(cur, query, params) except psycopg2.Error as pe: cur.close() errmsg = str(pe) @@ -405,7 +551,7 @@ Attempt to reconnect it failed with the below error: ) ) try: - cur.execute(query, params) + self.__internal_blocking_execute(cur, query, params) except psycopg2.Error as pe: cur.close() errmsg = str(pe) @@ -455,7 +601,7 @@ Attempt to reconnect it failed with the below error: if user is None: return False, gettext("Unauthorized Request.") - password = decrypt(password, user.password) + password = decrypt(password, user.password).decode() try: pg_conn = psycopg2.connect( @@ -481,6 +627,7 @@ Failed to reset the connection of the server due to following error: return False, msg self.conn = pg_conn + self.__backend_pid = pg_conn.get_backend_pid() return True, None @@ -497,6 +644,143 @@ Failed to reset the connection of the server due to following error: self.conn.close() self.conn = None + def _wait(self, conn): + """ + This function is used for the asynchronous connection, + it will call poll method in a infinite loop till poll + returns psycopg2.extensions.POLL_OK. This is a blocking + call. + + Args: + conn: connection object + """ + + while 1: + state = conn.poll() + if state == psycopg2.extensions.POLL_OK: + break + elif state == psycopg2.extensions.POLL_WRITE: + select.select([], [conn.fileno()], []) + elif state == psycopg2.extensions.POLL_READ: + select.select([conn.fileno()], [], []) + else: + raise psycopg2.OperationalError("poll() returned %s from _wait function" % state) + + def _wait_timeout(self, conn, time): + """ + This function is used for the asynchronous connection, + it will call poll method and return the status. If state is + psycopg2.extensions.POLL_WRITE and psycopg2.extensions.POLL_READ + function will wait for the given timeout.This is not a blocking call. + + Args: + conn: connection object + time: wait time + """ + + state = conn.poll() + if state == psycopg2.extensions.POLL_OK: + return self.ASYNC_OK + elif state == psycopg2.extensions.POLL_WRITE: + select.select([], [conn.fileno()], [], time) + return self.ASYNC_WRITE_TIMEOUT + elif state == psycopg2.extensions.POLL_READ: + select.select([conn.fileno()], [], [], time) + return self.ASYNC_READ_TIMEOUT + else: + raise psycopg2.OperationalError("poll() returned %s from _wait_timeout function" % state) + + def poll(self): + """ + This function is a wrapper around connection's poll function. + It internally uses the _wait_timeout method to poll the + result on the connection object. In case of success it + returns the result of the query. + """ + + cur = self.__async_cursor + if not cur: + return False, gettext("Cursor could not found for the aysnc connection."), None + + current_app.logger.log(25, """ +Polling result for (Query-id: {query_id})""".format(query_id=self.__async_query_id)) + + status = self._wait_timeout(self.conn, ASYNC_WAIT_TIMEOUT) + if status == self.ASYNC_OK: + if cur.rowcount > 0: + # Fetch the column information + colinfo = [desc for desc in cur.description] + result = [] + # Fetch the data rows. + for row in cur: + result.append(dict(row)) + self.__async_cursor = None + return status, result, colinfo + return status, None, None + + def cancel_transaction(self, conn_id, did=None): + """ + This function is used to cancel the running transaction + of the given connection id and database id using + PostgreSQL's pg_cancel_backend. + + Args: + conn_id: Connection id + did: Database id (optional) + """ + cancel_conn = self.manager.connection(did=did, conn_id=conn_id) + query = """SELECT pg_cancel_backend({0});""".format(cancel_conn.__backend_pid) + + status = True + msg = '' + + # if backend pid is same then create a new connection + # to cancel the query and release it. + if cancel_conn.__backend_pid == self.__backend_pid: + password = getattr(self.manager, 'password', None) + if password: + # Fetch Logged in User Details. + user = User.query.filter_by(id=current_user.id).first() + if user is None: + return False, gettext("Unauthorized Request.") + + password = decrypt(password, user.password).decode() + + try: + pg_conn = psycopg2.connect( + host=self.manager.host, + port=self.manager.port, + database=self.db, + user=self.manager.user, + password=password + ) + + # Get the cursor and run the query + cur = pg_conn.cursor() + cur.execute(query) + + # Close the connection + pg_conn.close() + pg_conn = None + + except psycopg2.Error as e: + status = False + if e.pgerror: + msg = e.pgerror + elif e.diag.message_detail: + msg = e.diag.message_detail + else: + msg = str(e) + return status, msg + else: + if self.connected(): + status, msg = self.execute_void(query) + else: + status = False + msg = gettext("Not connected to the database server.") + + return status, msg + class ServerManager(object): """ @@ -588,7 +872,7 @@ WHERE db.oid = {0}""".format(did)) if did not in self.db_info: raise Exception(gettext( - "Coudn't find the database!" + "Couldn't find the database!" )) if database is None: @@ -602,8 +886,9 @@ WHERE db.oid = {0}""".format(did)) if my_id in self.connections: return self.connections[my_id] else: + async = 1 if conn_id is not None else 0 self.connections[my_id] = Connection( - self, my_id, database, auto_reconnect + self, my_id, database, auto_reconnect, async ) return self.connections[my_id]