Improvise the psycopg2 wrapped driver for PostgreSQL to show formatted

message on demand.

Also - resolved an issue identifying the aborted transaction while using
the asychronous connection.
This commit is contained in:
Akshay Joshi
2016-04-04 12:28:52 +05:30
committed by Ashesh Vashi
parent 43d532321b
commit 5331075ab0
2 changed files with 164 additions and 46 deletions

View File

@@ -81,21 +81,21 @@ class BaseConnection(object):
- Define this method to connect the server using that particular driver - Define this method to connect the server using that particular driver
implementation. implementation.
* execute_scalar(query, params) * execute_scalar(query, params, formatted_exception_msg)
- Implement this method to execute the given query and returns single - Implement this method to execute the given query and returns single
datum result. datum result.
* execute_async(query, params) * execute_async(query, params, formatted_exception_msg)
- Implement this method to execute the given query asynchronously and returns result. - Implement this method to execute the given query asynchronously and returns result.
* execute_void(query, params) * execute_void(query, params, formatted_exception_msg)
- Implement this method to execute the given query with no result. - Implement this method to execute the given query with no result.
* execute_2darray(query, params) * execute_2darray(query, params, formatted_exception_msg)
- Implement this method to execute the given query and returns the result - Implement this method to execute the given query and returns the result
as a 2 dimensional array. as a 2 dimensional array.
* execute_dict(query, params) * execute_dict(query, params, formatted_exception_msg)
- Implement this method to execute the given query and returns the result - Implement this method to execute the given query and returns the result
as an array of dict (column name -> value) format. as an array of dict (column name -> value) format.
@@ -131,7 +131,7 @@ class BaseConnection(object):
- Implement this method to wait for asynchronous connection with timeout. - Implement this method to wait for asynchronous connection with timeout.
This must be a non blocking call. This must be a non blocking call.
* poll() * poll(formatted_exception_msg)
- Implement this method to poll the data of query running on asynchronous - Implement this method to poll the data of query running on asynchronous
connection. connection.
@@ -147,29 +147,30 @@ class BaseConnection(object):
ASYNC_READ_TIMEOUT = 2 ASYNC_READ_TIMEOUT = 2
ASYNC_WRITE_TIMEOUT = 3 ASYNC_WRITE_TIMEOUT = 3
ASYNC_NOT_CONNECTED = 4 ASYNC_NOT_CONNECTED = 4
ASYNC_EXECUTION_ABORTED = 5
@abstractmethod @abstractmethod
def connect(self, **kwargs): def connect(self, **kwargs):
pass pass
@abstractmethod @abstractmethod
def execute_scalar(self, query, params=None): def execute_scalar(self, query, params=None, formatted_exception_msg=False):
pass pass
@abstractmethod @abstractmethod
def execute_async(self, query, params=None): def execute_async(self, query, params=None, formatted_exception_msg=True):
pass pass
@abstractmethod @abstractmethod
def execute_void(self, query, params=None): def execute_void(self, query, params=None, formatted_exception_msg=False):
pass pass
@abstractmethod @abstractmethod
def execute_2darray(self, query, params=None): def execute_2darray(self, query, params=None, formatted_exception_msg=False):
pass pass
@abstractmethod @abstractmethod
def execute_dict(self, query, params=None): def execute_dict(self, query, params=None, formatted_exception_msg=False):
pass pass
@abstractmethod @abstractmethod
@@ -201,7 +202,7 @@ class BaseConnection(object):
pass pass
@abstractmethod @abstractmethod
def poll(self): def poll(self, formatted_exception_msg=True):
pass pass
@abstractmethod @abstractmethod

View File

