Ensure we pick up the messages from the current query and not a previous one. Fixes #3094

This commit is contained in:
Khushboo Vashi 2018-02-26 14:19:43 +00:00 committed by Dave Page
parent fa9aebadbd
commit 08b3ccc01a
2 changed files with 21 additions and 44 deletions

View File

@ -168,6 +168,7 @@ class BaseConnection(object):
ASYNC_WRITE_TIMEOUT = 3 ASYNC_WRITE_TIMEOUT = 3
ASYNC_NOT_CONNECTED = 4 ASYNC_NOT_CONNECTED = 4
ASYNC_EXECUTION_ABORTED = 5 ASYNC_EXECUTION_ABORTED = 5
ASYNC_TIMEOUT = 0.2
@abstractmethod @abstractmethod
def connect(self, **kwargs): def connect(self, **kwargs):

View File

@ -110,7 +110,7 @@ class Connection(BaseConnection):
- This method is used to wait for asynchronous connection. This is a - This method is used to wait for asynchronous connection. This is a
blocking call. blocking call.
* _wait_timeout(conn, time) * _wait_timeout(conn)
- This method is used to wait for asynchronous connection with timeout. - This method is used to wait for asynchronous connection with timeout.
This is a non blocking call. This is a non blocking call.
@ -1261,51 +1261,27 @@ Failed to reset the connection to the server due to following error:
Args: Args:
conn: connection object conn: connection object
time: wait time
""" """
while 1:
state = conn.poll()
if state == psycopg2.extensions.POLL_OK:
return self.ASYNC_OK
elif state == psycopg2.extensions.POLL_WRITE:
# Wait for the given time and then check the return status
# If three empty lists are returned then the time-out is reached.
timeout_status = select.select([], [conn.fileno()], [], 0)
if timeout_status == ([], [], []):
return self.ASYNC_WRITE_TIMEOUT
# poll again to check the state if it is still POLL_WRITE
# then return ASYNC_WRITE_TIMEOUT else return ASYNC_OK.
state = conn.poll() state = conn.poll()
if state == psycopg2.extensions.POLL_WRITE: if state == psycopg2.extensions.POLL_OK:
return self.ASYNC_WRITE_TIMEOUT return self.ASYNC_OK
return self.ASYNC_OK elif state == psycopg2.extensions.POLL_WRITE:
elif state == psycopg2.extensions.POLL_READ: # Wait for the given time and then check the return status
# Wait for the given time and then check the return status # If three empty lists are returned then the time-out is reached.
# If three empty lists are returned then the time-out is reached. timeout_status = select.select([], [conn.fileno()], [], self.ASYNC_TIMEOUT)
timeout_status = select.select([conn.fileno()], [], [], 0) if timeout_status == ([], [], []):
if timeout_status == ([], [], []): return self.ASYNC_WRITE_TIMEOUT
return self.ASYNC_READ_TIMEOUT elif state == psycopg2.extensions.POLL_READ:
# Wait for the given time and then check the return status
# select.select timeout option works only if we provide # If three empty lists are returned then the time-out is reached.
# empty [] [] [] file descriptor in select.select() function timeout_status = select.select([conn.fileno()], [], [], self.ASYNC_TIMEOUT)
# and that also works only on UNIX based system, it do not support if timeout_status == ([], [], []):
# Windows Hence we have wrote our own pooling mechanism to read return self.ASYNC_READ_TIMEOUT
# data fast each call conn.poll() reads chunks of data from else:
# connection object more we poll more we read data from connection raise psycopg2.OperationalError(
cnt = 0 "poll() returned %s from _wait_timeout function" % state
while cnt < 1000: )
# poll again to check the state if it is still POLL_READ
# then return ASYNC_READ_TIMEOUT else return ASYNC_OK.
state = conn.poll()
if state == psycopg2.extensions.POLL_OK:
return self.ASYNC_OK
cnt += 1
return self.ASYNC_READ_TIMEOUT
else:
raise psycopg2.OperationalError(
"poll() returned %s from _wait_timeout function" % state
)
def poll(self, formatted_exception_msg=False, no_result=False): def poll(self, formatted_exception_msg=False, no_result=False):
""" """