Ensure editable and read-only columns in Query Tool should be identified by icons and tooltips in the column header. Fixes #4667

This commit is contained in:
Yosry Muhammad
2019-08-26 14:17:40 +05:30
committed by Akshay Joshi
parent 5887fb3815
commit f8f7d5ac6f
15 changed files with 410 additions and 129 deletions

View File

@@ -430,40 +430,17 @@ def poll(trans_id):
oids = {'oid': 'oid'}
if columns_info is not None:
# If it is a QueryToolCommand that has obj_id attribute
# then it should also be editable
if hasattr(trans_obj, 'obj_id') and \
(not isinstance(trans_obj, QueryToolCommand) or
trans_obj.can_edit()):
# Get the template path for the column
template_path = 'columns/sql/#{0}#'.format(
conn.manager.version
)
# Only QueryToolCommand or TableCommand can be editable
if hasattr(trans_obj, 'obj_id') and trans_obj.can_edit():
columns = trans_obj.get_columns_types(conn)
SQL = render_template(
"/".join([template_path, 'nodes.sql']),
tid=trans_obj.obj_id,
has_oids=True
)
# rows with attribute not_null
colst, rset = conn.execute_2darray(SQL)
if not colst:
return internal_server_error(errormsg=rset)
for key, col in enumerate(columns_info):
col_type = dict()
col_type['type_code'] = col['type_code']
col_type['type_name'] = None
col_type['internal_size'] = col['internal_size']
columns[col['name']] = col_type
if rset:
col_type['not_null'] = col['not_null'] = \
rset['rows'][key]['not_null']
col_type['has_default_val'] = \
col['has_default_val'] = \
rset['rows'][key]['has_default_val']
else:
for col in columns_info:
col_type = dict()
col_type['type_code'] = col['type_code']
col_type['type_name'] = None
col_type['internal_size'] = col['internal_size']
columns[col['name']] = col_type
if columns:
st, types = fetch_pg_types(columns, trans_obj)

View File

