pgadmin4/web/pgadmin/tools/search_objects/utils.py
Akshay Joshi b36004b702 The following are the initial fixes for PG15:
1) From PG 15 onward, the datlastsysoid has been removed from the table pg_database.
    We have added the constant _DATABASE_LAST_SYSTEM_OID = 16383, all the objects below
    this value are considered to be system objects. Modified the pgAdmin logic accordingly.

 2) Concatenation operator '||' needs a specific typecast to be applied to query variables. Modified SQL's accordingly.

Fixes #7283
2022-05-17 20:32:17 +05:30

153 lines
5.5 KiB
Python

##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2022, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
from flask import current_app, render_template
from flask_babel import gettext
from pgadmin.utils.driver import get_driver
from config import PG_DEFAULT_DRIVER
from pgadmin.utils.constants import DATABASE_LAST_SYSTEM_OID
def get_node_blueprint(node_type):
blueprint = None
node_type = 'NODE-' + node_type
if node_type in current_app.blueprints:
blueprint = current_app.blueprints[node_type]
return blueprint
class SearchObjectsHelper:
def __init__(self, sid, did, show_system_objects=False, node_types=None):
self.sid = sid
self.did = did
self.show_system_objects = show_system_objects
self.manager = get_driver(
PG_DEFAULT_DRIVER
).connection_manager(sid)
self._all_node_types = [
'cast', 'fts_dictionary', 'check_constraint',
'exclusion_constraint', 'foreign_key',
'primary_key', 'unique_constraint', 'constraints', 'trigger',
'table', 'compound_trigger', 'rule', 'column', 'partition',
'index', 'type', 'domain', 'domain_constraints', 'schema',
'synonym', 'sequence', 'edbvar', 'edbfunc', 'edbproc', 'package',
'foreign_table', 'fts_parser', 'function', 'procedure',
'trigger_function', 'fts_template', 'collation', 'view', 'mview',
'fts_configuration', 'extension', 'language',
'event_trigger', 'foreign_server', 'user_mapping',
'foreign_data_wrapper', 'row_security_policy',
'publication', 'subscription', 'aggregate', 'operator'
] if node_types is None else node_types
@property
def all_node_types(self):
return self._all_node_types
def get_template_path(self):
return 'search_objects/sql/{0}/#{1}#'.format(
self.manager.server_type, self.manager.version)
def get_show_node_prefs(self):
return_types = {}
for node_type in self.all_node_types:
blueprint = get_node_blueprint(node_type)
if blueprint is None:
continue
return_types[node_type] = blueprint.show_node
return return_types
def get_supported_types(self, skip_check=False):
return_types = {}
for node_type in self.all_node_types:
blueprint = get_node_blueprint(node_type)
if blueprint is None:
continue
if blueprint.backend_supported(self.manager, is_catalog=False,
did=self.did) or skip_check:
if node_type in ['edbfunc', 'edbproc']:
return_types[node_type] =\
gettext('Package {0}').format(
blueprint.collection_label)
else:
return_types[node_type] = blueprint.collection_label
return return_types
def get_sql(self, sql_file, **kwargs):
return render_template(
"/".join([self.get_template_path(), sql_file]),
**kwargs
)
def _check_permission(self, obj_type, conn, skip_obj_type):
"""
This function return whether user has permission to see type
:param obj_type:
:param conn:
:return:
"""
if obj_type == 'all':
status, result = conn.execute_dict(
"SELECT COUNT(1) FROM information_schema.table_privileges "
"WHERE table_name = 'pg_subscription' "
"AND privilege_type = 'SELECT'")
if 'count' in result['rows'][0] and \
result['rows'][0]['count'] == '0':
skip_obj_type.append('subscription')
return skip_obj_type
def search(self, text, obj_type=None):
skip_obj_type = []
conn = self.manager.connection(did=self.did)
last_system_oid = DATABASE_LAST_SYSTEM_OID
show_node_prefs = self.get_show_node_prefs()
node_labels = self.get_supported_types(skip_check=True)
# escape the single quote from search text
text = text.replace("'", "''")
skip_obj_type = self._check_permission(obj_type, conn,
skip_obj_type)
# Column catalog_level has values as
# N - Not a catalog schema
# D - Catalog schema with DB support - pg_catalog
# O - Catalog schema with object support only - info schema, dbo, sys
status, res = conn.execute_dict(
self.get_sql('search.sql',
search_text=text.lower(), obj_type=obj_type,
show_system_objects=self.show_system_objects,
show_node_prefs=show_node_prefs, _=gettext,
last_system_oid=last_system_oid,
skip_obj_type=skip_obj_type)
)
if not status:
return status, res
ret_val = [
{
'name': row['obj_name'],
'type': row['obj_type'],
'type_label': node_labels[row['obj_type']],
'path': row['obj_path'],
'show_node': row['show_node'],
'other_info': row['other_info'],
'catalog_level': row['catalog_level'],
}
for row in res['rows']
]
return True, ret_val