Use on-demand loading for results in the query tool. Fixes #2137

With a 27,420-row query, pgAdmin III takes 5.873s on my laptop; pgAdmin 4 now takes ~1s.
Harshal Dhumal
2017-06-27 09:03:04 -04:00
committed by Dave Page
parent 15cb9fc35b
commit c65158312d
28 changed files with 1953 additions and 887 deletions
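
The core of the change is replacing one big fetchall() with batched fetchmany() calls driven by the UI. A minimal sketch of the pattern follows; this is not pgAdmin's actual driver code: `cursor` stands in for the async cursor held by the transaction object, and the batch size of 1000 is illustrative (pgAdmin reads ON_DEMAND_RECORD_COUNT from config).

ON_DEMAND_RECORD_COUNT = 1000  # illustrative; the real value comes from config

def fetch_batch(cursor, count=ON_DEMAND_RECORD_COUNT):
    # Pull at most `count` rows from an already-executed query. A full
    # batch is the signal behind has_more_rows in poll() and fetch():
    # fewer rows than requested means the result set is exhausted.
    rows = cursor.fetchmany(count)
    return rows, len(rows) == count

The grid then requests the next batch only when the user scrolls to the end, which is why the 27,420-row query above paints quickly: only the first batch is fetched up front.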


@@ -27,7 +27,7 @@ from pgadmin.utils.sqlautocomplete.autocomplete import SQLAutoComplete
from pgadmin.misc.file_manager import Filemanager
from config import PG_DEFAULT_DRIVER
from config import PG_DEFAULT_DRIVER, ON_DEMAND_RECORD_COUNT
MODULE_NAME = 'sqleditor'
@@ -82,9 +82,9 @@ class SqlEditorModule(PgAdminModule):
'sqleditor.view_data_start',
'sqleditor.query_tool_start',
'sqleditor.query_tool_preferences',
'sqleditor.get_columns',
'sqleditor.poll',
'sqleditor.fetch_types',
'sqleditor.fetch',
'sqleditor.fetch_all',
'sqleditor.save',
'sqleditor.get_filter',
'sqleditor.apply_filter',
@@ -261,13 +261,32 @@ def start_view_data(trans_id):
# Check the transaction and connection status
status, error_msg, conn, trans_obj, session_obj = check_transaction_status(trans_id)
# Get the default connection: the connection attached to the trans id
# holds the cursor with the query result, so we cannot use it to execute
# another query without losing that result.
manager = get_driver(PG_DEFAULT_DRIVER).connection_manager(trans_obj.sid)
default_conn = manager.connection(did=trans_obj.did)
# Connect to the Server if not connected.
if not default_conn.connected():
status, msg = default_conn.connect()
if not status:
return make_json_response(
data={'status': status, 'result': u"{}".format(msg)}
)
if status and conn is not None \
and trans_obj is not None and session_obj is not None:
try:
# Reset the fetched row count to 0, as we are executing the query again.
trans_obj.update_fetched_row_cnt(0)
session_obj['command_obj'] = pickle.dumps(trans_obj, -1)
# Fetch the sql and primary_keys from the object
sql = trans_obj.get_sql()
pk_names, primary_keys = trans_obj.get_primary_keys()
pk_names, primary_keys = trans_obj.get_primary_keys(default_conn)
# Fetch the applied filter.
filter_applied = trans_obj.is_filter_applied()
@@ -338,6 +357,8 @@ def start_query_tool(trans_id):
# Use pickle.loads function to get the command object
session_obj = grid_data[str(trans_id)]
trans_obj = pickle.loads(session_obj['command_obj'])
# Reset the fetched row count to 0, as we are executing the query again.
trans_obj.update_fetched_row_cnt(0)
can_edit = False
can_filter = False
@@ -467,66 +488,6 @@ def preferences(trans_id):
return success_return()
@blueprint.route(
'/columns/<int:trans_id>', methods=["GET"], endpoint='get_columns'
)
@login_required
def get_columns(trans_id):
"""
This method returns the list of columns of the last async query.
Args:
trans_id: unique transaction id
"""
columns = dict()
columns_info = None
primary_keys = None
rset = None
status, error_msg, conn, trans_obj, session_obj = check_transaction_status(trans_id)
if status and conn is not None and session_obj is not None:
ver = conn.manager.version
# Get the template path for the column
template_path = 'column/sql/#{0}#'.format(ver)
command_obj = pickle.loads(session_obj['command_obj'])
if hasattr(command_obj, 'obj_id'):
SQL = render_template("/".join([template_path,
'nodes.sql']),
tid=command_obj.obj_id)
# Fetch rows carrying the not_null attribute
status, rset = conn.execute_2darray(SQL)
if not status:
return internal_server_error(errormsg=rset)
# Check whether PK column info is available
if 'primary_keys' in session_obj:
primary_keys = session_obj['primary_keys']
# Fetch column information
columns_info = conn.get_column_info()
if columns_info is not None:
for key, col in enumerate(columns_info):
col_type = dict()
col_type['type_code'] = col['type_code']
col_type['type_name'] = None
if rset:
col_type['not_null'] = col['not_null'] = \
rset['rows'][key]['not_null']
col_type['has_default_val'] = col['has_default_val'] = \
rset['rows'][key]['has_default_val']
columns[col['name']] = col_type
# Since we changed the transaction object, we need to
# restore it and update the session variable.
session_obj['columns_info'] = columns
update_session_grid_transaction(trans_id, session_obj)
return make_json_response(data={'status': True,
'columns': columns_info,
'primary_keys': primary_keys})
@blueprint.route('/poll/<int:trans_id>', methods=["GET"], endpoint='poll')
@login_required
@@ -539,12 +500,21 @@ def poll(trans_id):
"""
result = None
rows_affected = 0
rows_fetched_from = 0
rows_fetched_to = 0
has_more_rows = False
additional_result = []
columns = dict()
columns_info = None
primary_keys = None
types = {}
client_primary_key = None
rset = None
# Check the transaction and connection status
status, error_msg, conn, trans_obj, session_obj = check_transaction_status(trans_id)
if status and conn is not None and session_obj is not None:
status, result = conn.poll(formatted_exception_msg=True)
status, result = conn.poll(formatted_exception_msg=True, no_result=True)
if not status:
return internal_server_error(result)
elif status == ASYNC_OK:
@@ -559,6 +529,80 @@ def poll(trans_id):
if (trans_status == TX_STATUS_INERROR and
trans_obj.auto_rollback):
conn.execute_void("ROLLBACK;")
st, result = conn.async_fetchmany_2darray(ON_DEMAND_RECORD_COUNT)
if st:
if 'primary_keys' in session_obj:
primary_keys = session_obj['primary_keys']
# Fetch column information
columns_info = conn.get_column_info()
client_primary_key = generate_client_primary_key_name(
columns_info
)
session_obj['client_primary_key'] = client_primary_key
if columns_info is not None:
command_obj = pickle.loads(session_obj['command_obj'])
if hasattr(command_obj, 'obj_id'):
# Get the template path for the column
template_path = 'column/sql/#{0}#'.format(
conn.manager.version
)
SQL = render_template("/".join([template_path,
'nodes.sql']),
tid=command_obj.obj_id)
# Fetch rows carrying the not_null attribute
colst, rset = conn.execute_2darray(SQL)
if not colst:
return internal_server_error(errormsg=rset)
for key, col in enumerate(columns_info):
col_type = dict()
col_type['type_code'] = col['type_code']
col_type['type_name'] = None
columns[col['name']] = col_type
if rset:
col_type['not_null'] = col['not_null'] = \
rset['rows'][key]['not_null']
col_type['has_default_val'] = \
col['has_default_val'] = \
rset['rows'][key]['has_default_val']
if columns:
st, types = fetch_pg_types(columns, trans_obj)
if not st:
return internal_server_error(types)
for col_info in columns.values():
for col_type in types:
if col_type['oid'] == col_info['type_code']:
col_info['type_name'] = col_type['typname']
session_obj['columns_info'] = columns
# A True status from async_fetchmany_2darray with a None result
# means there is nothing to fetch.
if result and rows_affected > -1:
res_len = len(result)
if res_len == ON_DEMAND_RECORD_COUNT:
has_more_rows = True
if res_len > 0:
rows_fetched_from = trans_obj.get_fetched_row_cnt()
trans_obj.update_fetched_row_cnt(rows_fetched_from + res_len)
rows_fetched_from += 1
rows_fetched_to = trans_obj.get_fetched_row_cnt()
session_obj['command_obj'] = pickle.dumps(trans_obj, -1)
# Since we changed the transaction object, we need to
# restore it and update the session variable.
update_session_grid_transaction(trans_id, session_obj)
elif status == ASYNC_EXECUTION_ABORTED:
status = 'Cancel'
else:
@@ -599,53 +643,123 @@ def poll(trans_id):
data={
'status': status, 'result': result,
'rows_affected': rows_affected,
'additional_messages': additional_messages
'rows_fetched_from': rows_fetched_from,
'rows_fetched_to': rows_fetched_to,
'additional_messages': additional_messages,
'has_more_rows': has_more_rows,
'colinfo': columns_info,
'primary_keys': primary_keys,
'types': types,
'client_primary_key': client_primary_key
}
)
@blueprint.route(
'/fetch/types/<int:trans_id>', methods=["GET"], endpoint='fetch_types'
)
@blueprint.route('/fetch/<int:trans_id>', methods=["GET"], endpoint='fetch')
@blueprint.route('/fetch/<int:trans_id>/<int:fetch_all>', methods=["GET"], endpoint='fetch_all')
@login_required
def fetch_pg_types(trans_id):
def fetch(trans_id, fetch_all=None):
result = None
has_more_rows = False
rows_fetched_from = 0
rows_fetched_to = 0
fetch_row_cnt = -1 if fetch_all == 1 else ON_DEMAND_RECORD_COUNT
# Check the transaction and connection status
status, error_msg, conn, trans_obj, session_obj = check_transaction_status(trans_id)
if status and conn is not None and session_obj is not None:
status, result = conn.async_fetchmany_2darray(fetch_row_cnt)
if not status:
status = 'Error'
else:
status = 'Success'
res_len = len(result)
if fetch_row_cnt != -1 and res_len == ON_DEMAND_RECORD_COUNT:
has_more_rows = True
if res_len:
rows_fetched_from = trans_obj.get_fetched_row_cnt()
trans_obj.update_fetched_row_cnt(rows_fetched_from + res_len)
rows_fetched_from += 1
rows_fetched_to = trans_obj.get_fetched_row_cnt()
session_obj['command_obj'] = pickle.dumps(trans_obj, -1)
update_session_grid_transaction(trans_id, session_obj)
else:
status = 'NotConnected'
result = error_msg
return make_json_response(
data={
'status': status, 'result': result,
'has_more_rows': has_more_rows,
'rows_fetched_from': rows_fetched_from,
'rows_fetched_to': rows_fetched_to
}
)
def fetch_pg_types(columns_info, trans_obj):
"""
This method is used to fetch the pg types, which are required
to map the data types that come back as query results.
Args:
trans_id: unique transaction id
columns_info:
"""
# Check the transaction and connection status
status, error_msg, conn, trans_obj, session_obj = check_transaction_status(trans_id)
if status and conn is not None \
and trans_obj is not None and session_obj is not None:
res = {}
if 'columns_info' in session_obj \
and session_obj['columns_info'] is not None:
# Get the default connection: the connection attached to the trans id
# holds the cursor with the query result, so we cannot use it to execute
# another query without losing that result.
oids = [session_obj['columns_info'][col]['type_code'] for col in session_obj['columns_info']]
manager = get_driver(PG_DEFAULT_DRIVER).connection_manager(trans_obj.sid)
default_conn = manager.connection(did=trans_obj.did)
if oids:
status, res = conn.execute_dict(
u"""SELECT oid, format_type(oid,null) as typname FROM pg_type WHERE oid IN %s ORDER BY oid;
# Connect to the Server if not connected.
res = []
if not default_conn.connected():
status, msg = default_conn.connect()
if not status:
return status, msg
oids = [columns_info[col]['type_code'] for col in columns_info]
if oids:
status, res = default_conn.execute_dict(
u"""SELECT oid, format_type(oid,null) as typname FROM pg_type WHERE oid IN %s ORDER BY oid;
""", [tuple(oids)])
if status:
# Iterate through pg_types and update the type names in the session object
for record in res['rows']:
for col in session_obj['columns_info']:
type_obj = session_obj['columns_info'][col]
if type_obj['type_code'] == record['oid']:
type_obj['type_name'] = record['typname']
if not status:
return False, res
update_session_grid_transaction(trans_id, session_obj)
return status, res['rows']
else:
status = False
res = error_msg
return True, []
return make_json_response(data={'status': status, 'result': res})
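
For reference, the pg_type lookup used above can be run standalone. A minimal sketch, assuming a plain psycopg2 connection (the connection details and OIDs here are illustrative):

import psycopg2

conn = psycopg2.connect("dbname=postgres")  # assumed connection details
oids = (23, 25, 1043)  # int4, text, varchar
with conn.cursor() as cur:
    # psycopg2 adapts a Python tuple to a SQL row, so IN %s works here.
    cur.execute(
        "SELECT oid, format_type(oid, null) AS typname "
        "FROM pg_type WHERE oid IN %s ORDER BY oid;",
        [oids],
    )
    print(cur.fetchall())
    # [(23, 'integer'), (25, 'text'), (1043, 'character varying')]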
def generate_client_primary_key_name(columns_info):
temp_key = '__temp_PK'
if not columns_info:
return temp_key
initial_temp_key_len = len(temp_key)
duplicate = False
suffix = 1
while 1:
for col in columns_info:
if col['name'] == temp_key:
duplicate = True
break
if duplicate:
if initial_temp_key_len == len(temp_key):
temp_key += str(suffix)
suffix += 1
else:
temp_key = temp_key[:-1] + str(suffix)
suffix += 1
duplicate = False
else:
break
return temp_key
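
The helper above picks a client-side key name that cannot collide with a real column, bumping a numeric suffix until the name is unique. For example, using generate_client_primary_key_name() as defined above:

generate_client_primary_key_name([{'name': 'id'}])
# -> '__temp_PK'
generate_client_primary_key_name([{'name': '__temp_PK'}, {'name': '__temp_PK1'}])
# -> '__temp_PK2'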
@blueprint.route(
@@ -659,7 +773,6 @@ def save(trans_id):
Args:
trans_id: unique transaction id
"""
if request.data:
changed_data = json.loads(request.data, encoding='utf-8')
else:
@@ -669,7 +782,6 @@ def save(trans_id):
status, error_msg, conn, trans_obj, session_obj = check_transaction_status(trans_id)
if status and conn is not None \
and trans_obj is not None and session_obj is not None:
setattr(trans_obj, 'columns_info', session_obj['columns_info'])
# If no primary key is found, return from the function.
if len(session_obj['primary_keys']) <= 0 or len(changed_data) <= 0:
@@ -680,7 +792,22 @@ def save(trans_id):
}
)
status, res, query_res, _rowid = trans_obj.save(changed_data)
manager = get_driver(PG_DEFAULT_DRIVER).connection_manager(trans_obj.sid)
default_conn = manager.connection(did=trans_obj.did)
# Connect to the Server if not connected.
if not default_conn.connected():
status, msg = default_conn.connect()
if not status:
return make_json_response(
data={'status': status, 'result': u"{}".format(msg)}
)
status, res, query_res, _rowid = trans_obj.save(
changed_data,
session_obj['columns_info'],
session_obj['client_primary_key'],
default_conn)
else:
status = False
res = error_msg
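
Taken together, the new endpoints form a simple protocol: poll until the query completes, then call /fetch/<trans_id> while has_more_rows is True (or /fetch/<trans_id>/1 to fetch everything at once). A hypothetical client loop follows; the base URL, authenticated session, and trans_id are assumptions for illustration, and make_json_response is assumed to wrap the payload under a top-level 'data' key:

import requests

session = requests.Session()  # assumed to already carry a logged-in pgAdmin cookie
base = 'http://localhost:5050/sqleditor'  # hypothetical deployment URL
trans_id = 123456  # hypothetical id returned by query_tool_start

rows = []
# Assumes the query has already finished; a real client re-polls
# while the returned status is still 'Busy'.
data = session.get('{0}/poll/{1}'.format(base, trans_id)).json()['data']
rows.extend(data['result'] or [])
while data.get('has_more_rows'):
    data = session.get('{0}/fetch/{1}'.format(base, trans_id)).json()['data']
    rows.extend(data['result'] or [])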