Ensure the query tool will work with older versions of psycopg2 than we officially support, albeit without updateable resultsets. Fixes #4520

This commit is contained in:
Yosry Muhammad
2019-08-01 13:59:53 +01:00
committed by Dave Page
parent cbe40176c1
commit 7b65507533
8 changed files with 54 additions and 25 deletions

View File

@@ -50,6 +50,9 @@ class QueryToolJourneyTest(BaseFeatureTest):
self.page.add_server(self.server)
driver_version = test_utils.get_driver_version()
self.driver_version = float('.'.join(driver_version.split('.')[:2]))
def runTest(self):
self._navigate_to_query_tool()
self._execute_query(
@@ -190,6 +193,9 @@ class QueryToolJourneyTest(BaseFeatureTest):
self._assert_clickable(query_we_need_to_scroll_to)
def _test_updatable_resultset(self):
if self.driver_version < 2.8:
return
self.page.click_tab("Query Editor")
# Insert data into test table

View File

@@ -424,16 +424,16 @@ def poll(trans_id):
# If trans_obj is a QueryToolCommand then check for updatable
# resultsets and primary keys
if isinstance(trans_obj, QueryToolCommand):
trans_obj.check_updatable_results_pkeys_oids()
pk_names, primary_keys = trans_obj.get_primary_keys()
session_obj['has_oids'] = trans_obj.has_oids()
# Update command_obj in session obj
session_obj['command_obj'] = pickle.dumps(
trans_obj, -1)
# If primary_keys exist, add them to the session_obj to
# allow for saving any changes to the data
if primary_keys is not None:
session_obj['primary_keys'] = primary_keys
if trans_obj.check_updatable_results_pkeys_oids():
pk_names, primary_keys = trans_obj.get_primary_keys()
session_obj['has_oids'] = trans_obj.has_oids()
# Update command_obj in session obj
session_obj['command_obj'] = pickle.dumps(
trans_obj, -1)
# If primary_keys exist, add them to the session_obj to
# allow for saving any changes to the data
if primary_keys is not None:
session_obj['primary_keys'] = primary_keys
if 'has_oids' in session_obj:
has_oids = session_obj['has_oids']

View File

@@ -896,6 +896,13 @@ class QueryToolCommand(BaseCommand, FetchedRowTracker):
manager = driver.connection_manager(self.sid)
conn = manager.connection(did=self.did, conn_id=self.conn_id)
# Get the driver version as a float
driver_version = float('.'.join(driver.Version().split('.')[:2]))
# Checking for updatable resultsets uses features in psycopg 2.8
if driver_version < 2.8:
return False
# Get the path to the sql templates
sql_path = 'sqleditor/sql/#{0}#'.format(manager.version)
@@ -918,6 +925,7 @@ class QueryToolCommand(BaseCommand, FetchedRowTracker):
self.__set_updatable_results_attrs(sql_path=sql_path,
table_oid=table_oid,
conn=conn)
return self.is_updatable_resultset
def save(self,
changed_data,

View File

@@ -15,7 +15,7 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from .execute_query_utils import execute_query
from pgadmin.tools.sqleditor.tests.execute_query_utils import execute_query
class TestQueryUpdatableResultset(BaseTestGenerator):
@@ -120,6 +120,7 @@ class TestQueryUpdatableResultset(BaseTestGenerator):
def _initialize_database_connection(self):
database_info = parent_node_dict["database"][-1]
self.db_name = database_info["db_name"]
self.server_id = database_info["server_id"]
self.server_version = parent_node_dict["schema"][-1]["server_version"]
@@ -128,6 +129,12 @@ class TestQueryUpdatableResultset(BaseTestGenerator):
self.skipTest('Tables with OIDs are not supported starting '
'PostgreSQL 12')
driver_version = utils.get_driver_version()
driver_version = float('.'.join(driver_version.split('.')[:2]))
if driver_version < 2.8:
self.skipTest('Updatable resultsets require pyscopg 2.8 or later')
self.db_id = database_info["db_id"]
db_con = database_utils.connect_database(self,
utils.SERVER_GROUP,
@@ -172,10 +179,5 @@ class TestQueryUpdatableResultset(BaseTestGenerator):
else:
create_sql += ';'
is_success, _ = \
execute_query(tester=self.tester,
query=create_sql,
start_query_tool_url=self.start_query_tool_url,
poll_url=self.poll_url)
self.assertEquals(is_success, True)
utils.create_table_with_query(self.server, self.db_name, create_sql)
return test_table_name

View File

@@ -15,11 +15,11 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from .execute_query_utils import execute_query
from pgadmin.tools.sqleditor.tests.execute_query_utils import execute_query
class TestSaveChangedData(BaseTestGenerator):
""" This class tests saving data changes in the grid to the database """
""" This class tests saving data changes to updatable query resultsets """
scenarios = [
('When inserting new valid row', dict(
save_payload={
@@ -304,6 +304,7 @@ class TestSaveChangedData(BaseTestGenerator):
def _initialize_database_connection(self):
database_info = parent_node_dict["database"][-1]
self.db_name = database_info["db_name"]
self.server_id = database_info["server_id"]
self.db_id = database_info["db_id"]
@@ -311,6 +312,13 @@ class TestSaveChangedData(BaseTestGenerator):
utils.SERVER_GROUP,
self.server_id,
self.db_id)
driver_version = utils.get_driver_version()
driver_version = float('.'.join(driver_version.split('.')[:2]))
if driver_version < 2.8:
self.skipTest('Updatable resultsets require pyscopg 2.8 or later')
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to the database.")
@@ -347,9 +355,5 @@ class TestSaveChangedData(BaseTestGenerator):
self.test_table_name,
self.test_table_name)
self.select_sql = 'SELECT * FROM %s;' % self.test_table_name
is_success, _ = \
execute_query(tester=self.tester,
query=create_sql,
start_query_tool_url=self.start_query_tool_url,
poll_url=self.poll_url)
self.assertEquals(is_success, True)
utils.create_table_with_query(self.server, self.db_name, create_sql)

View File

@@ -1080,3 +1080,8 @@ def get_watcher_dialogue_status(self):
else:
break
return status
def get_driver_version():
version = getattr(psycopg2, '__version__', None)
return version