mirror of
https://github.com/pgadmin-org/pgadmin4.git
synced 2025-02-25 18:55:31 -06:00
Fixed EXEC script for procedures. Fixes #3850
This commit is contained in:
parent
1f29859040
commit
406ada10f5
@ -27,6 +27,7 @@ Bug fixes
|
||||
| `Bug #3840 <https://redmine.postgresql.org/issues/3840>`_ - Ensure that file format combo box value should be retained when hidden files checkbox is toggled.
|
||||
| `Bug #3846 <https://redmine.postgresql.org/issues/3846>`_ - Proper SQL should be generated when create procedure with custom type arguments.
|
||||
| `Bug #3849 <https://redmine.postgresql.org/issues/3849>`_ - Ensure that browser should warn before close or refresh.
|
||||
| `Bug #3850 <https://redmine.postgresql.org/issues/3850>`_ - Fixed EXEC script for procedures.
|
||||
| `Bug #3853 <https://redmine.postgresql.org/issues/3853>`_ - Proper SQL should be generated when create domain of type interval with precision.
|
||||
| `Bug #3858 <https://redmine.postgresql.org/issues/3858>`_ - Drop-down should be closed when click on any other toolbar button.
|
||||
| `Bug #3862 <https://redmine.postgresql.org/issues/3862>`_ - Fixed keyboard navigation for dialog tabs.
|
||||
|
@ -1508,16 +1508,20 @@ class FunctionView(PGChildNodeView, DataTypeReader):
|
||||
name = resp_data['pronamespace'] + "." + resp_data['name_with_args']
|
||||
|
||||
# Fetch only arguments
|
||||
args = name[name.rfind('('):].strip('(').strip(')').split(',')
|
||||
# Remove unwanted spaces from arguments
|
||||
args = [arg.strip(' ') for arg in args]
|
||||
if name.rfind('(') != -1:
|
||||
args = name[name.rfind('('):].strip('(').strip(')').split(',')
|
||||
# Remove unwanted spaces from arguments
|
||||
args = [arg.strip(' ') for arg in args]
|
||||
|
||||
# Remove duplicate and then format arguments
|
||||
for arg in list(set(args)):
|
||||
formatted_arg = '\n\t<' + arg + '>'
|
||||
name = name.replace(arg, formatted_arg)
|
||||
# Remove duplicate and then format arguments
|
||||
for arg in list(set(args)):
|
||||
formatted_arg = '\n\t<' + arg + '>'
|
||||
name = name.replace(arg, formatted_arg)
|
||||
|
||||
name = name.replace(')', '\n)')
|
||||
else:
|
||||
name += '()'
|
||||
|
||||
name = name.replace(')', '\n)')
|
||||
if self.manager.server_type == 'pg':
|
||||
sql = "CALL {0}".format(name)
|
||||
else:
|
||||
|
@ -0,0 +1,81 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2019, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
import re
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as funcs_utils
|
||||
|
||||
|
||||
class ProcedureExecSQLTestCase(BaseTestGenerator):
|
||||
""" This class will check the EXEC SQL for Procedure. """
|
||||
skip_on_database = ['gpdb']
|
||||
scenarios = [
|
||||
# Fetching procedure SQL to EXEC.
|
||||
('Fetch Procedure SQL to execute', dict(
|
||||
url='/browser/procedure/exec_sql/', with_args=False, args="",
|
||||
expected_sql="{0} {1}.{2}()")),
|
||||
('Fetch Procedure with arguuments SQL to execute', dict(
|
||||
url='/browser/procedure/exec_sql/', with_args=True,
|
||||
args="arg1 bigint",
|
||||
expected_sql="{0} {1}.{2}( <arg1 bigint> )"))
|
||||
]
|
||||
|
||||
def runTest(self):
|
||||
""" This function will check the EXEC SQL. """
|
||||
super(ProcedureExecSQLTestCase, self).setUp()
|
||||
self = funcs_utils.set_up(self)
|
||||
|
||||
if self.server_type == "pg" and\
|
||||
self.server_version < 110000:
|
||||
message = "Procedures are not supported by PG < 110000."
|
||||
self.skipTest(message)
|
||||
|
||||
proc_name = "test_procedure_exec_sql_%s" % str(uuid.uuid4())[1:8]
|
||||
proc_info = funcs_utils.create_procedure(
|
||||
self.server, self.db_name, self.schema_name, proc_name,
|
||||
self.server_type, self.server_version, self.with_args, self.args)
|
||||
|
||||
proc_id = proc_info[0]
|
||||
|
||||
exec_response = self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) +
|
||||
'/' + str(self.server_id) + '/' + str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(proc_id))
|
||||
self.assertEquals(exec_response.status_code, 200)
|
||||
exec_sql = json.loads(exec_response.data.decode('utf-8'))
|
||||
|
||||
# Replace multiple spaces with one space and check the expected sql
|
||||
sql = re.sub('\s+', ' ', exec_sql).strip()
|
||||
|
||||
# Verify the expected EXEC SQL
|
||||
if self.server_type == "pg":
|
||||
expected_sql = self.expected_sql.format("CALL", self.schema_name,
|
||||
proc_name)
|
||||
else:
|
||||
expected_sql = self.expected_sql.format("EXEC", self.schema_name,
|
||||
proc_name)
|
||||
|
||||
self.assertEquals(sql, expected_sql)
|
||||
|
||||
# Verify the EXEC SQL by running it if we don't have arguments
|
||||
if not self.with_args:
|
||||
funcs_utils.execute_procedure(self.server, self.db_name, exec_sql)
|
||||
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
@ -100,7 +100,7 @@ def verify_trigger_function(server, db_name, func_name):
|
||||
|
||||
|
||||
def create_procedure(server, db_name, schema_name, func_name, s_type,
|
||||
s_version):
|
||||
s_version, with_args=False, args=""):
|
||||
"""This function add the procedure to schema"""
|
||||
try:
|
||||
connection = utils.get_db_connection(db_name,
|
||||
@ -111,22 +111,24 @@ def create_procedure(server, db_name, schema_name, func_name, s_type,
|
||||
server['sslmode'])
|
||||
pg_cursor = connection.cursor()
|
||||
if s_type == 'pg':
|
||||
query = "CREATE PROCEDURE " + schema_name + "." + func_name + \
|
||||
"()" \
|
||||
" LANGUAGE 'sql'" \
|
||||
" SECURITY DEFINER AS $$" \
|
||||
" SELECT 1; $$;"
|
||||
query = "CREATE PROCEDURE {0}.{1}" \
|
||||
"({2})" \
|
||||
" LANGUAGE 'sql'" \
|
||||
" SECURITY DEFINER AS $$" \
|
||||
" SELECT 1; $$;".format(schema_name, func_name, args)
|
||||
else:
|
||||
if s_version >= 90500:
|
||||
query = "CREATE PROCEDURE " + schema_name + "." + func_name + \
|
||||
"()" \
|
||||
query = "CREATE PROCEDURE {0}.{1}" \
|
||||
"({2})" \
|
||||
" SECURITY DEFINER AS $BODY$ BEGIN" \
|
||||
" NULL; END; $BODY$"
|
||||
" NULL; END; $BODY$".format(schema_name, func_name,
|
||||
args)
|
||||
else:
|
||||
query = "CREATE PROCEDURE " + schema_name + "." + func_name + \
|
||||
"()" \
|
||||
query = "CREATE PROCEDURE {0}.{1}" \
|
||||
"({2})" \
|
||||
" AS $BODY$ BEGIN" \
|
||||
" NULL; END; $BODY$"
|
||||
" NULL; END; $BODY$".format(schema_name, func_name,
|
||||
args)
|
||||
|
||||
pg_cursor.execute(query)
|
||||
connection.commit()
|
||||
@ -220,3 +222,16 @@ def set_up(obj):
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
return obj
|
||||
|
||||
|
||||
def execute_procedure(server, db_name, proc_exec_sql):
|
||||
"""This function verifies the procedure in db"""
|
||||
connection = utils.get_db_connection(db_name,
|
||||
server['username'],
|
||||
server['db_password'],
|
||||
server['host'],
|
||||
server['port'],
|
||||
server['sslmode'])
|
||||
pg_cursor = connection.cursor()
|
||||
pg_cursor.execute(proc_exec_sql)
|
||||
connection.close()
|
||||
|
Loading…
Reference in New Issue
Block a user