Added support for editing of result sets from tables with OIDs in query tool.

This commit is contained in:
Yosry Muhammad
2019-07-29 12:26:53 +05:30
committed by Akshay Joshi
parent 2ef3080d0e
commit 82d209946f
6 changed files with 143 additions and 40 deletions

View File

@@ -131,7 +131,7 @@ You can:
A result set is updatable if:
* All the columns belong to the same table.
* All the primary keys of the table are selected.
* All the primary keys or OIDs of the table are explicitly selected.
* No columns are duplicated.
An updatable result set can be modified just like in

View File

@@ -414,11 +414,6 @@ def poll(trans_id):
if 'primary_keys' in session_obj:
primary_keys = session_obj['primary_keys']
if 'has_oids' in session_obj:
has_oids = session_obj['has_oids']
if has_oids:
oids = {'oid': 'oid'}
# Fetch column information
columns_info = conn.get_column_info()
client_primary_key = generate_client_primary_key_name(
@@ -429,13 +424,22 @@ def poll(trans_id):
# If trans_obj is a QueryToolCommand then check for updatable
# resultsets and primary keys
if isinstance(trans_obj, QueryToolCommand):
trans_obj.check_updatable_results_pkeys()
trans_obj.check_updatable_results_pkeys_oids()
pk_names, primary_keys = trans_obj.get_primary_keys()
session_obj['has_oids'] = trans_obj.has_oids()
# Update command_obj in session obj
session_obj['command_obj'] = pickle.dumps(
trans_obj, -1)
# If primary_keys exist, add them to the session_obj to
# allow for saving any changes to the data
if primary_keys is not None:
session_obj['primary_keys'] = primary_keys
if 'has_oids' in session_obj:
has_oids = session_obj['has_oids']
if has_oids:
oids = {'oid': 'oid'}
if columns_info is not None:
# If it is a QueryToolCommand that has obj_id attribute
# then it should also be editable

View File

@@ -863,6 +863,7 @@ class QueryToolCommand(BaseCommand, FetchedRowTracker):
self.is_updatable_resultset = False
self.primary_keys = None
self.pk_names = None
self.table_has_oids = False
def get_sql(self, default_conn=None):
return None
@@ -873,13 +874,16 @@ class QueryToolCommand(BaseCommand, FetchedRowTracker):
def get_primary_keys(self):
return self.pk_names, self.primary_keys
def has_oids(self):
return self.table_has_oids
def can_edit(self):
return self.is_updatable_resultset
def can_filter(self):
return False
def check_updatable_results_pkeys(self):
def check_updatable_results_pkeys_oids(self):
"""
This function is used to check whether the last successful query
produced updatable results and sets the necessary flags and
@@ -895,8 +899,8 @@ class QueryToolCommand(BaseCommand, FetchedRowTracker):
# Get the path to the sql templates
sql_path = 'sqleditor/sql/#{0}#'.format(manager.version)
self.is_updatable_resultset, self.primary_keys, pk_names, table_oid = \
is_query_resultset_updatable(conn, sql_path)
self.is_updatable_resultset, self.table_has_oids, self.primary_keys, \
pk_names, table_oid = is_query_resultset_updatable(conn, sql_path)
# Create pk_names attribute in the required format
if pk_names is not None:

View File

@@ -27,33 +27,65 @@ class TestQueryUpdatableResultset(BaseTestGenerator):
primary_keys={
'pk_col1': 'int4',
'pk_col2': 'int4'
}
},
expected_has_oids=False,
table_has_oids=False
)),
('When selecting all primary keys of the table', dict(
sql='SELECT pk_col1, pk_col2 FROM %s;',
primary_keys={
'pk_col1': 'int4',
'pk_col2': 'int4'
}
},
expected_has_oids=False,
table_has_oids=False
)),
('When selecting some of the primary keys of the table', dict(
sql='SELECT pk_col2 FROM %s;',
primary_keys=None
primary_keys=None,
expected_has_oids=False,
table_has_oids=False
)),
('When selecting none of the primary keys of the table', dict(
sql='SELECT normal_col1 FROM %s;',
primary_keys=None
primary_keys=None,
expected_has_oids=False,
table_has_oids=False
)),
('When renaming a primary key', dict(
sql='SELECT pk_col1 as some_col, '
'pk_col2 FROM "%s";',
primary_keys=None
sql='SELECT pk_col1 as some_col, pk_col2 FROM "%s";',
primary_keys=None,
expected_has_oids=False,
table_has_oids=False
)),
('When renaming a column to a primary key name', dict(
sql='SELECT pk_col1, pk_col2, normal_col1 as pk_col1 '
'FROM %s;',
primary_keys=None
))
sql='SELECT pk_col1, pk_col2, normal_col1 as pk_col1 FROM %s;',
primary_keys=None,
expected_has_oids=False,
table_has_oids=False
)),
('When selecting primary keys and oids (table with oids)', dict(
sql='SELECT *, oid FROM %s;',
primary_keys={
'pk_col1': 'int4',
'pk_col2': 'int4'
},
expected_has_oids=True,
table_has_oids=True
)),
('When selecting oids without primary keys (table with oids)', dict(
sql='SELECT oid, normal_col1, normal_col2 FROM %s;',
primary_keys=None,
expected_has_oids=True,
table_has_oids=True
)),
('When selecting none of the primary keys or oids (table with oids)',
dict(
sql='SELECT normal_col1, normal_col2 FROM %s;',
primary_keys=None,
expected_has_oids=False,
table_has_oids=True
))
]
def setUp(self):
@@ -63,9 +95,10 @@ class TestQueryUpdatableResultset(BaseTestGenerator):
def runTest(self):
# Create test table (unique for each scenario)
self._create_test_table()
test_table_name = self._create_test_table(
table_has_oids=self.table_has_oids)
# Add test table name to the query
sql = self.sql % self.test_table_name
sql = self.sql % test_table_name
is_success, response_data = \
execute_query(tester=self.tester,
query=sql,
@@ -77,6 +110,10 @@ class TestQueryUpdatableResultset(BaseTestGenerator):
primary_keys = response_data['data']['primary_keys']
self.assertEquals(primary_keys, self.primary_keys)
# Check oids
has_oids = response_data['data']['has_oids']
self.assertEquals(has_oids, self.expected_has_oids)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)
@@ -85,11 +122,18 @@ class TestQueryUpdatableResultset(BaseTestGenerator):
database_info = parent_node_dict["database"][-1]
self.server_id = database_info["server_id"]
self.server_version = parent_node_dict["schema"][-1]["server_version"]
if self.server_version >= 120000 and self.table_has_oids:
self.skipTest('Tables with OIDs are not supported starting '
'PostgreSQL 12')
self.db_id = database_info["db_id"]
db_con = database_utils.connect_database(self,
utils.SERVER_GROUP,
self.server_id,
self.db_id)
if not db_con["info"] == "Database connected.":
raise Exception("Could not connect to the database.")
@@ -108,9 +152,9 @@ class TestQueryUpdatableResultset(BaseTestGenerator):
self.poll_url = '/sqleditor/poll/{0}'.format(self.trans_id)
def _create_test_table(self):
self.test_table_name = "test_for_updatable_resultset" + \
str(random.randint(1000, 9999))
def _create_test_table(self, table_has_oids=False):
test_table_name = "test_for_updatable_resultset" + \
str(random.randint(1000, 9999))
create_sql = """
DROP TABLE IF EXISTS "%s";
@@ -120,8 +164,13 @@ class TestQueryUpdatableResultset(BaseTestGenerator):
normal_col1 VARCHAR,
normal_col2 VARCHAR,
PRIMARY KEY(pk_col1, pk_col2)
);
""" % (self.test_table_name, self.test_table_name)
)
""" % (test_table_name, test_table_name)
if table_has_oids:
create_sql += ' WITH OIDS;'
else:
create_sql += ';'
is_success, _ = \
execute_query(tester=self.tester,
@@ -129,3 +178,4 @@ class TestQueryUpdatableResultset(BaseTestGenerator):
start_query_tool_url=self.start_query_tool_url,
poll_url=self.poll_url)
self.assertEquals(is_success, True)
return test_table_name

