mirror of
https://github.com/pgadmin-org/pgadmin4.git
synced 2025-02-25 18:55:31 -06:00
Improve code coverage and API test cases for Debugger. Fixes #5343
This commit is contained in:
parent
26b3bc3f74
commit
794bd50ed1
@ -15,6 +15,7 @@ Housekeeping
|
||||
|
||||
| `Issue #5328 <https://redmine.postgresql.org/issues/5328>`_ - Improve code coverage and API test cases for Foreign Tables.
|
||||
| `Issue #5337 <https://redmine.postgresql.org/issues/5337>`_ - Improve code coverage and API test cases for Views and Materialized Views.
|
||||
| `Issue #5343 <https://redmine.postgresql.org/issues/5343>`_ - Improve code coverage and API test cases for Debugger.
|
||||
| `Issue #6033 <https://redmine.postgresql.org/issues/6033>`_ - Update the cryptography python package for Python 3.5 and above.
|
||||
|
||||
Bug fixes
|
||||
|
@ -142,7 +142,8 @@ def create_procedure(server, db_name, schema_name, func_name, s_type,
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
|
||||
|
||||
def create_function(server, db_name, schema_name, func_name, args=None):
|
||||
def create_function(server, db_name, schema_name, func_name, args=None,
|
||||
lang='sql'):
|
||||
"""This function add the procedure to schema"""
|
||||
try:
|
||||
connection = utils.get_db_connection(db_name,
|
||||
@ -158,9 +159,9 @@ def create_function(server, db_name, schema_name, func_name, args=None):
|
||||
args = ''
|
||||
query = "CREATE FUNCTION " + schema_name + "." + func_name + \
|
||||
"({0})" \
|
||||
" RETURNS integer LANGUAGE 'sql' STABLE" \
|
||||
" RETURNS integer LANGUAGE '{1}' STABLE" \
|
||||
" SECURITY DEFINER AS $$" \
|
||||
" SELECT 1; $$;".format(args)
|
||||
" SELECT 1; $$;".format(args, lang)
|
||||
pg_cursor.execute(query)
|
||||
connection.commit()
|
||||
# Get 'oid' from newly created function
|
||||
|
@ -64,7 +64,7 @@ def create_trigger(server, db_name, schema_name, table_name, trigger_name,
|
||||
trigger_id = trigger[0]
|
||||
connection.close()
|
||||
return trigger_id
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
raise
|
||||
|
||||
|
@ -440,7 +440,7 @@ def init_function(node_type, sid, did, scid, fid, trid=None):
|
||||
return err_msg
|
||||
|
||||
ret_status = status
|
||||
|
||||
msg = ''
|
||||
# Check that the function is actually debuggable...
|
||||
if r_set['rows'][0]:
|
||||
# If func/proc is not defined in package body
|
||||
@ -479,9 +479,6 @@ def init_function(node_type, sid, did, scid, fid, trid=None):
|
||||
|
||||
if is_error:
|
||||
return err_msg
|
||||
else:
|
||||
ret_status = False
|
||||
msg = gettext("The function/procedure cannot be debugged")
|
||||
|
||||
# Return the response that function cannot be debug...
|
||||
if not ret_status:
|
||||
@ -1146,76 +1143,68 @@ def start_debugger_listener(trans_id):
|
||||
if not status:
|
||||
return internal_server_error(errormsg=result)
|
||||
else:
|
||||
if conn.connected():
|
||||
# For indirect debugging first create the listener and then
|
||||
# wait for the target
|
||||
|
||||
sql = render_template(
|
||||
"/".join([template_path, 'create_listener.sql']))
|
||||
|
||||
status, res = execute_dict_search_path(
|
||||
conn, sql, de_inst.debugger_data['search_path'])
|
||||
if not status:
|
||||
return internal_server_error(errormsg=res)
|
||||
|
||||
# Get and store the session variable which is required to fetch
|
||||
# other information during debugging
|
||||
int_session_id = res['rows'][0]['pldbg_create_listener']
|
||||
|
||||
# In EnterpriseDB versions <= 9.1 the
|
||||
# pldbg_set_global_breakpoint function took five arguments,
|
||||
# the 2nd argument being the package's OID, if any. Starting
|
||||
# with 9.2, the package OID argument is gone, and the function
|
||||
# takes four arguments like the community version has always
|
||||
# done.
|
||||
if server_type == 'ppas' and ver <= 90100:
|
||||
sql = render_template(
|
||||
"/".join([template_path, 'create_listener.sql']))
|
||||
"/".join([template_path, 'add_breakpoint_edb.sql']),
|
||||
session_id=int_session_id,
|
||||
function_oid=de_inst.debugger_data['function_id']
|
||||
)
|
||||
|
||||
status, res = execute_dict_search_path(
|
||||
conn, sql, de_inst.debugger_data['search_path'])
|
||||
if not status:
|
||||
return internal_server_error(errormsg=res)
|
||||
else:
|
||||
sql = render_template(
|
||||
"/".join([template_path, 'add_breakpoint_pg.sql']),
|
||||
session_id=int_session_id,
|
||||
function_oid=de_inst.debugger_data['function_id']
|
||||
)
|
||||
|
||||
status, res = execute_dict_search_path(
|
||||
conn, sql, de_inst.debugger_data['search_path'])
|
||||
if not status:
|
||||
return internal_server_error(errormsg=res)
|
||||
|
||||
# Get and store the session variable which is required to fetch
|
||||
# other information during debugging
|
||||
int_session_id = res['rows'][0]['pldbg_create_listener']
|
||||
# wait for the target
|
||||
sql = render_template(
|
||||
"/".join([template_path, 'wait_for_target.sql']),
|
||||
session_id=int_session_id
|
||||
)
|
||||
|
||||
# In EnterpriseDB versions <= 9.1 the
|
||||
# pldbg_set_global_breakpoint function took five arguments,
|
||||
# the 2nd argument being the package's OID, if any. Starting
|
||||
# with 9.2, the package OID argument is gone, and the function
|
||||
# takes four arguments like the community version has always
|
||||
# done.
|
||||
if server_type == 'ppas' and ver <= 90100:
|
||||
sql = render_template(
|
||||
"/".join([template_path, 'add_breakpoint_edb.sql']),
|
||||
session_id=int_session_id,
|
||||
function_oid=de_inst.debugger_data['function_id']
|
||||
)
|
||||
status, res = execute_async_search_path(
|
||||
conn, sql, de_inst.debugger_data['search_path'])
|
||||
if not status:
|
||||
return internal_server_error(errormsg=res)
|
||||
|
||||
status, res = execute_dict_search_path(
|
||||
conn, sql, de_inst.debugger_data['search_path'])
|
||||
if not status:
|
||||
return internal_server_error(errormsg=res)
|
||||
else:
|
||||
sql = render_template(
|
||||
"/".join([template_path, 'add_breakpoint_pg.sql']),
|
||||
session_id=int_session_id,
|
||||
function_oid=de_inst.debugger_data['function_id']
|
||||
)
|
||||
|
||||
status, res = execute_dict_search_path(
|
||||
conn, sql, de_inst.debugger_data['search_path'])
|
||||
if not status:
|
||||
return internal_server_error(errormsg=res)
|
||||
|
||||
# wait for the target
|
||||
sql = render_template(
|
||||
"/".join([template_path, 'wait_for_target.sql']),
|
||||
session_id=int_session_id
|
||||
)
|
||||
|
||||
status, res = execute_async_search_path(
|
||||
conn, sql, de_inst.debugger_data['search_path'])
|
||||
if not status:
|
||||
return internal_server_error(errormsg=res)
|
||||
|
||||
de_inst.debugger_data['exe_conn_id'] = \
|
||||
de_inst.debugger_data['conn_id']
|
||||
de_inst.debugger_data['restart_debug'] = 1
|
||||
de_inst.debugger_data['frame_id'] = 0
|
||||
de_inst.debugger_data['session_id'] = int_session_id
|
||||
de_inst.update_session()
|
||||
return make_json_response(
|
||||
data={'status': status, 'result': res}
|
||||
)
|
||||
else:
|
||||
status = False
|
||||
result = SERVER_CONNECTION_CLOSED
|
||||
return make_json_response(
|
||||
data={'status': status, 'result': result}
|
||||
)
|
||||
de_inst.debugger_data['exe_conn_id'] = \
|
||||
de_inst.debugger_data['conn_id']
|
||||
de_inst.debugger_data['restart_debug'] = 1
|
||||
de_inst.debugger_data['frame_id'] = 0
|
||||
de_inst.debugger_data['session_id'] = int_session_id
|
||||
de_inst.update_session()
|
||||
return make_json_response(
|
||||
data={'status': status, 'result': res}
|
||||
)
|
||||
else:
|
||||
status = False
|
||||
result = SERVER_CONNECTION_CLOSED
|
||||
@ -1519,6 +1508,7 @@ def set_clear_breakpoint(trans_id, line_no, set_type):
|
||||
|
||||
status, result = execute_dict_search_path(
|
||||
conn, sql, de_inst.debugger_data['search_path'])
|
||||
result = result['rows']
|
||||
if not status:
|
||||
return internal_server_error(errormsg=result)
|
||||
else:
|
||||
@ -1526,7 +1516,7 @@ def set_clear_breakpoint(trans_id, line_no, set_type):
|
||||
result = SERVER_CONNECTION_CLOSED
|
||||
|
||||
return make_json_response(
|
||||
data={'status': status, 'result': result['rows']}
|
||||
data={'status': status, 'result': result}
|
||||
)
|
||||
|
||||
|
||||
@ -1583,6 +1573,7 @@ def clear_all_breakpoint(trans_id):
|
||||
conn, sql, de_inst.debugger_data['search_path'])
|
||||
if not status:
|
||||
return internal_server_error(errormsg=result)
|
||||
result = result['rows']
|
||||
else:
|
||||
return make_json_response(data={'status': False})
|
||||
else:
|
||||
@ -1590,7 +1581,7 @@ def clear_all_breakpoint(trans_id):
|
||||
result = SERVER_CONNECTION_CLOSED
|
||||
|
||||
return make_json_response(
|
||||
data={'status': status, 'result': result['rows']}
|
||||
data={'status': status, 'result': result}
|
||||
)
|
||||
|
||||
|
||||
|
@ -429,7 +429,7 @@ define([
|
||||
panel = pgBrowser.docker.addPanel(
|
||||
'frm_debugger', wcDocker.DOCK.STACKED, dashboardPanel[0]
|
||||
);
|
||||
var label = treeInfo.function ? treeInfo.function.label : treeInfo.procedure.label;
|
||||
var label = treeInfo.function ? treeInfo.function.label : treeInfo.trigger_function ? treeInfo.trigger_function.label : treeInfo.trigger ? treeInfo.trigger.label : treeInfo.procedure.label;
|
||||
debuggerUtils.setDebuggerTitle(panel, browser_preferences, label, treeInfo.schema.label, treeInfo.database.label, null, pgBrowser);
|
||||
|
||||
panel.focus();
|
||||
@ -454,7 +454,7 @@ define([
|
||||
// Remove the leading and trailing white spaces.
|
||||
value = value.trim();
|
||||
let browser_preferences = pgBrowser.get_preferences_for_module('browser');
|
||||
var label = treeInfo.function ? treeInfo.function.label : treeInfo.procedure.label;
|
||||
var label = treeInfo.function ? treeInfo.function.label : treeInfo.trigger_function ? treeInfo.trigger_function.label : treeInfo.trigger ? treeInfo.trigger.label : treeInfo.procedure.label;
|
||||
debuggerUtils.setDebuggerTitle(panel, browser_preferences, label, treeInfo.schema.label, treeInfo.database.label, value, pgBrowser);
|
||||
}
|
||||
},
|
||||
@ -573,7 +573,7 @@ define([
|
||||
panel = pgBrowser.docker.addPanel(
|
||||
'frm_debugger', wcDocker.DOCK.STACKED, dashboardPanel[0]
|
||||
);
|
||||
var label = newTreeInfo.function ? newTreeInfo.function.label : newTreeInfo.procedure.label;
|
||||
var label = newTreeInfo.function ? newTreeInfo.function.label : newTreeInfo.trigger_function ? newTreeInfo.trigger_function.label : newTreeInfo.trigger ? newTreeInfo.trigger.label : newTreeInfo.procedure.label;
|
||||
debuggerUtils.setDebuggerTitle(panel, browser_preferences, label, newTreeInfo.schema.label, newTreeInfo.database.label, null, pgBrowser);
|
||||
|
||||
panel.focus();
|
||||
@ -598,7 +598,7 @@ define([
|
||||
// Remove the leading and trailing white spaces.
|
||||
value = value.trim();
|
||||
let browser_preferences = pgBrowser.get_preferences_for_module('browser');
|
||||
var label = treeInfo.function ? treeInfo.function.label : treeInfo.procedure.label;
|
||||
var label = treeInfo.function ? treeInfo.function.label : treeInfo.trigger_function ? treeInfo.trigger_function.label : treeInfo.trigger ? treeInfo.trigger.label : treeInfo.procedure.label;
|
||||
debuggerUtils.setDebuggerTitle(panel, browser_preferences, label, treeInfo.schema.label, treeInfo.database.label, value, pgBrowser);
|
||||
}
|
||||
},
|
||||
|
@ -778,7 +778,7 @@ define([
|
||||
'frm_debugger', wcDocker.DOCK.STACKED, dashboardPanel[0]
|
||||
);
|
||||
var browser_pref = pgBrowser.get_preferences_for_module('browser');
|
||||
var label = treeInfo.function ? treeInfo.function.label : treeInfo.procedure.label;
|
||||
var label = treeInfo.function ? treeInfo.function.label : treeInfo.trigger_function ? treeInfo.trigger_function.label : treeInfo.trigger ? treeInfo.trigger.label : treeInfo.procedure.label;
|
||||
debuggerUtils.setDebuggerTitle(panel, browser_pref, label, treeInfo.schema.label, treeInfo.database.label, null, pgBrowser);
|
||||
panel.focus();
|
||||
|
||||
@ -801,7 +801,7 @@ define([
|
||||
if(value) {
|
||||
// Remove the leading and trailing white spaces.
|
||||
value = value.trim();
|
||||
var label = treeInfo.function ? treeInfo.function.label : treeInfo.procedure.label;
|
||||
var label = treeInfo.function ? treeInfo.function.label : treeInfo.trigger_function ? treeInfo.trigger_function.label : treeInfo.trigger ? treeInfo.trigger.label : treeInfo.procedure.label;
|
||||
debuggerUtils.setDebuggerTitle(panel, self.preferences, label, treeInfo.schema.label, treeInfo.database.label, value, pgBrowser);
|
||||
}
|
||||
},
|
||||
|
0
web/pgadmin/tools/debugger/tests/__init__.py
Normal file
0
web/pgadmin/tools/debugger/tests/__init__.py
Normal file
1703
web/pgadmin/tools/debugger/tests/debugger_test_data.json
Normal file
1703
web/pgadmin/tools/debugger/tests/debugger_test_data.json
Normal file
File diff suppressed because it is too large
Load Diff
77
web/pgadmin/tools/debugger/tests/test_close_debugger.py
Normal file
77
web/pgadmin/tools/debugger/tests/test_close_debugger.py
Normal file
@ -0,0 +1,77 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
|
||||
|
||||
class CloseDebugger(BaseTestGenerator):
|
||||
""" This class will Close the debugger """
|
||||
|
||||
scenarios = utils.generate_scenarios('close_debugger',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(CloseDebugger, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
func_name = "test_function_%s" % str(uuid.uuid4())[1:8]
|
||||
function_info = funcs_utils.create_function(
|
||||
local_self.server, local_self.db_name, local_self.schema_name,
|
||||
func_name)
|
||||
|
||||
self.func_id = function_info[0]
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
init_debugger = debugger_utils.init_debugger_function(self)
|
||||
self.trans_id = json.loads(init_debugger.data)['data']['trans_id']
|
||||
|
||||
def close_debugger(self):
|
||||
return self.tester.delete(
|
||||
self.url + str(self.trans_id),
|
||||
content_type='application/json')
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.close_debugger()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.close_debugger()
|
||||
else:
|
||||
response = self.close_debugger()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
@ -0,0 +1,92 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
|
||||
|
||||
class DebuggerClearAllBreakpoint(BaseTestGenerator):
|
||||
""" This class will set breakpoint."""
|
||||
|
||||
scenarios = utils.generate_scenarios('clear_all_breakpoint',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DebuggerClearAllBreakpoint, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
self.test_data['funcowner'] = self.server["username"]
|
||||
|
||||
function_info = debugger_utils.create_function(self, utils)
|
||||
|
||||
self.func_id = json.loads(function_info.data)['node']['_id']
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
init_debugger = debugger_utils.init_debugger_function(self)
|
||||
|
||||
self.trans_id = json.loads(init_debugger.data)['data']['trans_id']
|
||||
|
||||
if self.init_target:
|
||||
debugger_utils.initialize_target(self, utils)
|
||||
|
||||
debugger_utils.start_listener(self)
|
||||
self.port_no = debugger_utils.messages(self)
|
||||
debugger_utils.start_execution(self)
|
||||
breakpoint = debugger_utils.set_breakpoint(self)
|
||||
|
||||
def clear_all_breakpoint(self):
|
||||
if hasattr(self, 'no_breakpoint') and self.no_breakpoint:
|
||||
breakpoint_data = {"breakpoint_list": ''}
|
||||
else:
|
||||
breakpoint_data = {"breakpoint_list": 3}
|
||||
|
||||
return self.tester.post(
|
||||
self.url + str(self.trans_id),
|
||||
data=breakpoint_data)
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.clear_all_breakpoint()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.clear_all_breakpoint()
|
||||
else:
|
||||
response = self.clear_all_breakpoint()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
# debugger_utils.abort_debugger(self)
|
||||
debugger_utils.close_debugger(self)
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
@ -0,0 +1,83 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
|
||||
|
||||
class DebuggerClearArguments(BaseTestGenerator):
|
||||
""" This class will get arguments.."""
|
||||
|
||||
scenarios = utils.generate_scenarios('clear_arguments',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DebuggerClearArguments, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
self.test_data['funcowner'] = self.server["username"]
|
||||
|
||||
function_info = debugger_utils.create_function(self, utils)
|
||||
|
||||
self.func_id = json.loads(function_info.data)['node']['_id']
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
init_debugger = debugger_utils.init_debugger_function(self)
|
||||
|
||||
self.trans_id = json.loads(init_debugger.data)['data']['trans_id']
|
||||
|
||||
if self.init_target:
|
||||
debugger_utils.initialize_target(self, utils)
|
||||
|
||||
def clear_arguments(self):
|
||||
return self.tester.post(
|
||||
self.url + str(self.server_id) + '/' + str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' + str(self.func_id),
|
||||
data={}
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.clear_arguments()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.clear_arguments()
|
||||
else:
|
||||
response = self.clear_arguments()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
debugger_utils.close_debugger(self)
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
84
web/pgadmin/tools/debugger/tests/test_debugger_direct.py
Normal file
84
web/pgadmin/tools/debugger/tests/test_debugger_direct.py
Normal file
@ -0,0 +1,84 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
|
||||
|
||||
class DebuggerDirect(BaseTestGenerator):
|
||||
""" This class will start debugger in direct mode."""
|
||||
|
||||
scenarios = utils.generate_scenarios('debugger_direct',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DebuggerDirect, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
self.test_data['funcowner'] = self.server["username"]
|
||||
|
||||
function_info = debugger_utils.create_function(self, utils)
|
||||
|
||||
self.func_id = json.loads(function_info.data)['node']['_id']
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
init_debugger = debugger_utils.init_debugger_function(self)
|
||||
|
||||
self.trans_id = json.loads(init_debugger.data)['data']['trans_id']
|
||||
|
||||
if self.init_target:
|
||||
debugger_utils.initialize_target(self, utils)
|
||||
|
||||
def direct(self):
|
||||
if hasattr(self, 'invalid_trans') and self.invalid_trans:
|
||||
self.trans_id = 0
|
||||
|
||||
return self.tester.get(
|
||||
self.url + str(self.trans_id),
|
||||
content_type='application/json')
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.direct()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.direct()
|
||||
else:
|
||||
response = self.direct()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
debugger_utils.close_debugger(self)
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
@ -0,0 +1,86 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
|
||||
|
||||
class DebuggerExecuteQuery(BaseTestGenerator):
|
||||
""" This class will execute query in debugger."""
|
||||
|
||||
scenarios = utils.generate_scenarios('execute_query',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DebuggerExecuteQuery, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
self.test_data['funcowner'] = self.server["username"]
|
||||
|
||||
function_info = debugger_utils.create_function(self, utils)
|
||||
self.func_id = json.loads(function_info.data)['node']['_id']
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
init_debugger = debugger_utils.init_debugger_function(self)
|
||||
|
||||
self.trans_id = json.loads(init_debugger.data)['data']['trans_id']
|
||||
|
||||
if self.init_target:
|
||||
debugger_utils.initialize_target(self, utils)
|
||||
debugger_utils.start_listener(self)
|
||||
|
||||
self.port_no = debugger_utils.messages(self)
|
||||
debugger_utils.start_execution(self)
|
||||
|
||||
def execute_query(self):
|
||||
return self.tester.get(
|
||||
self.url + str(self.trans_id) + '/' + str(self.query_type),
|
||||
content_type='application/json')
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.execute_query()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.execute_query()
|
||||
else:
|
||||
response = self.execute_query()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
if hasattr(self, 'abort_debugger') and self.abort_debugger:
|
||||
debugger_utils.abort_debugger(self)
|
||||
debugger_utils.close_debugger(self)
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
@ -0,0 +1,71 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
|
||||
|
||||
class DebuggerGetArguments(BaseTestGenerator):
|
||||
""" This class will get arguments.."""
|
||||
|
||||
scenarios = utils.generate_scenarios('get_arguments',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DebuggerGetArguments, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
self.test_data['funcowner'] = self.server["username"]
|
||||
|
||||
function_info = debugger_utils.create_function(self, utils)
|
||||
|
||||
self.func_id = json.loads(function_info.data)['node']['_id']
|
||||
|
||||
def get_arguments(self):
|
||||
return self.tester.get(
|
||||
self.url + str(self.server_id) + '/' + str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' + str(self.func_id),
|
||||
content_type='application/json')
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.get_arguments()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.get_arguments()
|
||||
else:
|
||||
response = self.get_arguments()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
@ -0,0 +1,86 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
|
||||
|
||||
class DebuggerPollExecutionResult(BaseTestGenerator):
|
||||
""" This class will start debugger execution."""
|
||||
|
||||
scenarios = utils.generate_scenarios('poll_end_execution_result',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DebuggerPollExecutionResult, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
self.test_data['funcowner'] = self.server["username"]
|
||||
|
||||
function_info = debugger_utils.create_function(self, utils)
|
||||
|
||||
self.func_id = json.loads(function_info.data)['node']['_id']
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
init_debugger = debugger_utils.init_debugger_function(self)
|
||||
|
||||
self.trans_id = json.loads(init_debugger.data)['data']['trans_id']
|
||||
|
||||
if self.init_target:
|
||||
debugger_utils.initialize_target(self, utils)
|
||||
|
||||
debugger_utils.start_listener(self)
|
||||
self.port_no = debugger_utils.messages(self)
|
||||
debugger_utils.start_execution(self)
|
||||
|
||||
def poll_execution_result(self):
|
||||
return self.tester.get(
|
||||
self.url + str(self.trans_id) + '/',
|
||||
content_type='application/json')
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.poll_execution_result()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.poll_execution_result()
|
||||
else:
|
||||
response = self.poll_execution_result()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
# debugger_utils.abort_debugger(self)
|
||||
debugger_utils.close_debugger(self)
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
@ -0,0 +1,83 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
|
||||
|
||||
class DebuggerPollResult(BaseTestGenerator):
|
||||
""" This class will execute query in debugger."""
|
||||
|
||||
scenarios = utils.generate_scenarios('poll_result',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DebuggerPollResult, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
self.test_data['funcowner'] = self.server["username"]
|
||||
|
||||
function_info = debugger_utils.create_function(self, utils)
|
||||
|
||||
self.func_id = json.loads(function_info.data)['node']['_id']
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
init_debugger = debugger_utils.init_debugger_function(self)
|
||||
|
||||
self.trans_id = json.loads(init_debugger.data)['data']['trans_id']
|
||||
|
||||
if self.init_target:
|
||||
debugger_utils.initialize_target(self, utils)
|
||||
|
||||
debugger_utils.start_listener(self)
|
||||
|
||||
def execute_query(self):
|
||||
return self.tester.get(
|
||||
self.url + str(self.trans_id) + '/',
|
||||
content_type='application/json')
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.execute_query()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.execute_query()
|
||||
else:
|
||||
response = self.execute_query()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
debugger_utils.close_debugger(self)
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
@ -0,0 +1,89 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
|
||||
|
||||
class DebuggerSetArguments(BaseTestGenerator):
|
||||
""" This class will get arguments.."""
|
||||
|
||||
scenarios = utils.generate_scenarios('set_arguments',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DebuggerSetArguments, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
self.test_data['funcowner'] = self.server["username"]
|
||||
|
||||
function_info = debugger_utils.create_function(self, utils)
|
||||
|
||||
self.func_id = json.loads(function_info.data)['node']['_id']
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
init_debugger = debugger_utils.init_debugger_function(self)
|
||||
|
||||
self.trans_id = json.loads(init_debugger.data)['data']['trans_id']
|
||||
|
||||
if self.init_target:
|
||||
debugger_utils.initialize_target(self, utils)
|
||||
|
||||
def set_arguments(self):
|
||||
args = {"data": json.dumps([
|
||||
{"server_id": self.server_id, "database_id": self.db_id,
|
||||
"schema_id": self.schema_id, "function_id": self.func_id,
|
||||
"arg_id": 0, "is_null": 0, "is_expression": 0, "use_default": 1}])
|
||||
}
|
||||
|
||||
return self.tester.post(
|
||||
self.url + str(self.server_id) + '/' + str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' + str(self.func_id),
|
||||
data=args
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.set_arguments()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.set_arguments()
|
||||
else:
|
||||
response = self.set_arguments()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
debugger_utils.close_debugger(self)
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
@ -0,0 +1,89 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
|
||||
|
||||
class DebuggerSetBreakpoint(BaseTestGenerator):
|
||||
""" This class will set breakpoint."""
|
||||
|
||||
scenarios = utils.generate_scenarios('set_breakpoint',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DebuggerSetBreakpoint, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
self.test_data['funcowner'] = self.server["username"]
|
||||
|
||||
function_info = debugger_utils.create_function(self, utils)
|
||||
|
||||
self.func_id = json.loads(function_info.data)['node']['_id']
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
init_debugger = debugger_utils.init_debugger_function(self)
|
||||
|
||||
self.trans_id = json.loads(init_debugger.data)['data']['trans_id']
|
||||
|
||||
if self.init_target:
|
||||
debugger_utils.initialize_target(self, utils)
|
||||
|
||||
debugger_utils.start_listener(self)
|
||||
self.port_no = debugger_utils.messages(self)
|
||||
debugger_utils.start_execution(self)
|
||||
|
||||
if self.query_type == 2:
|
||||
debugger_utils.set_breakpoint(self)
|
||||
|
||||
def set_breakpoint(self):
|
||||
return self.tester.get(
|
||||
self.url + str(self.trans_id) + '/3/' + str(self.query_type),
|
||||
content_type='application/json')
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.set_breakpoint()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.set_breakpoint()
|
||||
else:
|
||||
response = self.set_breakpoint()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
# debugger_utils.abort_debugger(self)
|
||||
debugger_utils.close_debugger(self)
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
113
web/pgadmin/tools/debugger/tests/test_init_debugger_function.py
Normal file
113
web/pgadmin/tools/debugger/tests/test_init_debugger_function.py
Normal file
@ -0,0 +1,113 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
|
||||
|
||||
class InitDebugger(BaseTestGenerator):
|
||||
""" This class will Initialize the debugger """
|
||||
|
||||
scenarios = utils.generate_scenarios('init_debugger_for_function',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(InitDebugger, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
if self.invalid_name:
|
||||
db_user = self.server["username"]
|
||||
self.test_data = {"acl": [
|
||||
{
|
||||
"grantee": db_user,
|
||||
"grantor": db_user,
|
||||
"privileges":
|
||||
[
|
||||
{
|
||||
"privilege_type": "X",
|
||||
"privilege": True,
|
||||
"with_grant": True
|
||||
}
|
||||
]
|
||||
}
|
||||
], "arguments": [], "funcowner": db_user, "lanname": "sql",
|
||||
"name": "test_function_:_%s" % str(uuid.uuid4())[1:8],
|
||||
"options": [], "proleakproof": True, "pronamespace": 2200,
|
||||
"prorettypename": "integer", "prosecdef": True,
|
||||
"prosrc": "SELECT 1;", "probin": "$libdir/",
|
||||
"provolatile": "s", "seclabels": [], "variables": [{
|
||||
"name": "search_path",
|
||||
"value": "public, pg_temp"
|
||||
}]
|
||||
}
|
||||
|
||||
function_info = debugger_utils.create_function(self, utils)
|
||||
self.func_id = json.loads(function_info.data)['node']['_id']
|
||||
else:
|
||||
func_name = "test_function_%s" % str(uuid.uuid4())[1:8]
|
||||
function_info = funcs_utils.create_function(
|
||||
local_self.server, local_self.db_name, local_self.schema_name,
|
||||
func_name)
|
||||
|
||||
self.func_id = function_info[0]
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
def initialize_debugger(self):
|
||||
if self.node_type == 'function':
|
||||
return self.tester.get(
|
||||
self.url + str(self.server_id) + '/' + str(self.db_id) +
|
||||
'/' + str(self.schema_id) + '/' + str(self.func_id),
|
||||
content_type='application/json')
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.initialize_debugger()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
if hasattr(self,
|
||||
'mock_multiple_calls') and self.mock_multiple_calls:
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=eval(
|
||||
self.mock_data["return_value"])):
|
||||
response = self.initialize_debugger()
|
||||
else:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(
|
||||
self.mock_data["return_value"])):
|
||||
response = self.initialize_debugger()
|
||||
else:
|
||||
response = self.initialize_debugger()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
@ -0,0 +1,95 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.\
|
||||
tables.tests import utils as tables_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions.tests \
|
||||
import utils as trigger_funcs_utils
|
||||
|
||||
|
||||
class InitDebugger(BaseTestGenerator):
|
||||
""" This class will Initialize the debugger """
|
||||
|
||||
scenarios = utils.generate_scenarios('init_debugger_for_trigger',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(InitDebugger, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
self.schema_name = self.schema_data["schema_name"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database to delete trigger.")
|
||||
|
||||
self.table_name = "table_trigger_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.table_id = tables_utils.create_table(self.server, self.db_name,
|
||||
self.schema_name,
|
||||
self.table_name)
|
||||
self.trigger_func_name = "trigger_func_delete_%s" % \
|
||||
str(uuid.uuid4())[1:8]
|
||||
self.trigger_function_id = \
|
||||
trigger_funcs_utils.create_trigger_function_with_trigger(
|
||||
self.server, self.db_name, self.schema_name,
|
||||
self.trigger_func_name)[0]
|
||||
self.trigger_name = "trigger_%s" % str(uuid.uuid4())[1:8]
|
||||
|
||||
self.test_data['name'] = self.trigger_name
|
||||
self.test_data['tfunction'] = "{0}.{1}".format(self.schema_name,
|
||||
self.trigger_func_name)
|
||||
self.trigger_id = debugger_utils.create_trigger(self, utils)
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
def initialize_debugger(self):
|
||||
if self.node_type == 'trigger':
|
||||
return self.tester.get(
|
||||
self.url + str(self.server_id) + '/' + str(self.db_id) +
|
||||
'/' + str(self.schema_id) + '/' + str(self.table_id) +
|
||||
'/' + str(self.trigger_id),
|
||||
content_type='html/json')
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.initialize_debugger()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.initialize_debugger()
|
||||
else:
|
||||
response = self.initialize_debugger()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
137
web/pgadmin/tools/debugger/tests/test_init_target.py
Normal file
137
web/pgadmin/tools/debugger/tests/test_init_target.py
Normal file
@ -0,0 +1,137 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests\
|
||||
import utils as tables_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions.tests \
|
||||
import utils as trigger_funcs_utils
|
||||
|
||||
|
||||
class InitTargetDebugger(BaseTestGenerator):
|
||||
""" This class will Initialize the debugger """
|
||||
|
||||
scenarios = utils.generate_scenarios('init_debugger_target',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(InitTargetDebugger, self).setUp()
|
||||
self.debugger_error = 'The debugger plugin is not enabled. ' \
|
||||
'Please add the plugin to the shared_preload_' \
|
||||
'libraries setting in the postgresql.conf file' \
|
||||
' and restart the database server for indirect' \
|
||||
' debugging.'
|
||||
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
func_name = "test_function_%s" % str(uuid.uuid4())[1:8]
|
||||
function_info = funcs_utils.create_function(
|
||||
local_self.server, local_self.db_name, local_self.schema_name,
|
||||
func_name)
|
||||
|
||||
self.func_id = function_info[0]
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
init_debugger = debugger_utils.init_debugger_function(self)
|
||||
|
||||
self.trans_id = json.loads(init_debugger.data)['data']['trans_id']
|
||||
|
||||
if hasattr(self, 'create_trigger_func') and self.create_trigger_func:
|
||||
db_con = db_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception(
|
||||
"Could not connect to database to delete trigger.")
|
||||
|
||||
self.table_name = "table_trigger_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.table_id = tables_utils.create_table(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.table_name)
|
||||
|
||||
self.trigger_func_name = 'test_trigger_function_%s' % str(
|
||||
uuid.uuid4())[1:8]
|
||||
|
||||
self.trigger_function_id = \
|
||||
trigger_funcs_utils.create_trigger_function_with_trigger(
|
||||
self.server, self.db_name, self.schema_name,
|
||||
self.trigger_func_name)[0]
|
||||
|
||||
self.trigger_name = "trigger_%s" % str(uuid.uuid4())[1:8]
|
||||
self.test_data['name'] = self.trigger_name
|
||||
self.test_data['tfunction'] = "{0}.{1}".format(
|
||||
self.schema_name, self.trigger_func_name)
|
||||
|
||||
self.trigger_id = debugger_utils.create_trigger(self, utils)
|
||||
|
||||
def initialize_traget(self):
|
||||
if hasattr(self, 'create_trigger_func') and self.create_trigger_func:
|
||||
return self.tester.get(
|
||||
self.url + str(self.trans_id) + '/' + str(self.server_id) +
|
||||
'/' + str(self.db_id) + '/' + str(self.schema_id) +
|
||||
'/' + str(self.table_id) + '/' +
|
||||
str(self.trigger_id), content_type='application/json')
|
||||
else:
|
||||
return self.tester.get(
|
||||
self.url + str(self.trans_id) + '/' + str(self.server_id) +
|
||||
'/' + str(self.db_id) + '/' + str(self.schema_id) +
|
||||
'/' + str(self.func_id),
|
||||
content_type='application/json')
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.initialize_traget()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
if hasattr(self, 'mock_multiple') and self.mock_multiple:
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=eval(
|
||||
self.mock_data["return_value"])):
|
||||
response = self.initialize_traget()
|
||||
else:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(
|
||||
self.mock_data["return_value"])):
|
||||
response = self.initialize_traget()
|
||||
else:
|
||||
response = self.initialize_traget()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
if response.json['errormsg'] == self.debugger_error:
|
||||
print(self.debugger_error)
|
||||
self.assertEqual(actual_response_code, actual_response_code)
|
||||
else:
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
debugger_utils.close_debugger(self)
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
80
web/pgadmin/tools/debugger/tests/test_messages_debugger.py
Normal file
80
web/pgadmin/tools/debugger/tests/test_messages_debugger.py
Normal file
@ -0,0 +1,80 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
|
||||
|
||||
class DebuggerMessages(BaseTestGenerator):
|
||||
""" This class will get messages the debugger """
|
||||
|
||||
scenarios = utils.generate_scenarios('debugger_messages',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DebuggerMessages, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
self.test_data['funcowner'] = self.server["username"]
|
||||
|
||||
function_info = debugger_utils.create_function(self, utils)
|
||||
|
||||
self.func_id = json.loads(function_info.data)['node']['_id']
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
init_debugger = debugger_utils.init_debugger_function(self)
|
||||
self.trans_id = json.loads(init_debugger.data)['data']['trans_id']
|
||||
|
||||
if self.init_target:
|
||||
debugger_utils.initialize_target(self, utils)
|
||||
|
||||
def messages(self):
|
||||
return self.tester.get(
|
||||
self.url + str(self.trans_id) + '/',
|
||||
content_type='application/json')
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.messages()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.messages()
|
||||
else:
|
||||
response = self.messages()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""Close debugger connection."""
|
||||
debugger_utils.close_debugger(self)
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
82
web/pgadmin/tools/debugger/tests/test_restart_debugger.py
Normal file
82
web/pgadmin/tools/debugger/tests/test_restart_debugger.py
Normal file
@ -0,0 +1,82 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
|
||||
|
||||
class RestartDebugger(BaseTestGenerator):
|
||||
""" This class will Restart the debugger """
|
||||
|
||||
scenarios = utils.generate_scenarios('restart_debugger',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(RestartDebugger, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
func_name = "test_function_%s" % str(uuid.uuid4())[1:8]
|
||||
function_info = funcs_utils.create_function(
|
||||
local_self.server, local_self.db_name, local_self.schema_name,
|
||||
func_name)
|
||||
|
||||
self.func_id = function_info[0]
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
init_debugger = debugger_utils.init_debugger_function(self)
|
||||
|
||||
self.trans_id = json.loads(init_debugger.data)['data']['trans_id']
|
||||
|
||||
if self.init_target:
|
||||
debugger_utils.initialize_target(self, utils)
|
||||
|
||||
def restart_debugger(self):
|
||||
return self.tester.get(
|
||||
self.url + str(self.trans_id),
|
||||
content_type='application/json')
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.restart_debugger()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.restart_debugger()
|
||||
else:
|
||||
response = self.restart_debugger()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
debugger_utils.close_debugger(self)
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
@ -0,0 +1,85 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
|
||||
|
||||
class DebuggerStartExecution(BaseTestGenerator):
|
||||
""" This class will start debugger execution."""
|
||||
|
||||
scenarios = utils.generate_scenarios('start_execution',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DebuggerStartExecution, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
self.test_data['funcowner'] = self.server["username"]
|
||||
|
||||
function_info = debugger_utils.create_function(self, utils)
|
||||
|
||||
self.func_id = json.loads(function_info.data)['node']['_id']
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
init_debugger = debugger_utils.init_debugger_function(self)
|
||||
|
||||
self.trans_id = json.loads(init_debugger.data)['data']['trans_id']
|
||||
|
||||
if self.init_target:
|
||||
debugger_utils.initialize_target(self, utils)
|
||||
|
||||
debugger_utils.start_listener(self)
|
||||
self.port_no = debugger_utils.messages(self)
|
||||
|
||||
def start_execution(self):
|
||||
return self.tester.get(
|
||||
self.url + str(self.trans_id) + '/' + str(self.port_no),
|
||||
content_type='application/json')
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.start_execution()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.start_execution()
|
||||
else:
|
||||
response = self.start_execution()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
debugger_utils.abort_debugger(self)
|
||||
debugger_utils.close_debugger(self)
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
@ -0,0 +1,89 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as debugger_utils
|
||||
from unittest.mock import patch
|
||||
from regression import parent_node_dict
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.functions \
|
||||
.tests import utils as funcs_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import \
|
||||
utils as db_utils
|
||||
|
||||
|
||||
class DebuggerStartListener(BaseTestGenerator):
|
||||
""" This class will start listen the debugger """
|
||||
|
||||
scenarios = utils.generate_scenarios('start_listener',
|
||||
debugger_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DebuggerStartListener, self).setUp()
|
||||
self.schema_data = parent_node_dict['schema'][-1]
|
||||
self.server_id = self.schema_data['server_id']
|
||||
self.db_id = self.schema_data['db_id']
|
||||
self.schema_id = self.schema_data['schema_id']
|
||||
|
||||
local_self = funcs_utils.set_up(self)
|
||||
|
||||
if self.server['type'] == 'pg':
|
||||
self.skipTest('Skipping test case for pg')
|
||||
|
||||
self.test_data['funcowner'] = self.server["username"]
|
||||
|
||||
function_info = debugger_utils.create_function(self, utils)
|
||||
|
||||
self.func_id = json.loads(function_info.data)['node']['_id']
|
||||
|
||||
if self.add_extension:
|
||||
debugger_utils.add_extension(self, utils)
|
||||
|
||||
init_debugger = debugger_utils.init_debugger_function(self)
|
||||
self.trans_id = json.loads(init_debugger.data)['data']['trans_id']
|
||||
|
||||
if self.init_target:
|
||||
debugger_utils.initialize_target(self, utils)
|
||||
|
||||
def start_listener(self):
|
||||
if hasattr(self, 'update_debugger') and self.update_debugger:
|
||||
return self.tester.post(
|
||||
self.url + str(self.trans_id),
|
||||
data=json.dumps({}),
|
||||
content_type='application/json')
|
||||
else:
|
||||
return self.tester.get(
|
||||
self.url + str(self.trans_id),
|
||||
content_type='application/json')
|
||||
|
||||
def runTest(self):
|
||||
"""
|
||||
This function will initialize the debugger for function and procedures.
|
||||
"""
|
||||
if self.is_positive_test:
|
||||
response = self.start_listener()
|
||||
else:
|
||||
if self.mocking_required:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.start_listener()
|
||||
else:
|
||||
response = self.start_listener()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEqual(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
"""This function delete the server from SQLite """
|
||||
debugger_utils.close_debugger(self)
|
||||
debugger_utils.delete_function(self, utils)
|
||||
db_utils.disconnect_database(self, self.server_id, self.db_id)
|
146
web/pgadmin/tools/debugger/tests/utils.py
Normal file
146
web/pgadmin/tools/debugger/tests/utils.py
Normal file
@ -0,0 +1,146 @@
|
||||
import os
|
||||
import json
|
||||
|
||||
|
||||
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
|
||||
with open(CURRENT_PATH + "/debugger_test_data.json") as data_file:
|
||||
test_cases = json.load(data_file)
|
||||
|
||||
|
||||
def delete_function(self, utils):
|
||||
response = self.tester.delete(
|
||||
'/browser/function/obj/' + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' + str(self.func_id),
|
||||
content_type='html/json'
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
|
||||
def create_function(self, utils):
|
||||
self.test_data['pronamespace'] = self.schema_id
|
||||
function_url = 'browser/function/obj/{0}/{1}/{2}/{3}/'.format(
|
||||
str(utils.SERVER_GROUP), str(self.server_id), str(self.db_id),
|
||||
str(self.schema_id))
|
||||
|
||||
response = self.tester.post(
|
||||
function_url,
|
||||
data=json.dumps(self.test_data),
|
||||
content_type='html/json'
|
||||
)
|
||||
return response
|
||||
|
||||
|
||||
def close_debugger(self):
|
||||
response = self.tester.delete(
|
||||
'debugger/close/' + str(self.trans_id),
|
||||
content_type='application/json')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
|
||||
def abort_debugger(self):
|
||||
response = self.tester.get(
|
||||
'debugger/execute_query/' + str(self.trans_id) + '/abort_target',
|
||||
content_type='application/json')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
|
||||
def add_extension(self, utils):
|
||||
extension_url = '/browser/extension/obj/{0}/{1}/{2}/'.format(
|
||||
str(utils.SERVER_GROUP), str(self.server_id), str(self.db_id))
|
||||
extension_data = {
|
||||
"name": "pldbgapi",
|
||||
"relocatable": True,
|
||||
"schema": self.schema_name,
|
||||
"version": "1.1"
|
||||
}
|
||||
try:
|
||||
response = self.tester.post(
|
||||
extension_url,
|
||||
data=json.dumps(extension_data),
|
||||
content_type='application/json')
|
||||
except Exception as e:
|
||||
print('Unable to create "pldbgapi" extension.')
|
||||
|
||||
|
||||
def init_debugger_function(self):
|
||||
function_url = '/debugger/init/function/'
|
||||
response = self.tester.get(
|
||||
function_url + str(self.server_id) + '/' + str(self.db_id) +
|
||||
'/' + str(self.schema_id) + '/' + str(self.func_id),
|
||||
content_type='application/json')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
return response
|
||||
|
||||
|
||||
def initialize_target(self, utils, close_debugger_instance=True):
|
||||
target_url = '/debugger/initialize_target/{0}/'.format(self.type)
|
||||
response = self.tester.get(
|
||||
target_url + str(self.trans_id) + '/' + str(self.server_id) +
|
||||
'/' + str(self.db_id) + '/' + str(self.schema_id) +
|
||||
'/' + str(self.func_id),
|
||||
content_type='application/json')
|
||||
|
||||
if response.status_code == 200:
|
||||
return response
|
||||
else:
|
||||
if close_debugger_instance:
|
||||
close_debugger(self)
|
||||
|
||||
delete_function(self, utils)
|
||||
self.skipTest('The debugger plugin is not enabled. Please add the '
|
||||
'plugin to the shared_preload_libraries setting in the '
|
||||
'postgresql.conf file and restart the database server '
|
||||
'for indirect debugging.')
|
||||
|
||||
|
||||
def start_listener(self):
|
||||
response = self.tester.get(
|
||||
'debugger/start_listener/' + str(self.trans_id),
|
||||
content_type='application/json')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
|
||||
def create_trigger(self, utils):
|
||||
response = self.tester.post(
|
||||
"{0}{1}/{2}/{3}/{4}/{5}/".format('/browser/trigger/obj/',
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id, self.table_id),
|
||||
data=json.dumps(self.test_data),
|
||||
content_type='html/json'
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
return json.loads(response.data)['node']['_id']
|
||||
|
||||
|
||||
def messages(self):
|
||||
response = self.tester.get(
|
||||
'debugger/messages/' + str(self.trans_id) + '/',
|
||||
content_type='application/json')
|
||||
|
||||
return json.loads(response.data)['data']['result']
|
||||
|
||||
|
||||
def start_execution(self):
|
||||
response = self.tester.get(
|
||||
'debugger/start_execution/' + str(self.trans_id) + '/' + str(
|
||||
self.port_no), content_type='application/json')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
|
||||
def set_breakpoint(self):
|
||||
response = self.tester.get(
|
||||
"debugger/set_breakpoint/" + str(self.trans_id) + '/3/1',
|
||||
content_type='application/json')
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
return response
|
Loading…
Reference in New Issue
Block a user