@@ -35,7 +35,7 @@ from .keywords import ScanKeyword
_ = gettext _ = gettext
ASYNC_WAIT_TIMEOUT = 0.1 # in seconds or 100 milliseconds ASYNC_WAIT_TIMEOUT = 0.01 # in seconds or 10 milliseconds
class Connection(BaseConnection): class Connection(BaseConnection):
@@ -50,20 +50,20 @@ class Connection(BaseConnection):
* connect(**kwargs) * connect(**kwargs)
- Connect the PostgreSQL/Postgres Plus servers using the psycopg2 driver - Connect the PostgreSQL/Postgres Plus servers using the psycopg2 driver
* execute_scalar(query, params) * execute_scalar(query, params, formatted_exception_msg)
- Execute the given query and returns single datum result - Execute the given query and returns single datum result
* execute_async(query, params) * execute_async(query, params, formatted_exception_msg)
- Execute the given query asynchronously and returns result. - Execute the given query asynchronously and returns result.
* execute_void(query, params) * execute_void(query, params, formatted_exception_msg)
- Execute the given query with no result. - Execute the given query with no result.
* execute_2darray(query, params) * execute_2darray(query, params, formatted_exception_msg)
- Execute the given query and returns the result as a 2 dimensional - Execute the given query and returns the result as a 2 dimensional
array. array.
* execute_dict(query, params) * execute_dict(query, params, formatted_exception_msg)
- Execute the given query and returns the result as an array of dict - Execute the given query and returns the result as an array of dict
(column name -> value) format. (column name -> value) format.
@@ -91,7 +91,7 @@ class Connection(BaseConnection):
- This method is used to wait for asynchronous connection with timeout. - This method is used to wait for asynchronous connection with timeout.
This is a non blocking call. This is a non blocking call.
* poll() * poll(formatted_exception_msg)
- This method is used to poll the data of query running on asynchronous - This method is used to poll the data of query running on asynchronous
connection. connection.
@@ -102,6 +102,12 @@ class Connection(BaseConnection):
* messages() * messages()
- Returns the list of messages/notices sends from the PostgreSQL database - Returns the list of messages/notices sends from the PostgreSQL database
server. server.
* _formatted_exception_msg(exception_obj, formatted_msg)
- This method is used to parse the psycopg2.Error object and returns the
formatted error message if flag is set to true else return
normal error message.
""" """
def __init__(self, manager, conn_id, db, auto_reconnect=True, async=0): def __init__(self, manager, conn_id, db, auto_reconnect=True, async=0):
assert(manager is not None) assert(manager is not None)
@@ -116,6 +122,7 @@ class Connection(BaseConnection):
self.__async_cursor = None self.__async_cursor = None
self.__async_query_id = None self.__async_query_id = None
self.__backend_pid = None self.__backend_pid = None
self.execution_aborted = False
super(Connection, self).__init__() super(Connection, self).__init__()
@@ -224,6 +231,7 @@ Failed to connect to the database server(#{server_id}) for connection ({conn_id}
self.conn = pg_conn self.conn = pg_conn
self.__backend_pid = pg_conn.get_backend_pid() self.__backend_pid = pg_conn.get_backend_pid()
self.execution_aborted = False
# autocommit flag does not work with asynchronous connections. # autocommit flag does not work with asynchronous connections.
# By default asynchronous connection runs in autocommit mode. # By default asynchronous connection runs in autocommit mode.
@@ -409,7 +417,7 @@ Attempt to reconnect it failed with the below error:
if self.async == 1: if self.async == 1:
self._wait(cur.connection) self._wait(cur.connection)
def execute_scalar(self, query, params=None): def execute_scalar(self, query, params=None, formatted_exception_msg=False):
status, cur = self.__cursor() status, cur = self.__cursor()
if not status: if not status:
@@ -428,7 +436,7 @@ Attempt to reconnect it failed with the below error:
self.__internal_blocking_execute(cur, query, params) self.__internal_blocking_execute(cur, query, params)
except psycopg2.Error as pe: except psycopg2.Error as pe:
cur.close() cur.close()
errmsg = str(pe) errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
current_app.logger.error( current_app.logger.error(
"Failed to execute query (execute_scalar) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format( "Failed to execute query (execute_scalar) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format(
server_id=self.manager.sid, server_id=self.manager.sid,
@@ -447,13 +455,14 @@ Attempt to reconnect it failed with the below error:
return True, None return True, None
def execute_async(self, query, params=None): def execute_async(self, query, params=None, formatted_exception_msg=True):
""" """
This function executes the given query asynchronously and returns result. This function executes the given query asynchronously and returns result.
Args: Args:
query: SQL query to run. query: SQL query to run.
params: extra parameters to the function params: extra parameters to the function
formatted_exception_msg: if True then function return the formatted exception message
""" """
status, cur = self.__cursor() status, cur = self.__cursor()
@@ -472,10 +481,11 @@ Execute (async) for server #{server_id} - {conn_id} (Query-id: {query_id}):\n{qu
) )
try: try:
self.execution_aborted = False
cur.execute(query, params) cur.execute(query, params)
res = self._wait_timeout(cur.connection, ASYNC_WAIT_TIMEOUT) res = self._wait_timeout(cur.connection, ASYNC_WAIT_TIMEOUT)
except psycopg2.Error as pe: except psycopg2.Error as pe:
errmsg = str(pe) errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
current_app.logger.error(""" current_app.logger.error("""
Failed to execute query (execute_async) for the server #{server_id} - {conn_id} Failed to execute query (execute_async) for the server #{server_id} - {conn_id}
(Query-id: {query_id}):\nError Message:{errmsg} (Query-id: {query_id}):\nError Message:{errmsg}
@@ -494,13 +504,14 @@ Failed to execute query (execute_async) for the server #{server_id} - {conn_id}
return True, res return True, res
def execute_void(self, query, params=None): def execute_void(self, query, params=None, formatted_exception_msg=False):
""" """
This function executes the given query with no result. This function executes the given query with no result.
Args: Args:
query: SQL query to run. query: SQL query to run.
params: extra parameters to the function params: extra parameters to the function
formatted_exception_msg: if True then function return the formatted exception message
""" """
status, cur = self.__cursor() status, cur = self.__cursor()
@@ -522,7 +533,7 @@ Execute (void) for server #{server_id} - {conn_id} (Query-id: {query_id}):\n{que
self.__internal_blocking_execute(cur, query, params) self.__internal_blocking_execute(cur, query, params)
except psycopg2.Error as pe: except psycopg2.Error as pe:
cur.close() cur.close()
errmsg = str(pe) errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
current_app.logger.error(""" current_app.logger.error("""
Failed to execute query (execute_void) for the server #{server_id} - {conn_id} Failed to execute query (execute_void) for the server #{server_id} - {conn_id}
(Query-id: {query_id}):\nError Message:{errmsg} (Query-id: {query_id}):\nError Message:{errmsg}
@@ -538,7 +549,7 @@ Failed to execute query (execute_void) for the server #{server_id} - {conn_id}
return True, None return True, None
def execute_2darray(self, query, params=None): def execute_2darray(self, query, params=None, formatted_exception_msg=False):
status, cur = self.__cursor() status, cur = self.__cursor()
if not status: if not status:
@@ -557,7 +568,7 @@ Failed to execute query (execute_void) for the server #{server_id} - {conn_id}
self.__internal_blocking_execute(cur, query, params) self.__internal_blocking_execute(cur, query, params)
except psycopg2.Error as pe: except psycopg2.Error as pe:
cur.close() cur.close()
errmsg = str(pe) errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
current_app.logger.error( current_app.logger.error(
"Failed to execute query (execute_2darray) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format( "Failed to execute query (execute_2darray) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format(
server_id=self.manager.sid, server_id=self.manager.sid,
@@ -582,7 +593,7 @@ Failed to execute query (execute_void) for the server #{server_id} - {conn_id}
return True, {'columns': columns, 'rows': rows} return True, {'columns': columns, 'rows': rows}
def execute_dict(self, query, params=None): def execute_dict(self, query, params=None, formatted_exception_msg=False):
status, cur = self.__cursor() status, cur = self.__cursor()
if not status: if not status:
@@ -600,7 +611,7 @@ Failed to execute query (execute_void) for the server #{server_id} - {conn_id}
self.__internal_blocking_execute(cur, query, params) self.__internal_blocking_execute(cur, query, params)
except psycopg2.Error as pe: except psycopg2.Error as pe:
cur.close() cur.close()
errmsg = str(pe) errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
current_app.logger.error( current_app.logger.error(
"Failed to execute query (execute_dict) for the server #{server_id}- {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format( "Failed to execute query (execute_dict) for the server #{server_id}- {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format(
server_id=self.manager.sid, server_id=self.manager.sid,
@@ -728,42 +739,81 @@ Failed to reset the connection of the server due to following error:
if state == psycopg2.extensions.POLL_OK: if state == psycopg2.extensions.POLL_OK:
return self.ASYNC_OK return self.ASYNC_OK
elif state == psycopg2.extensions.POLL_WRITE: elif state == psycopg2.extensions.POLL_WRITE:
select.select([], [conn.fileno()], [], time) if select.select([], [conn.fileno()], [], time) == ([], [], []):
return self.ASYNC_WRITE_TIMEOUT return self.ASYNC_WRITE_TIMEOUT
# Call recursively if no timeout
return self._wait_timeout(conn, time)
elif state == psycopg2.extensions.POLL_READ: elif state == psycopg2.extensions.POLL_READ:
select.select([conn.fileno()], [], [], time) if select.select([conn.fileno()], [], [], time) == ([], [], []):
return self.ASYNC_READ_TIMEOUT return self.ASYNC_READ_TIMEOUT
return self._wait_timeout(conn, time)
else: else:
raise psycopg2.OperationalError("poll() returned %s from _wait_timeout function" % state) raise psycopg2.OperationalError(
"poll() returned %s from _wait_timeout function" % state
)
def poll(self): def poll(self, formatted_exception_msg=False):
""" """
This function is a wrapper around connection's poll function. This function is a wrapper around connection's poll function.
It internally uses the _wait_timeout method to poll the It internally uses the _wait_timeout method to poll the
result on the connection object. In case of success it result on the connection object. In case of success it
returns the result of the query. returns the result of the query.
Args:
formatted_exception_msg: if True then function return the formatted
exception message, otherwise error string.
""" """
cur = self.__async_cursor cur = self.__async_cursor
if not cur: if not cur:
return False, gettext("Cursor could not found for the aysnc connection."), None return False, gettext(
"Cursor could not be found for the aysnc connection."
), None
current_app.logger.log(25, """ current_app.logger.log(
Polling result for (Query-id: {query_id})""".format(query_id=self.__async_query_id)) 25,
"Polling result for (Query-id: {query_id})".format(
query_id=self.__async_query_id
)
)
try:
status = self._wait_timeout(self.conn, ASYNC_WAIT_TIMEOUT)
except psycopg2.Error as pe:
errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
return False, errmsg, None
status = self._wait_timeout(self.conn, ASYNC_WAIT_TIMEOUT)
colinfo = None colinfo = None
if status == self.ASYNC_OK: if status == self.ASYNC_OK:
# Fetch the column information
colinfo = [desc for desc in cur.description]
# if user has cancelled the transaction then changed the status
if self.execution_aborted:
status = self.ASYNC_EXECUTION_ABORTED
self.execution_aborted = False
return status, None, colinfo
# Fetch the column information
if cur.description is not None:
colinfo = [desc for desc in cur.description]
result = cur.statusmessage
if cur.rowcount > 0: if cur.rowcount > 0:
result = [] result = []
# Fetch the data rows.
for row in cur: # For DDL operation, we may not have result.
result.append(dict(row)) #
self.__async_cursor = None # Because - there is not direct way to differentiate DML and
return status, result, colinfo # DDL operations, we need to rely on exception to figure that
# out at the moment.
try:
for row in cur:
result.append(dict(row))
except psycopg2.ProgrammingError:
result = cur.statusmessage
self.__async_cursor = None
return status, result, colinfo
return status, None, colinfo return status, None, colinfo
def cancel_transaction(self, conn_id, did=None): def cancel_transaction(self, conn_id, did=None):
@@ -823,6 +873,9 @@ Polling result for (Query-id: {query_id})""".format(query_id=self.__async_query_
else: else:
if self.connected(): if self.connected():
status, msg = self.execute_void(query) status, msg = self.execute_void(query)
if status:
cancel_conn.execution_aborted = True
else: else:
status = False status = False
msg = gettext("Not connected to the database server.") msg = gettext("Not connected to the database server.")
@@ -835,6 +888,70 @@ Polling result for (Query-id: {query_id})""".format(query_id=self.__async_query_
""" """
return self.conn.notices if self.conn else [] return self.conn.notices if self.conn else []
def _formatted_exception_msg(self, exception_obj, formatted_msg):
"""
This method is used to parse the psycopg2.Error object and returns the
formatted error message if flag is set to true else return
normal error message.
Args:
exception_obj: exception object
formatted_msg: if True then function return the formatted exception message
"""
if exception_obj.pgerror:
errmsg = exception_obj.pgerror
elif exception_obj.diag.message_detail:
errmsg = exception_obj.diag.message_detail
else:
errmsg = str(exception_obj)
# if formatted_msg is false then return from the function
if not formatted_msg:
return errmsg
errmsg += '********** Error **********\n\n'
if exception_obj.diag.severity is not None \
and exception_obj.diag.message_primary is not None:
errmsg += exception_obj.diag.severity + ": " + \
exception_obj.diag.message_primary
elif exception_obj.diag.message_primary is not None:
errmsg += exception_obj.diag.message_primary
if exception_obj.diag.sqlstate is not None:
if not errmsg[:-1].endswith('\n'):
errmsg += '\n'
errmsg += gettext('SQL state: ')
errmsg += exception_obj.diag.sqlstate
if exception_obj.diag.message_detail is not None:
if not errmsg[:-1].endswith('\n'):
errmsg += '\n'
errmsg += gettext('Detail: ')
errmsg += exception_obj.diag.message_detail
if exception_obj.diag.message_hint is not None:
if not errmsg[:-1].endswith('\n'):
errmsg += '\n'
errmsg += gettext('Hint: ')
errmsg += exception_obj.diag.message_hint
if exception_obj.diag.statement_position is not None:
if not errmsg[:-1].endswith('\n'):
errmsg += '\n'
errmsg += gettext('Character: ')
errmsg += exception_obj.diag.statement_position
if exception_obj.diag.context is not None:
if not errmsg[:-1].endswith('\n'):
errmsg += '\n'
errmsg += gettext('Context: ')
errmsg += exception_obj.diag.context
return errmsg
class ServerManager(object): class ServerManager(object):
""" """