@@ -22,6 +22,7 @@ from pgadmin.utils.driver import get_driver
from pgadmin.tools.sqleditor.utils.is_query_resultset_updatable \
import is_query_resultset_updatable
from pgadmin.tools.sqleditor.utils.save_changed_data import save_changed_data
from pgadmin.tools.sqleditor.utils.get_column_types import get_columns_types
from config import PG_DEFAULT_DRIVER
@@ -677,6 +678,16 @@ class TableCommand(GridCommand):
client_primary_key=client_primary_key,
conn=conn)
def get_columns_types(self, conn):
columns_info = conn.get_column_info()
has_oids = self.has_oids()
table_oid = self.obj_id
return get_columns_types(conn=conn,
columns_info=columns_info,
has_oids=has_oids,
table_oid=table_oid,
is_query_tool=False)
class ViewCommand(GridCommand):
"""
@@ -864,6 +875,7 @@ class QueryToolCommand(BaseCommand, FetchedRowTracker):
self.primary_keys = None
self.pk_names = None
self.table_has_oids = False
self.columns_types = None
def get_sql(self, default_conn=None):
return None
@@ -874,6 +886,9 @@ class QueryToolCommand(BaseCommand, FetchedRowTracker):
def get_primary_keys(self):
return self.pk_names, self.primary_keys
def get_columns_types(self, conn=None):
return self.columns_types
def has_oids(self):
return self.table_has_oids
@@ -906,8 +921,9 @@ class QueryToolCommand(BaseCommand, FetchedRowTracker):
# Get the path to the sql templates
sql_path = 'sqleditor/sql/#{0}#'.format(manager.version)
self.is_updatable_resultset, self.table_has_oids, self.primary_keys, \
pk_names, table_oid = is_query_resultset_updatable(conn, sql_path)
self.is_updatable_resultset, self.table_has_oids,\
self.primary_keys, pk_names, table_oid,\
self.columns_types = is_query_resultset_updatable(conn, sql_path)
# Create pk_names attribute in the required format
if pk_names is not None:

View File

@@ -385,13 +385,13 @@ input.editor-checkbox:focus {
/* For geometry column button */
.div-view-geometry-column {
.div-view-geometry-column, .editable-column-header-icon {
float: right;
height: 100%;
display: flex;
display: -webkit-flex;
align-items: center;
padding-right: 4px;
padding-right: 6px;
}
/* For leaflet popup */

View File

@@ -778,6 +778,7 @@ define('tools.querytool', [
not_null: c.not_null,
has_default_val: c.has_default_val,
is_array: c.is_array,
can_edit: c.can_edit,
};
// Get the columns width based on longer string among data type or
@@ -795,17 +796,17 @@ define('tools.querytool', [
if (c.cell == 'oid' && c.name == 'oid') {
options['editor'] = null;
} else if (c.cell == 'Json') {
options['editor'] = is_editable ? Slick.Editors.JsonText :
options['editor'] = c.can_edit ? Slick.Editors.JsonText :
Slick.Editors.ReadOnlyJsonText;
options['formatter'] = Slick.Formatters.JsonString;
} else if (c.cell == 'number' || c.cell == 'oid' ||
$.inArray(c.type, ['xid', 'real']) !== -1
) {
options['editor'] = is_editable ? Slick.Editors.CustomNumber :
options['editor'] = c.can_edit ? Slick.Editors.CustomNumber :
Slick.Editors.ReadOnlyText;
options['formatter'] = Slick.Formatters.Numbers;
} else if (c.cell == 'boolean') {
options['editor'] = is_editable ? Slick.Editors.Checkbox :
options['editor'] = c.can_edit ? Slick.Editors.Checkbox :
Slick.Editors.ReadOnlyCheckbox;
options['formatter'] = Slick.Formatters.Checkmark;
} else if (c.cell == 'binary') {
@@ -814,23 +815,41 @@ define('tools.querytool', [
} else if (c.cell == 'geometry' || c.cell == 'geography') {
// increase width to add 'view' button
options['width'] += 28;
options['can_edit'] = false;
} else {
options['editor'] = is_editable ? Slick.Editors.pgText :
options['editor'] = c.can_edit ? Slick.Editors.pgText :
Slick.Editors.ReadOnlypgText;
options['formatter'] = Slick.Formatters.Text;
}
if(!_.isUndefined(c.can_edit)) {
// Increase width for editable/read-only icon
options['width'] += 12;
let tooltip = '';
if(c.can_edit)
tooltip = gettext('Editable column');
else
tooltip = gettext('Read-only column');
options['toolTip'] = tooltip;
}
grid_columns.push(options);
});
var gridSelector = new GridSelector();
grid_columns = self.grid_columns = gridSelector.getColumnDefinitions(grid_columns);
// add 'view' button in geometry and geography type column header
_.each(grid_columns, function (c) {
// Add 'view' button in geometry and geography type column headers
if (c.column_type_internal == 'geometry' || c.column_type_internal == 'geography') {
GeometryViewer.add_header_button(c);
}
// Add editable/read-only icon to columns
if (!_.isUndefined(c.can_edit)) {
SqlEditorUtils.addEditableIcon(c, c.can_edit);
}
});
if (rows_affected) {
@@ -2634,10 +2653,11 @@ define('tools.querytool', [
// Create columns required by slick grid to render
_.each(colinfo, function(c) {
var is_primary_key = false;
var is_primary_key = false,
is_editable = self.can_edit && (!self.is_query_tool || c.is_editable);
// Check whether table have primary key
if (_.size(primary_keys) > 0) {
// Check whether this column is a primary key
if (is_editable && _.size(primary_keys) > 0) {
_.each(primary_keys, function(value, key) {
if (key === c.name)
is_primary_key = true;
@@ -2738,7 +2758,7 @@ define('tools.querytool', [
'pos': c.pos,
'label': column_label,
'cell': col_cell,
'can_edit': (c.name == 'oid') ? false : self.can_edit,
'can_edit': (c.name == 'oid') ? false : is_editable,
'type': type,
'not_null': c.not_null,
'has_default_val': c.has_default_val,

View File

@@ -1,6 +1,6 @@
{# ============= Fetch the columns ============= #}
{% if obj_id %}
SELECT at.attname, ty.typname
SELECT at.attname, ty.typname, at.attnum
FROM pg_attribute at
LEFT JOIN pg_type ty ON (ty.oid = at.atttypid)
WHERE attrelid={{obj_id}}::oid

View File

@@ -0,0 +1,57 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2019, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
"""
Get the column types for QueryToolCommand or TableCommand when
the result-set is editable.
"""
from flask import render_template
def get_columns_types(is_query_tool, columns_info, table_oid, conn, has_oids):
nodes_sqlpath = 'columns/sql/#{0}#'.format(conn.manager.version)
query = render_template(
"/".join([nodes_sqlpath, 'nodes.sql']),
tid=table_oid,
has_oids=has_oids
)
colst, rset = conn.execute_2darray(query)
if not colst:
raise Exception(rset)
column_types = dict()
for key, col in enumerate(columns_info):
col_type = dict()
col_type['type_code'] = col['type_code']
col_type['type_name'] = None
col_type['internal_size'] = col['internal_size']
column_types[col['name']] = col_type
if not is_query_tool:
col_type['not_null'] = col['not_null'] = \
rset['rows'][key]['not_null']
col_type['has_default_val'] = \
col['has_default_val'] = \
rset['rows'][key]['has_default_val']
else:
for row in rset['rows']:
if row['oid'] == col['table_column']:
col_type['not_null'] = col['not_null'] = row['not_null']
col_type['has_default_val'] = \
col['has_default_val'] = row['has_default_val']
else:
col_type['not_null'] = col['not_null'] = None
col_type['has_default_val'] = col['has_default_val'] = None
return column_types

View File

@@ -8,11 +8,18 @@
##########################################################################
"""
Check if the result-set of a query is updatable, A resultset is
updatable (as of this version) if:
- All columns belong to the same table.
- All the primary key columns of the table are present in the resultset
- No duplicate columns
Check if the result-set of a query is editable, A result-set is
editable if:
- All columns are either selected directly from a single table, or
are not table columns at all (e.g. concatenation of 2 columns).
Only columns that are selected directly from a the table are
editable, other columns are read-only.
- All the primary key columns or oids (if applicable) of the table are
present in the result-set.
Note:
- Duplicate columns (selected twice) or renamed columns are also
read-only.
"""
from flask import render_template
from flask_babelex import gettext
@@ -20,16 +27,18 @@ try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
from pgadmin.tools.sqleditor.utils.get_column_types import get_columns_types
def is_query_resultset_updatable(conn, sql_path):
"""
This function is used to check whether the last successful query
produced updatable results.
produced editable results.
Args:
conn: Connection object.
sql_path: the path to the sql templates.
sql_path: the path to the sql templates
primary_keys.sql & columns.sql.
"""
columns_info = conn.get_column_info()
@@ -37,26 +46,43 @@ def is_query_resultset_updatable(conn, sql_path):
return return_not_updatable()
table_oid = _check_single_table(columns_info)
if not table_oid:
return return_not_updatable()
if not _check_duplicate_columns(columns_info):
if table_oid is None:
return return_not_updatable()
if conn.connected():
primary_keys, pk_names = _check_primary_keys(conn=conn,
columns_info=columns_info,
table_oid=table_oid,
sql_path=sql_path)
# Get all the table columns
table_columns = _get_table_columns(conn=conn,
table_oid=table_oid,
sql_path=sql_path)
# Editable column: A column selected directly from a table, that is
# neither renamed nor is a duplicate of another selected column
_check_editable_columns(table_columns=table_columns,
results_columns=columns_info)
primary_keys, pk_names = \
_check_primary_keys(conn=conn,
columns_info=columns_info,
table_oid=table_oid,
sql_path=sql_path)
has_oids = _check_oids(conn=conn,
columns_info=columns_info,
table_oid=table_oid,
sql_path=sql_path)
if has_oids or primary_keys is not None:
return True, has_oids, primary_keys, pk_names, table_oid
is_resultset_updatable = has_oids or primary_keys is not None
if is_resultset_updatable:
column_types = get_columns_types(columns_info=columns_info,
table_oid=table_oid,
conn=conn,
has_oids=has_oids,
is_query_tool=True)
return True, has_oids, primary_keys, \
pk_names, table_oid, column_types
else:
_set_all_columns_not_editable(columns_info=columns_info)
return return_not_updatable()
else:
raise Exception(
@@ -66,20 +92,34 @@ def is_query_resultset_updatable(conn, sql_path):
def _check_single_table(columns_info):
table_oid = columns_info[0]['table_oid']
table_oid = None
for column in columns_info:
if column['table_oid'] != table_oid:
# Skip columns that are not directly from tables
if column['table_oid'] is None:
continue
# If we don't have a table_oid yet, store this one
if table_oid is None:
table_oid = column['table_oid']
# If we already have one, check that all the columns have the same one
elif column['table_oid'] != table_oid:
return None
return table_oid
def _check_duplicate_columns(columns_info):
column_numbers = \
[col['table_column'] for col in columns_info]
is_duplicate_columns = len(column_numbers) != len(set(column_numbers))
if is_duplicate_columns:
return False
return True
def _check_editable_columns(table_columns, results_columns):
table_columns_numbers = set()
for results_column in results_columns:
table_column_number = results_column['table_column']
if table_column_number is None: # Not a table column
results_column['is_editable'] = False
elif table_column_number in table_columns_numbers: # Duplicate
results_column['is_editable'] = False
elif results_column['display_name'] \
!= table_columns[table_column_number]:
results_column['is_editable'] = False
else:
results_column['is_editable'] = True
table_columns_numbers.add(table_column_number)
def _check_oids(conn, sql_path, table_oid, columns_info):
@@ -111,26 +151,34 @@ def _check_primary_keys(conn, columns_info, sql_path, table_oid):
table_oid=table_oid,
sql_path=sql_path)
if not _check_primary_keys_uniquely_exist(primary_keys_columns,
columns_info):
if not _check_all_primary_keys_exist(primary_keys_columns,
columns_info):
primary_keys = None
pk_names = None
return primary_keys, pk_names
def _check_primary_keys_uniquely_exist(primary_keys_columns, columns_info):
def _check_all_primary_keys_exist(primary_keys_columns, columns_info):
"""
Check that all primary keys exist.
If another column is selected with the same name as the primary key
before the primary key (e.g SELECT some_col as pk, pk from table) the
name of the actual primary key column gets changed to pk-2.
This is also reversed here.
"""
for pk in primary_keys_columns:
pk_exists = False
for col in columns_info:
if col['table_column'] == pk['column_number']:
if col['is_editable'] and \
col['table_column'] == pk['column_number']:
pk_exists = True
# If the primary key column is renamed
if col['display_name'] != pk['name']:
return False
# If a normal column is renamed to a primary key column name
elif col['display_name'] == pk['name']:
return False
# If the primary key is renamed, restore to its original name
if col['name'] != pk['name']:
col['name'], _ = col['name'].split('-')
# If another column is renamed to the primary key name, change it
elif col['name'] == pk['name']:
col['name'] += '-0'
if not pk_exists:
return False
return True
@@ -160,5 +208,26 @@ def _get_primary_keys(sql_path, table_oid, conn):
return primary_keys, primary_keys_columns, pk_names
def _get_table_columns(sql_path, table_oid, conn):
query = render_template(
"/".join([sql_path, 'get_columns.sql']),
obj_id=table_oid
)
status, result = conn.execute_dict(query)
if not status:
raise Exception(result)
columns = {}
for row in result['rows']:
columns[row['attnum']] = row['attname']
return columns
def _set_all_columns_not_editable(columns_info):
for col in columns_info:
col['is_editable'] = False
def return_not_updatable():
return False, False, None, None, None
return False, False, None, None, None, None

View File

@@ -25,67 +25,109 @@ class TestQueryUpdatableResultset(BaseTestGenerator):
scenarios = [
('When selecting all columns of the table', dict(
sql='SELECT * FROM %s;',
primary_keys={
expected_primary_keys={
'pk_col1': 'int4',
'pk_col2': 'int4'
},
expected_has_oids=False,
table_has_oids=False
table_has_oids=False,
expected_cols_is_editable=[True, True, True, True]
)),
('When selecting all primary keys of the table', dict(
sql='SELECT pk_col1, pk_col2 FROM %s;',
primary_keys={
expected_primary_keys={
'pk_col1': 'int4',
'pk_col2': 'int4'
},
expected_has_oids=False,
table_has_oids=False
table_has_oids=False,
expected_cols_is_editable=[True, True]
)),
('When selecting some of the primary keys of the table', dict(
sql='SELECT pk_col2 FROM %s;',
primary_keys=None,
expected_primary_keys=None,
expected_has_oids=False,
table_has_oids=False
table_has_oids=False,
expected_cols_is_editable=[False]
)),
('When selecting none of the primary keys of the table', dict(
sql='SELECT normal_col1 FROM %s;',
primary_keys=None,
expected_primary_keys=None,
expected_has_oids=False,
table_has_oids=False
table_has_oids=False,
expected_cols_is_editable=[False]
)),
('When renaming a primary key', dict(
sql='SELECT pk_col1 as some_col, pk_col2 FROM "%s";',
primary_keys=None,
expected_primary_keys=None,
expected_has_oids=False,
table_has_oids=False
table_has_oids=False,
expected_cols_is_editable=[False, False]
)),
('When renaming a column to a primary key name', dict(
sql='SELECT pk_col1, pk_col2, normal_col1 as pk_col1 FROM %s;',
primary_keys=None,
('When renaming a normal column', dict(
sql='SELECT pk_col1, pk_col2, normal_col1 as some_col FROM "%s";',
expected_primary_keys={
'pk_col1': 'int4',
'pk_col2': 'int4'
},
expected_has_oids=False,
table_has_oids=False
table_has_oids=False,
expected_cols_is_editable=[True, True, False]
)),
('When renaming a normal column to a primary key name', dict(
sql='SELECT normal_col1 as pk_col1, pk_col1, pk_col2 FROM %s;',
expected_primary_keys={
'pk_col1': 'int4',
'pk_col2': 'int4'
},
expected_has_oids=False,
table_has_oids=False,
expected_cols_is_editable=[False, True, True]
)),
('When selecting a normal column twice', dict(
sql='SELECT pk_col1, pk_col2, normal_col1, normal_col1 FROM %s;',
expected_primary_keys={
'pk_col1': 'int4',
'pk_col2': 'int4'
},
expected_has_oids=False,
table_has_oids=False,
expected_cols_is_editable=[True, True, True, False]
)),
('When selecting a non-table column', dict(
sql='SELECT pk_col1, pk_col2, normal_col1 || normal_col2 FROM %s;',
expected_primary_keys={
'pk_col1': 'int4',
'pk_col2': 'int4'
},
expected_has_oids=False,
table_has_oids=False,
expected_cols_is_editable=[True, True, False]
)),
('When selecting primary keys and oids (table with oids)', dict(
sql='SELECT *, oid FROM %s;',
primary_keys={
expected_primary_keys={
'pk_col1': 'int4',
'pk_col2': 'int4'
},
expected_has_oids=True,
table_has_oids=True
table_has_oids=True,
expected_cols_is_editable=[True, True, True, True, False]
)),
('When selecting oids without primary keys (table with oids)', dict(
sql='SELECT oid, normal_col1, normal_col2 FROM %s;',
primary_keys=None,
expected_primary_keys=None,
expected_has_oids=True,
table_has_oids=True
table_has_oids=True,
expected_cols_is_editable=[False, True, True]
)),
('When selecting none of the primary keys or oids (table with oids)',
dict(
sql='SELECT normal_col1, normal_col2 FROM %s;',
primary_keys=None,
expected_primary_keys=None,
expected_has_oids=False,
table_has_oids=True
table_has_oids=True,
expected_cols_is_editable=[False, False]
))
]
@@ -99,6 +141,7 @@ class TestQueryUpdatableResultset(BaseTestGenerator):
response_data = self._execute_select_sql()
self._check_primary_keys(response_data)
self._check_oids(response_data)
self._check_editable_columns(response_data)
def tearDown(self):
# Disconnect the database
@@ -116,12 +159,18 @@ class TestQueryUpdatableResultset(BaseTestGenerator):
def _check_primary_keys(self, response_data):
primary_keys = response_data['data']['primary_keys']
self.assertEquals(primary_keys, self.primary_keys)
self.assertEquals(primary_keys, self.expected_primary_keys)
def _check_oids(self, response_data):
has_oids = response_data['data']['has_oids']
self.assertEquals(has_oids, self.expected_has_oids)
def _check_editable_columns(self, response_data):
columns_info = response_data['data']['colinfo']
for col, expected_is_editable in \
zip(columns_info, self.expected_cols_is_editable):
self.assertEquals(col['is_editable'], expected_is_editable)
def _initialize_database_connection(self):
database_info = parent_node_dict["database"][-1]
self.db_name = database_info["db_name"]