View File

@@ -15,6 +15,7 @@
- No duplicate columns
"""
from flask import render_template
from flask_babelex import gettext
try:
from collections import OrderedDict
except ImportError:
@@ -43,18 +44,25 @@ def is_query_resultset_updatable(conn, sql_path):
return return_not_updatable()
if conn.connected():
primary_keys, primary_keys_columns, pk_names = \
_get_primary_keys(conn=conn,
table_oid=table_oid,
sql_path=sql_path)
primary_keys, pk_names = _check_primary_keys(conn=conn,
columns_info=columns_info,
table_oid=table_oid,
sql_path=sql_path)
if not _check_primary_keys_uniquely_exist(primary_keys_columns,
columns_info):
has_oids = _check_oids(conn=conn,
columns_info=columns_info,
table_oid=table_oid,
sql_path=sql_path)
if has_oids or primary_keys is not None:
return True, has_oids, primary_keys, pk_names, table_oid
else:
return return_not_updatable()
return True, primary_keys, pk_names, table_oid
else:
return return_not_updatable()
raise Exception(
gettext('Not connected to server or connection with the '
'server has been closed.')
)
def _check_single_table(columns_info):
@@ -74,6 +82,42 @@ def _check_duplicate_columns(columns_info):
return True
def _check_oids(conn, sql_path, table_oid, columns_info):
# Remove the special behavior of OID columns from
# PostgreSQL 12 onwards, so returning False.
if conn.manager.sversion >= 120000:
return False
# Check that the table has oids
query = render_template(
"/".join([sql_path, 'has_oids.sql']), obj_id=table_oid)
status, has_oids = conn.execute_scalar(query)
if not status:
raise Exception(has_oids)
# Check that the oid column is selected in results columns
oid_column_selected = False
for col in columns_info:
if col['table_column'] is None and col['display_name'] == 'oid':
oid_column_selected = True
break
return has_oids and oid_column_selected
def _check_primary_keys(conn, columns_info, sql_path, table_oid):
primary_keys, primary_keys_columns, pk_names = \
_get_primary_keys(conn=conn,
table_oid=table_oid,
sql_path=sql_path)
if not _check_primary_keys_uniquely_exist(primary_keys_columns,
columns_info):
primary_keys = None
pk_names = None
return primary_keys, pk_names
def _check_primary_keys_uniquely_exist(primary_keys_columns, columns_info):
for pk in primary_keys_columns:
pk_exists = False
@@ -99,7 +143,7 @@ def _get_primary_keys(sql_path, table_oid, conn):
)
status, result = conn.execute_dict(query)
if not status:
return return_not_updatable()
raise Exception(result)
primary_keys_columns = []
primary_keys = OrderedDict()
@@ -117,4 +161,4 @@ def _get_primary_keys(sql_path, table_oid, conn):
def return_not_updatable():
return False, None, None, None
return False, False, None, None, None

View File

@@ -45,8 +45,9 @@ class StartRunningQuery:
if type(session_obj) is Response:
return session_obj
# Remove any existing primary keys in session_obj
# Remove any existing primary keys or has_oids in session_obj
session_obj.pop('primary_keys', None)
session_obj.pop('oids', None)
transaction_object = pickle.loads(session_obj['command_obj'])
can_edit = False