From 326dc2bbcc6e8c923f8eca2d823a17259b5d3fa4 Mon Sep 17 00:00:00 2001 From: Khushboo Vashi Date: Tue, 4 Jul 2023 10:44:46 +0530 Subject: [PATCH] Fix an issue where queries longer than 1 minute get stuck - Container 7.1. #6317 (#6491) Fix an issue where queries longer than 1 minute get stuck - Container 7.1. #6317 Fix an issue where queries get stuck with auto-completion enabled. #6356 Fix an issue where queries can't complete execution. #6163 --- docs/en_US/release_notes_7_4.rst | 1 - docs/en_US/release_notes_7_5.rst | 3 + web/pgadmin/tools/sqleditor/__init__.py | 106 ++++++++--- web/pgadmin/tools/sqleditor/command.py | 20 ++ .../js/components/sections/ResultSet.jsx | 29 +-- .../tests/execute_query_test_utils.py | 18 ++ .../tests/test_download_csv_query_tool.py | 12 +- .../sqleditor/tests/test_encoding_charset.py | 11 +- .../sqleditor/tests/test_explain_plan.py | 9 +- .../sqleditor/tests/test_poll_query_tool.py | 10 +- .../tests/test_sql_ascii_encoding.py | 7 +- .../tools/sqleditor/tests/test_view_data.py | 8 +- .../sqleditor/utils/start_running_query.py | 54 ++++-- .../tests/test_save_changed_uuid_data.py | 4 +- .../utils/tests/test_start_running_query.py | 176 ------------------ .../utils/driver/psycopg3/connection.py | 38 ++-- 16 files changed, 242 insertions(+), 264 deletions(-) diff --git a/docs/en_US/release_notes_7_4.rst b/docs/en_US/release_notes_7_4.rst index fb7b2d891..02a855ab5 100644 --- a/docs/en_US/release_notes_7_4.rst +++ b/docs/en_US/release_notes_7_4.rst @@ -44,4 +44,3 @@ Bug fixes | `Issue #6420 `_ - Fix raise notice from func/proc or code blocks are no longer displayed live. | `Issue #6431 `_ - Fix an issue where PSQL is not working if the database name have quotes or double quotes. | `Issue #6435 `_ - Fix an issue where all the menus are enabled when pgAdmin is opened and no object is selected in the object explorer. - diff --git a/docs/en_US/release_notes_7_5.rst b/docs/en_US/release_notes_7_5.rst index ba73ece83..2425ac938 100644 --- a/docs/en_US/release_notes_7_5.rst +++ b/docs/en_US/release_notes_7_5.rst @@ -31,6 +31,9 @@ Housekeeping Bug fixes ********* + | `Issue #6163 `_ - Fix an issue where queries can't complete execution. | `Issue #6165 `_ - Fixed an issue where Import Export not working when using pgpassfile. + | `Issue #6317 `_ - Fix an issue where queries longer than 1 minute get stuck - Container 7.1 + | `Issue #6356 `_ - Fix an issue where queries get stuck with auto-completion enabled. | `Issue #6364 `_ - Fixed Query Tool/ PSQL tool tab title not getting updated on database rename. | `Issue #6515 `_ - Fixed an issue where the query tool is unable to execute a query on Postgres 10 and below versions. diff --git a/web/pgadmin/tools/sqleditor/__init__.py b/web/pgadmin/tools/sqleditor/__init__.py index 9a72b6efd..34994c7de 100644 --- a/web/pgadmin/tools/sqleditor/__init__.py +++ b/web/pgadmin/tools/sqleditor/__init__.py @@ -14,6 +14,7 @@ import re import secrets from urllib.parse import unquote from threading import Lock +import threading import json from config import PG_DEFAULT_DRIVER, ALLOW_SAVE_PASSWORD, SHARED_STORAGE @@ -34,7 +35,7 @@ from pgadmin.tools.sqleditor.utils.update_session_grid_transaction import \ from pgadmin.utils import PgAdminModule from pgadmin.utils import get_storage_directory from pgadmin.utils.ajax import make_json_response, bad_request, \ - success_return, internal_server_error + success_return, internal_server_error, service_unavailable from pgadmin.utils.driver import get_driver from pgadmin.utils.exception import ConnectionLost, SSHTunnelConnectionLost, \ CryptKeyMissing, ObjectGone @@ -415,6 +416,7 @@ def _connect(conn, **kwargs): def _init_sqleditor(trans_id, connect, sgid, sid, did, **kwargs): # Create asynchronous connection using random connection id. conn_id = str(secrets.choice(range(1, 9999999))) + conn_id_ac = str(secrets.choice(range(1, 9999999))) manager = get_driver(PG_DEFAULT_DRIVER).connection_manager(sid) @@ -422,7 +424,8 @@ def _init_sqleditor(trans_id, connect, sgid, sid, did, **kwargs): did = manager.did try: command_obj = ObjectRegistry.get_object( - 'query_tool', conn_id=conn_id, sgid=sgid, sid=sid, did=did + 'query_tool', conn_id=conn_id, sgid=sgid, sid=sid, did=did, + conn_id_ac=conn_id_ac ) except Exception as e: current_app.logger.error(e) @@ -433,8 +436,8 @@ def _init_sqleditor(trans_id, connect, sgid, sid, did, **kwargs): auto_reconnect=False, use_binary_placeholder=True, array_to_string=True) - pref = Preferences.module('sqleditor') + if connect: kwargs['auto_commit'] = pref.preference('auto_commit').get() kwargs['auto_rollback'] = pref.preference('auto_rollback').get() @@ -461,6 +464,15 @@ def _init_sqleditor(trans_id, connect, sgid, sid, did, **kwargs): else: return True, internal_server_error( errormsg=str(msg)), '', '' + + if pref.preference('autocomplete_on_key_press').get(): + conn_ac = manager.connection(did=did, conn_id=conn_id_ac, + auto_reconnect=False, + use_binary_placeholder=True, + array_to_string=True) + status, msg, is_ask_password, user, role, password = _connect( + conn_ac, **kwargs) + except (ConnectionLost, SSHTunnelConnectionLost) as e: current_app.logger.error(e) raise @@ -659,15 +671,30 @@ def close_sqleditor_session(trans_id): conn.cancel_transaction(cmd_obj.conn_id, cmd_obj.did) manager.release(did=cmd_obj.did, conn_id=cmd_obj.conn_id) + # Close the auto complete connection + if cmd_obj.conn_id_ac is not None: + manager = get_driver( + PG_DEFAULT_DRIVER).connection_manager(cmd_obj.sid) + if manager is not None: + conn = manager.connection( + did=cmd_obj.did, conn_id=cmd_obj.conn_id_ac) -def check_transaction_status(trans_id): + # Release the connection + if conn.connected(): + conn.cancel_transaction(cmd_obj.conn_id_ac, cmd_obj.did) + manager.release(did=cmd_obj.did, + conn_id=cmd_obj.conn_id_ac) + + +def check_transaction_status(trans_id, auto_comp=False): """ This function is used to check the transaction id is available in the session object and connection status. Args: - trans_id: + trans_id: Transaction Id + auto_comp: Auto complete flag Returns: status and connection object @@ -687,12 +714,19 @@ def check_transaction_status(trans_id): session_obj = grid_data[str(trans_id)] trans_obj = pickle.loads(session_obj['command_obj']) + if auto_comp: + conn_id = trans_obj.conn_id_ac + connect = True + else: + conn_id = trans_obj.conn_id + connect = True if 'connect' in request.args and \ + request.args['connect'] == '1' else False try: manager = get_driver( PG_DEFAULT_DRIVER).connection_manager(trans_obj.sid) conn = manager.connection( did=trans_obj.did, - conn_id=trans_obj.conn_id, + conn_id=conn_id, auto_reconnect=False, use_binary_placeholder=True, array_to_string=True @@ -703,10 +737,7 @@ def check_transaction_status(trans_id): current_app.logger.error(e) return False, internal_server_error(errormsg=str(e)), None, None, None - connect = True if 'connect' in request.args and \ - request.args['connect'] == '1' else False - - if connect: + if connect and conn and not conn.connected(): conn.connect() return True, None, conn, trans_obj, session_obj @@ -874,23 +905,38 @@ def poll(trans_id): data_obj = {} on_demand_record_count = Preferences.module(MODULE_NAME).\ preference('on_demand_record_count').get() - # Check the transaction and connection status status, error_msg, conn, trans_obj, session_obj = \ check_transaction_status(trans_id) + if type(error_msg) is Response: + return error_msg + if error_msg == ERROR_MSG_TRANS_ID_NOT_FOUND: return make_json_response(success=0, errormsg=error_msg, info='DATAGRID_TRANSACTION_REQUIRED', status=404) - if not conn.async_cursor_initialised(): - return make_json_response(data={'status': 'NotInitialised'}) + is_thread_alive = False + if trans_obj.get_thread_native_id(): + for thread in threading.enumerate(): + if thread.native_id == trans_obj.get_thread_native_id() and\ + thread.is_alive(): + is_thread_alive = True + break - if status and conn is not None and session_obj is not None: + if is_thread_alive: + status = 'Busy' + elif status and conn is not None and session_obj is not None: status, result = conn.poll( formatted_exception_msg=True, no_result=True) if not status: + if not conn.connected(): + return service_unavailable( + gettext("Connection to the server has been lost."), + info="CONNECTION_LOST", + ) + messages = conn.messages() if messages and len(messages) > 0: additional_messages = ''.join(messages) @@ -1029,8 +1075,9 @@ def poll(trans_id): status = 'NotConnected' result = error_msg - transaction_status = conn.transaction_status() - data_obj['db_name'] = conn.db + transaction_status = conn.transaction_status() if conn else 0 + data_obj['db_name'] = conn.db if conn else None + data_obj['db_id'] = trans_obj.did \ if trans_obj is not None and hasattr(trans_obj, 'did') else 0 @@ -1760,7 +1807,7 @@ def auto_complete(trans_id): # Check the transaction and connection status status, error_msg, conn, trans_obj, session_obj = \ - check_transaction_status(trans_id) + check_transaction_status(trans_id, auto_comp=True) if error_msg == ERROR_MSG_TRANS_ID_NOT_FOUND: return make_json_response(success=0, errormsg=error_msg, @@ -1770,15 +1817,18 @@ def auto_complete(trans_id): if status and conn is not None and \ trans_obj is not None and session_obj is not None: - if trans_id not in auto_complete_objects: - # Create object of SQLAutoComplete class and pass connection object - auto_complete_objects[trans_id] = \ - SQLAutoComplete(sid=trans_obj.sid, did=trans_obj.did, - conn=conn) + with sqleditor_close_session_lock: + if trans_id not in auto_complete_objects: + # Create object of SQLAutoComplete class and pass + # connection object + auto_complete_objects[trans_id] = \ + SQLAutoComplete(sid=trans_obj.sid, did=trans_obj.did, + conn=conn) - auto_complete_obj = auto_complete_objects[trans_id] - # Get the auto completion suggestions. - res = auto_complete_obj.get_completions(full_sql, text_before_cursor) + auto_complete_obj = auto_complete_objects[trans_id] + # # Get the auto completion suggestions. + res = auto_complete_obj.get_completions(full_sql, + text_before_cursor) else: status = False res = error_msg @@ -2455,6 +2505,12 @@ def add_query_history(trans_id): status, error_msg, conn, trans_obj, session_ob = \ check_transaction_status(trans_id) + if not trans_obj: + return make_json_response( + data={ + 'status': False, + } + ) return QueryHistory.save(current_user.id, trans_obj.sid, conn.db, request=request) diff --git a/web/pgadmin/tools/sqleditor/command.py b/web/pgadmin/tools/sqleditor/command.py index 5d227bb6d..91da4a830 100644 --- a/web/pgadmin/tools/sqleditor/command.py +++ b/web/pgadmin/tools/sqleditor/command.py @@ -364,6 +364,8 @@ class GridCommand(BaseCommand, SQLFilter, FetchedRowTracker): if self.cmd_type in (VIEW_FIRST_100_ROWS, VIEW_LAST_100_ROWS): self.limit = 100 + self.thread_native_id = None + def get_primary_keys(self, *args, **kwargs): return None, None @@ -441,6 +443,12 @@ class GridCommand(BaseCommand, SQLFilter, FetchedRowTracker): else: return 'asc' + def get_thread_native_id(self): + return self.thread_native_id + + def set_thread_native_id(self, thread_native_id): + self.thread_native_id = thread_native_id + class TableCommand(GridCommand): """ @@ -881,6 +889,8 @@ class QueryToolCommand(BaseCommand, FetchedRowTracker): FetchedRowTracker.__init__(self, **kwargs) self.conn_id = kwargs['conn_id'] if 'conn_id' in kwargs else None + self.conn_id_ac = kwargs['conn_id_ac'] if 'conn_id_ac' in kwargs\ + else None self.auto_rollback = False self.auto_commit = True @@ -890,6 +900,7 @@ class QueryToolCommand(BaseCommand, FetchedRowTracker): self.pk_names = None self.table_has_oids = False self.columns_types = None + self.thread_native_id = None def get_sql(self, default_conn=None): return None @@ -982,6 +993,9 @@ class QueryToolCommand(BaseCommand, FetchedRowTracker): def set_connection_id(self, conn_id): self.conn_id = conn_id + def set_connection_id_ac(self, conn_id): + self.conn_id_ac = conn_id + def set_auto_rollback(self, auto_rollback): self.auto_rollback = auto_rollback @@ -1009,3 +1023,9 @@ class QueryToolCommand(BaseCommand, FetchedRowTracker): self.object_name = result['rows'][0]['relname'] else: raise InternalServerError(SERVER_CONNECTION_CLOSED) + + def get_thread_native_id(self): + return self.thread_native_id + + def set_thread_native_id(self, thread_native_id): + self.thread_native_id = thread_native_id diff --git a/web/pgadmin/tools/sqleditor/static/js/components/sections/ResultSet.jsx b/web/pgadmin/tools/sqleditor/static/js/components/sections/ResultSet.jsx index 9331a5c77..dc229d66d 100644 --- a/web/pgadmin/tools/sqleditor/static/js/components/sections/ResultSet.jsx +++ b/web/pgadmin/tools/sqleditor/static/js/components/sections/ResultSet.jsx @@ -88,10 +88,6 @@ export class ResultSetUtils { return msg; } - static isCursorInitialised(httpMessage) { - return httpMessage.data.data.status === 'NotInitialised'; - } - static isQueryFinished(httpMessage) { return httpMessage.data.data.status === 'Success'; } @@ -282,7 +278,7 @@ export class ResultSetUtils { }); } - handlePollError(error) { + handlePollError(error, explainObject, flags) { this.eventBus.fireEvent(QUERY_TOOL_EVENTS.EXECUTION_END); this.eventBus.fireEvent(QUERY_TOOL_EVENTS.FOCUS_PANEL, PANELS.MESSAGES); this.eventBus.fireEvent(QUERY_TOOL_EVENTS.SET_CONNECTION_STATUS, CONNECTION_STATUS.TRANSACTION_STATUS_INERROR); @@ -296,10 +292,15 @@ export class ResultSetUtils { query_source: this.historyQuerySource, is_pgadmin_query: false, }); - this.eventBus.fireEvent(QUERY_TOOL_EVENTS.HANDLE_API_ERROR, error); + this.eventBus.fireEvent(QUERY_TOOL_EVENTS.HANDLE_API_ERROR, error, { + connectionLostCallback: ()=>{ + this.eventBus.fireEvent(QUERY_TOOL_EVENTS.EXECUTION_START, this.query, explainObject, flags.external, true); + }, + checkTransaction: true, + }); } - async pollForResult(onResultsAvailable, onExplain, onPollError) { + async pollForResult(onResultsAvailable, onExplain, onPollError, explainObject, flags) { try { let httpMessage = await this.poll(); let msg = ''; @@ -307,9 +308,7 @@ export class ResultSetUtils { this.eventBus.fireEvent(QUERY_TOOL_EVENTS.PUSH_NOTICE, httpMessage.data.data.notifies); } - if (ResultSetUtils.isCursorInitialised(httpMessage)) { - return Promise.resolve(this.pollForResult(onResultsAvailable, onExplain, onPollError)); - } else if (ResultSetUtils.isQueryFinished(httpMessage)) { + if (ResultSetUtils.isQueryFinished(httpMessage)) { this.setEndTime(new Date()); msg = this.queryFinished(httpMessage, onResultsAvailable, onExplain); } else if (ResultSetUtils.isQueryStillRunning(httpMessage)) { @@ -317,7 +316,7 @@ export class ResultSetUtils { if(httpMessage.data.data.result) { this.eventBus.fireEvent(QUERY_TOOL_EVENTS.SET_MESSAGE, httpMessage.data.data.result, true); } - return Promise.resolve(this.pollForResult(onResultsAvailable, onExplain, onPollError)); + return Promise.resolve(this.pollForResult(onResultsAvailable, onExplain, onPollError, explainObject, flags)); } else if (ResultSetUtils.isConnectionToServerLostWhilePolling(httpMessage)) { this.setEndTime(new Date()); msg = httpMessage.data.data.result; @@ -348,7 +347,7 @@ export class ResultSetUtils { } } catch (error) { onPollError(); - this.handlePollError(error); + this.handlePollError(error, explainObject, flags); } } @@ -830,12 +829,14 @@ export function ResultSet() { ()=>{ setColumns([]); setRows([]); - } + }, + explainObject, + {isQueryTool: queryToolCtx.params.is_query_tool, external: external, reconnect: reconnect} ); }; const executeAndPoll = async ()=>{ - yesCallback(); + await yesCallback(); pollCallback(); }; diff --git a/web/pgadmin/tools/sqleditor/tests/execute_query_test_utils.py b/web/pgadmin/tools/sqleditor/tests/execute_query_test_utils.py index 3f30b31cb..c8188e506 100644 --- a/web/pgadmin/tools/sqleditor/tests/execute_query_test_utils.py +++ b/web/pgadmin/tools/sqleditor/tests/execute_query_test_utils.py @@ -39,3 +39,21 @@ def poll_for_query_results(tester, poll_url): return True, response_data elif status == 'NotConnected' or status == 'Cancel': return False, None + + +def async_poll(tester, poll_url): + while True: + response = tester.get(poll_url) + if response.data: + response_data = json.loads(response.data.decode('utf-8')) + + if response_data['success'] == 1 and 'data' in response_data\ + and ( + response_data['data']['status'] == 'NotInitialised' or + response_data['data']['status'] == 'Busy' + ): + pass + else: + return response + else: + return response diff --git a/web/pgadmin/tools/sqleditor/tests/test_download_csv_query_tool.py b/web/pgadmin/tools/sqleditor/tests/test_download_csv_query_tool.py index db0619f96..dba5f86eb 100644 --- a/web/pgadmin/tools/sqleditor/tests/test_download_csv_query_tool.py +++ b/web/pgadmin/tools/sqleditor/tests/test_download_csv_query_tool.py @@ -17,6 +17,8 @@ import json from pgadmin.utils import server_utils import secrets import config +from pgadmin.tools.sqleditor.tests.execute_query_test_utils \ + import async_poll class TestDownloadCSV(BaseTestGenerator): @@ -120,10 +122,8 @@ class TestDownloadCSV(BaseTestGenerator): self.assertEqual(response.status_code, 200) - # Query tool polling - url = '/sqleditor/poll/{0}'.format(trans_id) - response = self.tester.get(url) - return response + return async_poll(tester=self.tester, + poll_url='/sqleditor/poll/{0}'.format(trans_id)) def runTest(self): @@ -152,8 +152,8 @@ class TestDownloadCSV(BaseTestGenerator): # Disable the console logging from Flask logger self.app.logger.disabled = True if not self.is_valid and self.is_valid_tx: - # The result will be null but status code will be 200 - self.assertEqual(res.status_code, 200) + # The result will be null and status code will be 500 + self.assertEqual(res.status_code, 500) elif self.filename is None: if self.download_as_txt: with patch('pgadmin.tools.sqleditor.blueprint.' diff --git a/web/pgadmin/tools/sqleditor/tests/test_encoding_charset.py b/web/pgadmin/tools/sqleditor/tests/test_encoding_charset.py index 94e1eaa53..c624ea30f 100644 --- a/web/pgadmin/tools/sqleditor/tests/test_encoding_charset.py +++ b/web/pgadmin/tools/sqleditor/tests/test_encoding_charset.py @@ -8,13 +8,15 @@ # ########################################################################## +import json +import secrets from pgadmin.utils.route import BaseTestGenerator from pgadmin.browser.server_groups.servers.databases.tests import utils as \ database_utils from regression.python_test_utils import test_utils -import json from pgadmin.utils import server_utils -import secrets +from pgadmin.tools.sqleditor.tests.execute_query_test_utils \ + import async_poll class TestEncodingCharset(BaseTestGenerator): @@ -274,8 +276,9 @@ class TestEncodingCharset(BaseTestGenerator): response = self.tester.post(url, data=json.dumps({"sql": sql}), content_type='html/json') self.assertEqual(response.status_code, 200) - url = '/sqleditor/poll/{0}'.format(self.trans_id) - response = self.tester.get(url) + response = async_poll(tester=self.tester, + poll_url='/sqleditor/poll/{0}'.format( + self.trans_id)) self.assertEqual(response.status_code, 200) response_data = json.loads(response.data.decode('utf-8')) self.assertEqual(response_data['data']['rows_fetched_to'], 1) diff --git a/web/pgadmin/tools/sqleditor/tests/test_explain_plan.py b/web/pgadmin/tools/sqleditor/tests/test_explain_plan.py index 9b9f41c1d..fbb816d6f 100644 --- a/web/pgadmin/tools/sqleditor/tests/test_explain_plan.py +++ b/web/pgadmin/tools/sqleditor/tests/test_explain_plan.py @@ -15,6 +15,8 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \ from pgadmin.utils.route import BaseTestGenerator from regression import parent_node_dict from regression.python_test_utils import test_utils as utils +from pgadmin.tools.sqleditor.tests.execute_query_test_utils \ + import async_poll class TestExplainPlan(BaseTestGenerator): @@ -57,9 +59,10 @@ class TestExplainPlan(BaseTestGenerator): self.assertEqual(response.status_code, 200) - # Query tool polling - url = '/sqleditor/poll/{0}'.format(self.trans_id) - response = self.tester.get(url) + response = async_poll(tester=self.tester, + poll_url='/sqleditor/poll/{0}'.format( + self.trans_id)) + self.assertEqual(response.status_code, 200) response_data = json.loads(response.data.decode('utf-8')) diff --git a/web/pgadmin/tools/sqleditor/tests/test_poll_query_tool.py b/web/pgadmin/tools/sqleditor/tests/test_poll_query_tool.py index a97f7e349..7b3e163e4 100644 --- a/web/pgadmin/tools/sqleditor/tests/test_poll_query_tool.py +++ b/web/pgadmin/tools/sqleditor/tests/test_poll_query_tool.py @@ -15,6 +15,8 @@ from pgadmin.utils.route import BaseTestGenerator from regression import parent_node_dict from regression.python_test_utils import test_utils as utils import secrets +from pgadmin.tools.sqleditor.tests.execute_query_test_utils \ + import async_poll class TestPollQueryTool(BaseTestGenerator): @@ -80,6 +82,7 @@ NOTICE: Hello, world! url = '/sqleditor/initialize/sqleditor/{0}/{1}/{2}/{3}'.format( self.trans_id, utils.SERVER_GROUP, self.server_id, self.db_id) response = self.tester.post(url) + import time self.assertEqual(response.status_code, 200) cnt = 0 @@ -92,14 +95,13 @@ NOTICE: Hello, world! self.assertEqual(response.status_code, 200) - # Query tool polling - url = '/sqleditor/poll/{0}'.format(self.trans_id) - response = self.tester.get(url) + response = async_poll(tester=self.tester, + poll_url='/sqleditor/poll/{0}'.format( + self.trans_id)) self.assertEqual(response.status_code, 200) response_data = json.loads(response.data.decode('utf-8')) if self.expected_message[cnt] is not None: - # Check the returned messages self.assertIn(self.expected_message[cnt], response_data['data']['additional_messages']) diff --git a/web/pgadmin/tools/sqleditor/tests/test_sql_ascii_encoding.py b/web/pgadmin/tools/sqleditor/tests/test_sql_ascii_encoding.py index e03a56308..d4a04bed1 100644 --- a/web/pgadmin/tools/sqleditor/tests/test_sql_ascii_encoding.py +++ b/web/pgadmin/tools/sqleditor/tests/test_sql_ascii_encoding.py @@ -16,6 +16,8 @@ from pgadmin.utils import server_utils from pgadmin.browser.server_groups.servers.databases.tests import utils as \ database_utils import config +from pgadmin.tools.sqleditor.tests.execute_query_test_utils \ + import async_poll class TestSQLASCIIEncoding(BaseTestGenerator): @@ -105,8 +107,9 @@ class TestSQLASCIIEncoding(BaseTestGenerator): response = self.tester.post(url, data=json.dumps({"sql": sql}), content_type='html/json') self.assertEqual(response.status_code, 200) - url = '/sqleditor/poll/{0}'.format(self.trans_id) - response = self.tester.get(url) + response = async_poll(tester=self.tester, + poll_url='/sqleditor/poll/{0}'.format( + self.trans_id)) self.assertEqual(response.status_code, 200) response_data = json.loads(response.data) self.assertEqual(response_data['data']['rows_fetched_to'], 1) diff --git a/web/pgadmin/tools/sqleditor/tests/test_view_data.py b/web/pgadmin/tools/sqleditor/tests/test_view_data.py index babb16b63..f3bac59f4 100644 --- a/web/pgadmin/tools/sqleditor/tests/test_view_data.py +++ b/web/pgadmin/tools/sqleditor/tests/test_view_data.py @@ -16,6 +16,8 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \ from regression import parent_node_dict from regression.python_test_utils import test_utils from unittest.mock import patch +from pgadmin.tools.sqleditor.tests.execute_query_test_utils \ + import async_poll class TestViewData(BaseTestGenerator): @@ -116,9 +118,9 @@ class TestViewData(BaseTestGenerator): response = self.tester.get(url) self.assertEqual(response.status_code, 200) - # Check the query result - url = '/sqleditor/poll/{0}'.format(self.trans_id) - response = self.tester.get(url) + response = async_poll(tester=self.tester, + poll_url='/sqleditor/poll/{0}'.format( + self.trans_id)) self.assertEqual(response.status_code, 200) response_data = json.loads(response.data.decode('utf-8')) diff --git a/web/pgadmin/tools/sqleditor/utils/start_running_query.py b/web/pgadmin/tools/sqleditor/utils/start_running_query.py index 09c9de84c..9c3076e3b 100644 --- a/web/pgadmin/tools/sqleditor/utils/start_running_query.py +++ b/web/pgadmin/tools/sqleditor/utils/start_running_query.py @@ -11,8 +11,8 @@ import pickle import secrets - -from flask import Response +from threading import Thread +from flask import Response, current_app, copy_current_request_context from flask_babel import gettext from config import PG_DEFAULT_DRIVER @@ -55,6 +55,8 @@ class StartRunningQuery: can_filter = False notifies = None trans_status = None + status = -1 + result = None if transaction_object is not None and session_obj is not None: # set fetched row count to 0 as we are executing query again. transaction_object.update_fetched_row_cnt(0) @@ -85,7 +87,7 @@ class StartRunningQuery: effective_sql_statement = apply_explain_plan_wrapper_if_needed( manager, sql) - result, status = self.__execute_query( + self.__execute_query( conn, session_obj, effective_sql_statement, @@ -99,6 +101,7 @@ class StartRunningQuery: # Get the notifies notifies = conn.get_notifies() trans_status = conn.transaction_status() + else: status = False result = gettext( @@ -134,17 +137,34 @@ class StartRunningQuery: conn, sql): conn.execute_void("BEGIN;") - # Execute sql asynchronously with params is None - # and formatted_error is True. - status, result = conn.execute_async(sql) + is_rollback_req = StartRunningQuery.is_rollback_statement_required( + trans_obj, + conn) - # If the transaction aborted for some reason and - # Auto RollBack is True then issue a rollback to cleanup. - if StartRunningQuery.is_rollback_statement_required(trans_obj, - conn): - conn.execute_void("ROLLBACK;") + @copy_current_request_context + def asyn_exec_query(conn, sql, trans_obj, is_rollback_req, + app): + # Execute sql asynchronously with params is None + # and formatted_error is True. + with app.app_context(): + try: + status, result = conn.execute_async(sql) + # # If the transaction aborted for some reason and + # # Auto RollBack is True then issue a rollback to cleanup. + if is_rollback_req: + conn.execute_void("ROLLBACK;") + except Exception as e: + self.logger.error(e) + return internal_server_error(errormsg=str(e)) - return result, status + _thread = pgAdminThread(target=asyn_exec_query, + args=(conn, sql, trans_obj, is_rollback_req, + current_app._get_current_object()) + ) + _thread.start() + trans_obj.set_thread_native_id(_thread.native_id) + StartRunningQuery.save_transaction_in_session(session_obj, + trans_id, trans_obj) @staticmethod def is_begin_required_for_sql_query(trans_obj, conn, sql): @@ -187,3 +207,13 @@ class StartRunningQuery: # Fetch the object for the specified transaction id. # Use pickle.loads function to get the command object return grid_data[str(transaction_id)] + + +class pgAdminThread(Thread): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.app = current_app._get_current_object() + + def run(self): + with self.app.app_context(): + super().run() diff --git a/web/pgadmin/tools/sqleditor/utils/tests/test_save_changed_uuid_data.py b/web/pgadmin/tools/sqleditor/utils/tests/test_save_changed_uuid_data.py index 75c0f2ac5..fbc955c7d 100644 --- a/web/pgadmin/tools/sqleditor/utils/tests/test_save_changed_uuid_data.py +++ b/web/pgadmin/tools/sqleditor/utils/tests/test_save_changed_uuid_data.py @@ -15,8 +15,8 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \ from pgadmin.utils.route import BaseTestGenerator from regression import parent_node_dict from regression.python_test_utils import test_utils as utils -from pgadmin.tools.sqleditor.tests.execute_query_test_utils \ - import execute_query +from pgadmin.tools.sqleditor.tests.execute_query_test_utils import\ + execute_query class TestSaveChangedDataUUID(BaseTestGenerator): diff --git a/web/pgadmin/tools/sqleditor/utils/tests/test_start_running_query.py b/web/pgadmin/tools/sqleditor/utils/tests/test_start_running_query.py index 0770943c3..004f7eb2a 100644 --- a/web/pgadmin/tools/sqleditor/utils/tests/test_start_running_query.py +++ b/web/pgadmin/tools/sqleditor/utils/tests/test_start_running_query.py @@ -237,182 +237,6 @@ class StartRunningQueryTest(BaseTestGenerator): expected_logger_error=get_connection_lost_exception, expect_execute_void_called_with='some sql', )), - ('When server is connected and start query async request, ' - 'it returns an success message', - dict( - function_parameters=dict( - sql=dict(sql='some sql', explain_plan=None), - trans_id=123, - http_session=dict(gridData={'123': dict(command_obj='')}) - ), - pickle_load_return=MagicMock( - conn_id=1, - update_fetched_row_cnt=MagicMock(), - set_connection_id=MagicMock(), - auto_commit=True, - auto_rollback=False, - can_edit=lambda: True, - can_filter=lambda: True - ), - get_driver_exception=False, - get_connection_lost_exception=False, - manager_connection_exception=None, - - is_connected_to_server=True, - connection_connect_return=None, - execute_async_return_value=[True, - 'async function result output'], - is_begin_required=False, - is_rollback_required=False, - apply_explain_plan_wrapper_if_needed_return_value='some sql', - - expect_make_json_response_to_have_been_called_with=dict( - data=dict( - status=True, - result='async function result output', - can_edit=True, - can_filter=True, - notifies=None, - transaction_status=None - ) - ), - expect_internal_server_error_called_with=None, - expected_logger_error=None, - expect_execute_void_called_with='some sql', - )), - ('When server is connected and start query async request and ' - 'begin is required, ' - 'it returns an success message', - dict( - function_parameters=dict( - sql=dict(sql='some sql', explain_plan=None), - trans_id=123, - http_session=dict(gridData={'123': dict(command_obj='')}) - ), - pickle_load_return=MagicMock( - conn_id=1, - update_fetched_row_cnt=MagicMock(), - set_connection_id=MagicMock(), - auto_commit=True, - auto_rollback=False, - can_edit=lambda: True, - can_filter=lambda: True - ), - get_driver_exception=False, - get_connection_lost_exception=False, - manager_connection_exception=None, - - is_connected_to_server=True, - connection_connect_return=None, - execute_async_return_value=[True, - 'async function result output'], - is_begin_required=True, - is_rollback_required=False, - apply_explain_plan_wrapper_if_needed_return_value='some sql', - - expect_make_json_response_to_have_been_called_with=dict( - data=dict( - status=True, - result='async function result output', - can_edit=True, - can_filter=True, - notifies=None, - transaction_status=None - ) - ), - expect_internal_server_error_called_with=None, - expected_logger_error=None, - expect_execute_void_called_with='some sql', - )), - ('When server is connected and start query async request and ' - 'rollback is required, ' - 'it returns an success message', - dict( - function_parameters=dict( - sql=dict(sql='some sql', explain_plan=None), - trans_id=123, - http_session=dict(gridData={'123': dict(command_obj='')}) - ), - pickle_load_return=MagicMock( - conn_id=1, - update_fetched_row_cnt=MagicMock(), - set_connection_id=MagicMock(), - auto_commit=True, - auto_rollback=False, - can_edit=lambda: True, - can_filter=lambda: True - ), - get_driver_exception=False, - get_connection_lost_exception=False, - manager_connection_exception=None, - - is_connected_to_server=True, - connection_connect_return=None, - execute_async_return_value=[True, - 'async function result output'], - is_begin_required=False, - is_rollback_required=True, - apply_explain_plan_wrapper_if_needed_return_value='some sql', - - expect_make_json_response_to_have_been_called_with=dict( - data=dict( - status=True, - result='async function result output', - can_edit=True, - can_filter=True, - notifies=None, - transaction_status=None - ) - ), - expect_internal_server_error_called_with=None, - expected_logger_error=None, - expect_execute_void_called_with='some sql', - )), - ('When server is connected and start query async request with ' - 'explain plan wrapper, ' - 'it returns an success message', - dict( - function_parameters=dict( - sql=dict(sql='some sql', explain_plan=None), - trans_id=123, - http_session=dict(gridData={'123': dict(command_obj='')}) - ), - pickle_load_return=MagicMock( - conn_id=1, - update_fetched_row_cnt=MagicMock(), - set_connection_id=MagicMock(), - auto_commit=True, - auto_rollback=False, - can_edit=lambda: True, - can_filter=lambda: True - ), - get_driver_exception=False, - get_connection_lost_exception=False, - manager_connection_exception=None, - - is_connected_to_server=True, - connection_connect_return=None, - execute_async_return_value=[True, - 'async function result output'], - is_begin_required=False, - is_rollback_required=True, - apply_explain_plan_wrapper_if_needed_return_value='EXPLAIN ' - 'PLAN some sql', - - expect_make_json_response_to_have_been_called_with=dict( - data=dict( - status=True, - result='async function result output', - can_edit=True, - can_filter=True, - notifies=None, - transaction_status=None - ) - ), - expect_internal_server_error_called_with=None, - expected_logger_error=None, - expect_execute_void_called_with='EXPLAIN PLAN some sql', - )), ] @patch('pgadmin.tools.sqleditor.utils.start_running_query' diff --git a/web/pgadmin/utils/driver/psycopg3/connection.py b/web/pgadmin/utils/driver/psycopg3/connection.py index f758602ee..4f328b353 100644 --- a/web/pgadmin/utils/driver/psycopg3/connection.py +++ b/web/pgadmin/utils/driver/psycopg3/connection.py @@ -171,6 +171,7 @@ class Connection(BaseConnection): self.async_ = async_ self.__async_cursor = None self.__async_query_id = None + self.__async_query_error = None self.__backend_pid = None self.execution_aborted = False self.row_count = 0 @@ -825,7 +826,8 @@ WHERE db.datname = current_database()""") query = str(cur.query, encoding) \ if cur and cur.query is not None else None except Exception: - current_app.logger.warning('Error encoding query') + current_app.logger.warning('Error encoding query with {0}'.format( + encoding)) current_app.logger.log( 25, @@ -1032,6 +1034,7 @@ WHERE db.datname = current_database()""") """ self.__async_cursor = None + self.__async_query_error = None status, cur = self.__cursor(scrollable=True) if not status: @@ -1077,13 +1080,15 @@ WHERE db.datname = current_database()""") query_id=query_id ) ) + self.__async_query_error = errmsg - if self.is_disconnected(pe): + if self.conn and self.conn.closed or self.is_disconnected(pe): raise ConnectionLost( self.manager.sid, self.db, None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:] ) + return False, errmsg return True, None @@ -1296,7 +1301,7 @@ WHERE db.datname = current_database()""") ] or [] rows = [] - self.row_count = cur.get_rowcount() + self.row_count = cur.rowcount if cur.get_rowcount() > 0: rows = cur.fetchall() @@ -1320,6 +1325,12 @@ WHERE db.datname = current_database()""") if not cur: return False, self.CURSOR_NOT_FOUND + if not self.conn: + raise ConnectionLost( + self.manager.sid, + self.db, + None if self.conn_id[0:3] == 'DB:' else self.conn_id[5:] + ) if self.conn.pgconn.is_busy(): return False, gettext( "Asynchronous query execution/operation underway." @@ -1352,11 +1363,6 @@ WHERE db.datname = current_database()""") self.conn = None return False - def async_cursor_initialised(self): - if self.__async_cursor: - return True - return False - def _decrypt_password(self, manager): """ Decrypt password @@ -1422,10 +1428,13 @@ Failed to reset the connection to the server due to following error: return True, None def transaction_status(self): - if self.conn: + if self.conn and self.conn.info: return self.conn.info.transaction_status return None + def async_query_error(self): + return self.__async_query_error + def ping(self): return self.execute_scalar('SELECT 1') @@ -1455,8 +1464,12 @@ Failed to reset the connection to the server due to following error: def poll(self, formatted_exception_msg=False, no_result=False): cur = self.__async_cursor - if self.conn and self.conn.pgconn.is_busy(): + if self.conn and self.conn.info.transaction_status == 1: status = 3 + elif self.__async_query_error: + return False, self.__async_query_error + elif self.conn and self.conn.pgconn.error_message: + return False, self.conn.pgconn.error_message else: status = 1 @@ -1475,7 +1488,7 @@ Failed to reset the connection to the server due to following error: ) more_result = True while more_result: - if not self.conn.pgconn.is_busy(): + if self.conn: if cur.description is not None: self.column_info = [desc.to_dict() for desc in cur.ordered_description()] @@ -1733,7 +1746,7 @@ Failed to reset the connection to the server due to following error: # https://github.com/zzzeek/sqlalchemy/blob/master/lib/sqlalchemy/dialects/postgresql/psycopg2.py # def is_disconnected(self, err): - if not self.conn.closed: + if self.conn and not self.conn.closed: # checks based on strings. in the case that .closed # didn't cut it, fall back onto these. str_e = str(err).partition("\n")[0] @@ -1754,6 +1767,7 @@ Failed to reset the connection to the server due to following error: 'connection has been closed unexpectedly', 'SSL SYSCALL error: Bad file descriptor', 'SSL SYSCALL error: EOF detected', + 'terminating connection due to administrator command' ]: idx = str_e.find(msg) if idx >= 0 and '"' not in str_e[:idx]: