Improve code coverage and API test cases for the Event Trigger module. Fixes #5088

This commit is contained in:
Pradip Parkale
2020-02-14 11:19:34 +05:30
committed by Akshay Joshi
parent 019932c323
commit e9f16a29cf
14 changed files with 1292 additions and 71 deletions

View File

@@ -676,7 +676,9 @@ class EventTriggerView(PGChildNodeView):
if len(res['rows']) == 0:
return gone(
_("Could not find the specified event trigger on the server.")
gettext(
"Could not find the specified event trigger on the "
"server.")
)
result = res['rows'][0]

View File

@@ -0,0 +1,532 @@
{
"create_event_trigger": [
{
"name": "Create DDL_COMMAND_END Event Trigger",
"url": "/browser/event_trigger/obj/",
"is_positive_test": true,
"test_data": {
"enabled": "O",
"eventfunname": "PLACE_HOLDER",
"eventname": "ddl_command_start",
"eventowner": "PLACE_HOLDER",
"name": "PLACE_HOLDER"
},
"parameters_to_compare": [
"enabled",
"eventname",
"eventowner",
"name"
],
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200
}
},
{
"name": "Create SQL_DROP Event Trigger",
"url": "/browser/event_trigger/obj/",
"is_positive_test": true,
"test_data": {
"enabled": "O",
"eventfunname": "PLACE_HOLDER",
"eventname": "ddl_command_start",
"eventowner": "PLACE_HOLDER",
"name": "PLACE_HOLDER"
},
"parameters_to_compare": [
"enabled",
"eventname",
"eventowner",
"name"
],
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200
}
},
{
"name": "Create DDL_COMMAND_START Event Trigger",
"url": "/browser/event_trigger/obj/",
"is_positive_test": true,
"test_data": {
"enabled": "O",
"eventfunname": "PLACE_HOLDER",
"eventname": "ddl_command_start",
"eventowner": "PLACE_HOLDER",
"name": "PLACE_HOLDER"
},
"parameters_to_compare": [
"enabled",
"eventname",
"eventowner",
"name"
],
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200
}
},
{
"name": "Error while creating a Event Trigger",
"url": "/browser/event_trigger/obj/",
"error_creating_event_trigger": true,
"is_positive_test": false,
"test_data": {
"enabled": "O",
"eventfunname": "PLACE_HOLDER",
"eventname": "DDL_COMMAND_END",
"eventowner": "PLACE_HOLDER",
"name": "PLACE_HOLDER",
"providers": []
},
"parameters_to_compare": [
],
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "(False, 'Mocked Internal Server Error while creating a event trigger')"
},
"expected_data": {
"status_code": 500
}
},
{
"name": "Internal server error",
"url": "/browser/event_trigger/obj/",
"internal_server_error": true,
"is_positive_test": false,
"test_data": {
"enabled": "O",
"eventfunname": "PLACE_HOLDER",
"eventname": "DDL_COMMAND_END",
"eventowner": "PLACE_HOLDER",
"name": "PLACE_HOLDER",
"providers": []
},
"parameters_to_compare": [
"name",
"eventname",
"eventowner",
"eventfunname"
],
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "(True, True), (False, 'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500
}
},
{
"name": "Error while getting oid of newly created event trigger",
"url": "/browser/event_trigger/obj/",
"error_getting_event_trigger_oid": true,
"is_positive_test": false,
"test_data": {
"enabled": "O",
"eventfunname": "PLACE_HOLDER",
"eventname": "DDL_COMMAND_END",
"eventowner": "PLACE_HOLDER",
"name": "PLACE_HOLDER",
"providers": []
},
"parameters_to_compare": [
],
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "(True, True), (True, True), (False, 'Mocked Internal Server Error while getting oid of created event trigger')"
},
"expected_data": {
"status_code": 500
}
},
{
"name": "Create DDL_COMMAND_END Event Trigger with less parameter",
"url": "/browser/event_trigger/obj/",
"is_positive_test": false,
"parameter_missing": true,
"test_data": {
"enabled": "O",
"eventfunname": "PLACE_HOLDER",
"eventname": "ddl_command_start",
"eventowner": "PLACE_HOLDER",
"name": "PLACE_HOLDER"
},
"parameters_to_compare": [
"enabled",
"eventname",
"eventowner",
"name"
],
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 400
}
}
],
"get_event_trigger": [
{
"name": "Get Event Trigger",
"url": "/browser/event_trigger/obj/",
"is_positive_test": true,
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200
}
},
{
"name": "Error while fetching a Event Trigger",
"url": "/browser/event_trigger/obj/",
"error_fetching_event_trigger": true,
"is_positive_test": false,
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
"return_value": "(False, 'Mocked Internal Server Error while fetching a event trigger')"
},
"expected_data": {
"status_code": 500
}
},
{
"name": "Fetch event trigger using wrong event trigger id",
"url": "/browser/event_trigger/obj/",
"wrong_event_trigger_id": true,
"is_positive_test": false,
"mocking_required": false,
"mock_data": {
},
"expected_data": {
"status_code": 410
}
},
{
"name": "Get Event Trigger list",
"url": "/browser/event_trigger/obj/",
"is_positive_test": true,
"event_trigger_list": true,
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200
}
},
{
"name": "Error while fetching a Event Trigger list",
"url": "/browser/event_trigger/obj/",
"error_fetching_event_trigger": true,
"is_positive_test": false,
"event_trigger_list": true,
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
"return_value": "(False, 'Mocked Internal Server Error while fetching a event trigger')"
},
"expected_data": {
"status_code": 500
}
}
],
"delete_event_trigger": [
{
"name": "delete Event Trigger",
"url": "/browser/event_trigger/obj/",
"is_positive_test": true,
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200
}
},
{
"name": "Error while deleting a Event Trigger",
"url": "/browser/event_trigger/obj/",
"error_deleting_event_trigger": true,
"is_positive_test": false,
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "(False, 'Mocked Internal Server Error while deleting a event trigger')"
},
"expected_data": {
"status_code": 500
}
},
{
"name": "delete event trigger using wrong event trigger id",
"url": "/browser/event_trigger/obj/",
"wrong_event_trigger_id": true,
"is_positive_test": false,
"mocking_required": false,
"mock_data": {
},
"expected_data": {
"status_code": 410
}
},
{
"name": "Error while deleting a created Event Trigger",
"url": "/browser/event_trigger/obj/",
"error_deleting_created_event_trigger": true,
"is_positive_test": false,
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "(True, True),(False, 'Mocked Internal Server Error while deleting a event trigger')"
},
"expected_data": {
"status_code": 500
}
}
],
"update_event_trigger": [
{
"name": "Update Event Trigger",
"url": "/browser/event_trigger/obj/",
"is_positive_test": true,
"mocking_required": false,
"test_data": {
"comment": "This is event trigger update comment",
"id": "PLACE_HOLDER"
},
"mock_data": {},
"expected_data": {
"status_code": 200
}
},
{
"name": "Error while updating a Event Trigger",
"url": "/browser/event_trigger/obj/",
"error_updating_event_trigger": true,
"test_data": {
"comment": "This is event trigger update comment",
"id": "PLACE_HOLDER"
},
"is_positive_test": false,
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "(False, 'Mocked Internal Server Error while fetching a event trigger')"
},
"expected_data": {
"status_code": 500
}
},
{
"name": "Update event trigger using wrong event trigger id",
"url": "/browser/event_trigger/obj/",
"wrong_event_trigger_id": true,
"is_positive_test": false,
"test_data": {
"comment": "This is event trigger update comment",
"id": "PLACE_HOLDER"
},
"mocking_required": false,
"mock_data": {
},
"expected_data": {
"status_code": 410
}
},
{
"name": "Error while updating a Event Trigger",
"url": "/browser/event_trigger/obj/",
"error_in_db": true,
"is_positive_test": false,
"test_data": {
"comment": "This is event trigger update comment",
"id": "PLACE_HOLDER"
},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.browser.server_groups.servers.databases.event_triggers.EventTriggerView.get_sql",
"return_value": "('')"
},
"expected_data": {
"status_code": 200
}
}
],
"dependency_dependent_event_trigger": [
{
"name": "Get Event Trigger dependency",
"url": "/browser/event_trigger/dependency/",
"is_positive_test": true,
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200
}
},
{
"name": "Get Event Trigger dependent",
"url": "/browser/event_trigger/dependent/",
"is_positive_test": true,
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200
}
}
],
"get_event_trigger_nodes_and_node": [
{
"name": "Get Event Trigger nodes",
"url": "/browser/event_trigger/nodes/",
"is_positive_test": true,
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200
}
},
{
"name": "Error while fetching a Event Trigger nodes",
"url": "/browser/event_trigger/nodes/",
"error_fetching_event_trigger": true,
"is_positive_test": false,
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
"return_value": "(False, 'Mocked Internal Server Error while fetching a event trigger nodes')"
},
"expected_data": {
"status_code": 500
}
},
{
"name": "Get Event Trigger node",
"url": "/browser/event_trigger/nodes/",
"is_positive_test": true,
"node": true,
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200
}
},
{
"name": "Error while fetching a Event Trigger node",
"url": "/browser/event_trigger/nodes/",
"error_fetching_event_trigger": true,
"is_positive_test": false,
"node": true,
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
"return_value": "(False, 'Mocked Internal Server Error while fetching a event trigger nodes')"
},
"expected_data": {
"status_code": 500
}
}
],
"get_event_trigger_sql": [
{
"name": "Get Event Trigger SQL",
"url": "/browser/event_trigger/sql/",
"is_positive_test": true,
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200
}
},
{
"name": "Error while fetching a created Event Trigger sql",
"url": "/browser/event_trigger/sql/",
"error_fetching_event_trigger": true,
"is_positive_test": false,
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
"return_value": "(False, 'Mocked Internal Server Error while fetching a event trigger sql')"
},
"expected_data": {
"status_code": 500
}
},
{
"name": "Error while fetching a DB created Event Trigger sql",
"url": "/browser/event_trigger/sql/",
"error_fetching_event_trigger_db": true,
"is_positive_test": false,
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "(False, 'Mocked Internal Server Error while fetching a DB created Event Trigger sql')"
},
"expected_data": {
"status_code": 500
}
},
{
"name": "Get modified Event Trigger SQL",
"url": "/browser/event_trigger/msql/",
"is_positive_test": true,
"mocking_required": false,
"msql": true,
"mock_data": {},
"expected_data": {
"status_code": 200
}
},
{
"name": "Get modified Event Trigger SQL when no trigger function passed",
"url": "/browser/event_trigger/msql/",
"is_positive_test": false,
"error_fetching_event_trigger_msql": true,
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 410
}
}
],
"get_event_trigger_functions": [
{
"name": "Get Event Trigger functions",
"url": "/browser/event_trigger/fopts/",
"is_positive_test": true,
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200
}
},
{
"name": "Error while fetching a Event Trigger functions",
"url": "/browser/event_trigger/fopts/",
"error_fetching_event_trigger": true,
"is_positive_test": false,
"event_trigger_functions": true,
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
"return_value": "(False, 'Mocked Internal Server Error while fetching a event trigger functions')"
},
"expected_data": {
"status_code": 500
}
}
],
"delete_multiple_event_trigger" :[
{
"name": "Delete multiple Event Trigger",
"url": "/browser/event_trigger/obj/",
"is_positive_test": true,
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200
}
}
]
}

View File

@@ -9,6 +9,7 @@
import json
import uuid
import sys
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
@@ -19,15 +20,18 @@ from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression import trigger_funcs_utils
from regression.python_test_utils import test_utils as utils
from . import utils as event_trigger_utils
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class EventTriggerAddTestCase(BaseTestGenerator):
""" This class will add new event trigger under test schema. """
scenarios = [
# Fetching default URL for event trigger node.
('Fetch Event Trigger Node URL',
dict(url='/browser/event_trigger/obj/'))
]
scenarios = utils.generate_scenarios('create_event_trigger',
event_trigger_utils.test_cases)
def setUp(self):
self.schema_data = parent_node_dict['schema'][-1]
@@ -53,6 +57,18 @@ class EventTriggerAddTestCase(BaseTestGenerator):
self.server, self.db_name, self.schema_name, self.func_name,
server_version)
def create_event_trigger(self):
"""
This function create a event trigger and returns the created event
trigger response
:return: created event trigger response
"""
return self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/', data=json.dumps(self.test_data),
content_type='html/json')
def runTest(self):
""" This function will add event trigger under test database. """
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
@@ -71,20 +87,48 @@ class EventTriggerAddTestCase(BaseTestGenerator):
func_name)
if not func_response:
raise Exception("Could not find the trigger function.")
data = {
"enabled": "O",
"eventfunname": "%s.%s" % (self.schema_name, self.func_name),
"eventname": "DDL_COMMAND_END",
"eventowner": self.db_user,
"name": "event_trigger_add_%s" % (str(uuid.uuid4())[1:8]),
"providers": []
}
response = self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/', data=json.dumps(data),
content_type='html/json')
self.assertAlmostEquals(response.status_code, 200)
self.test_data['eventfunname'] = "%s.%s" % (
self.schema_name, self.func_name)
self.test_data['eventowner'] = self.db_user
self.test_data['name'] = "event_trigger_add_%s" % (
str(uuid.uuid4())[1:8])
actual_response_code = True
expected_response_code = False
if self.is_positive_test:
self.create_event_trigger()
expected_output = event_trigger_utils.verify_event_trigger_node(
self)
del self.test_data['eventfunname']
self.assertDictEqual(expected_output, self.test_data)
else:
if hasattr(self, "error_creating_event_trigger"):
with patch(self.mock_data["function_name"],
return_value=eval(self.mock_data["return_value"])):
response = self.create_event_trigger()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
if hasattr(self, "error_getting_event_trigger_oid"):
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
response = self.create_event_trigger()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
if hasattr(self, "internal_server_error"):
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
response = self.create_event_trigger()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
if hasattr(self, "parameter_missing"):
del self.test_data['eventfunname']
response = self.create_event_trigger()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
self.assertEquals(actual_response_code, expected_response_code)
def tearDown(self):
# Disconnect database

View File

@@ -8,6 +8,7 @@
##########################################################################
import uuid
import sys
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
@@ -20,14 +21,16 @@ from regression import trigger_funcs_utils
from regression.python_test_utils import test_utils as utils
from . import utils as event_trigger_utils
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class EventTriggerDeleteTestCase(BaseTestGenerator):
""" This class will delete added event trigger under test database. """
scenarios = [
# Fetching default URL for event trigger node.
('Fetch Event Trigger Node URL',
dict(url='/browser/event_trigger/obj/'))
]
scenarios = utils.generate_scenarios('delete_event_trigger',
event_trigger_utils.test_cases)
def setUp(self):
self.schema_data = parent_node_dict['schema'][-1]
@@ -58,6 +61,18 @@ class EventTriggerDeleteTestCase(BaseTestGenerator):
self.server, self.db_name, self.schema_name, self.func_name,
self.trigger_name)
def delete_event_trigger(self):
"""
This function returns the event trigger delete response
:return: event trigger delete response
"""
return self.tester.delete(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.event_trigger_id),
follow_redirects=True)
def runTest(self):
""" This function will delete event trigger under test database. """
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
@@ -81,13 +96,32 @@ class EventTriggerDeleteTestCase(BaseTestGenerator):
self.trigger_name)
if not trigger_response:
raise Exception("Could not find event trigger.")
del_response = self.tester.delete(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.event_trigger_id),
follow_redirects=True)
self.assertEquals(del_response.status_code, 200)
actual_response_code = True
expected_response_code = False
if self.is_positive_test:
response = self.delete_event_trigger()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
else:
if hasattr(self, "error_deleting_event_trigger"):
with patch(self.mock_data["function_name"],
return_value=eval(self.mock_data["return_value"])):
response = self.delete_event_trigger()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
if hasattr(self, "error_deleting_created_event_trigger"):
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
response = self.delete_event_trigger()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
if hasattr(self, "wrong_event_trigger_id"):
self.event_trigger_id = 99999
response = self.delete_event_trigger()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
self.assertEquals(actual_response_code, expected_response_code)
def tearDown(self):
# Disconnect the database

View File

@@ -24,11 +24,8 @@ from . import utils as event_trigger_utils
class EventTriggerMultipleDeleteTestCase(BaseTestGenerator):
""" This class will delete added event trigger under test database. """
scenarios = [
# Fetching default URL for event trigger node.
('Fetch Event Trigger Node URL',
dict(url='/browser/event_trigger/obj/'))
]
scenarios = utils.generate_scenarios('delete_multiple_event_trigger',
event_trigger_utils.test_cases)
def setUp(self):
self.schema_data = parent_node_dict['schema'][-1]
@@ -64,6 +61,20 @@ class EventTriggerMultipleDeleteTestCase(BaseTestGenerator):
self.server, self.db_name, self.schema_name, self.func_name,
self.trigger_names[1]))
def delete_multiple(self, data):
"""
This function returns multiple event trigger delete response
:param data: event trigger ids to delete
:return: event trigger delete response
"""
return self.tester.delete(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' +
str(self.db_id) + '/',
follow_redirects=True,
data=json.dumps(data),
content_type='html/json')
def runTest(self):
""" This function will delete event trigger under test database. """
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
@@ -93,14 +104,16 @@ class EventTriggerMultipleDeleteTestCase(BaseTestGenerator):
if not trigger_response:
raise Exception("Could not find event trigger.")
data = {'ids': self.event_trigger_ids}
del_response = self.tester.delete(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' +
str(self.db_id) + '/',
follow_redirects=True,
data=json.dumps(data),
content_type='html/json')
self.assertEquals(del_response.status_code, 200)
actual_response_code = True
expected_response_code = False
if self.is_positive_test:
response = self.delete_multiple(data)
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
self.assertEquals(actual_response_code, expected_response_code)
def tearDown(self):
# Disconnect the database

View File

@@ -0,0 +1,105 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.utils import server_utils as server_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression import trigger_funcs_utils
from regression.python_test_utils import test_utils as utils
from . import utils as event_trigger_utils
class EventTriggerDependencyDependentTestCase(BaseTestGenerator):
""" This class will fetch added event trigger dependency and dependent
under test database. """
scenarios = utils.generate_scenarios('dependency_dependent_event_trigger',
event_trigger_utils.test_cases)
def setUp(self):
self.schema_data = parent_node_dict['schema'][-1]
self.server_id = self.schema_data['server_id']
self.db_id = self.schema_data['db_id']
self.schema_name = self.schema_data['schema_name']
self.schema_id = self.schema_data['schema_id']
self.extension_name = "postgres_fdw"
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.db_user = self.server["username"]
self.func_name = "trigger_func_%s" % str(uuid.uuid4())[1:8]
self.trigger_name = "event_trigger_delete_%s" % (
str(uuid.uuid4())[1:8])
server_con = server_utils.connect_server(self, self.server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
server_version = 0
if "type" in server_con["data"]:
if server_con["data"]["version"] < 90300:
message = "Event triggers are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
self.function_info = trigger_funcs_utils.create_trigger_function(
self.server, self.db_name, self.schema_name, self.func_name,
server_version)
self.event_trigger_id = event_trigger_utils.create_event_trigger(
self.server, self.db_name, self.schema_name, self.func_name,
self.trigger_name)
def runTest(self):
""" This function will delete event trigger under test database. """
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database.")
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema.")
func_name = self.function_info[1]
func_response = trigger_funcs_utils.verify_trigger_function(
self.server,
self.db_name,
func_name)
if not func_response:
raise Exception("Could not find the trigger function.")
trigger_response = event_trigger_utils.verify_event_trigger(
self.server, self.db_name,
self.trigger_name)
if not trigger_response:
raise Exception("Could not find event trigger.")
actual_response_code = True
expected_response_code = False
if self.is_positive_test:
response = self.get_dependency_dependent()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
self.assertEquals(actual_response_code, expected_response_code)
def get_dependency_dependent(self):
"""
This function returns the event trigger dependency and dependent
:return: event trigger dependency and dependent
"""
return self.tester.get(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.event_trigger_id),
follow_redirects=True)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@@ -0,0 +1,116 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
import sys
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.utils import server_utils as server_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression import trigger_funcs_utils
from regression.python_test_utils import test_utils as utils
from . import utils as event_trigger_utils
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class EventTriggerFunctionsTestCase(BaseTestGenerator):
""" This class will fetch added event trigger functions
under test database. """
scenarios = utils.generate_scenarios('get_event_trigger_functions',
event_trigger_utils.test_cases)
def setUp(self):
self.schema_data = parent_node_dict['schema'][-1]
self.server_id = self.schema_data['server_id']
self.db_id = self.schema_data['db_id']
self.schema_name = self.schema_data['schema_name']
self.schema_id = self.schema_data['schema_id']
self.extension_name = "postgres_fdw"
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.db_user = self.server["username"]
self.func_name = "trigger_func_%s" % str(uuid.uuid4())[1:8]
self.trigger_name = "event_trigger_delete_%s" % (
str(uuid.uuid4())[1:8])
server_con = server_utils.connect_server(self, self.server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
server_version = 0
if "type" in server_con["data"]:
if server_con["data"]["version"] < 90300:
message = "Event triggers are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
self.function_info = trigger_funcs_utils.create_trigger_function(
self.server, self.db_name, self.schema_name, self.func_name,
server_version)
self.event_trigger_id = event_trigger_utils.create_event_trigger(
self.server, self.db_name, self.schema_name, self.func_name,
self.trigger_name)
def get_event_trigger_functions(self):
"""
This function returns event trigger functions
:return: event trigger functions
"""
return self.tester.get(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.event_trigger_id),
follow_redirects=True)
def runTest(self):
""" This function will delete event trigger under test database. """
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database.")
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema.")
func_name = self.function_info[1]
func_response = trigger_funcs_utils.verify_trigger_function(
self.server,
self.db_name,
func_name)
if not func_response:
raise Exception("Could not find the trigger function.")
trigger_response = event_trigger_utils.verify_event_trigger(
self.server, self.db_name,
self.trigger_name)
if not trigger_response:
raise Exception("Could not find event trigger.")
if self.is_positive_test:
response = self.get_event_trigger_functions()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
else:
with patch(self.mock_data["function_name"],
return_value=eval(self.mock_data["return_value"])):
response = self.get_event_trigger_functions()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
self.assertEquals(actual_response_code, expected_response_code)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@@ -8,7 +8,7 @@
##########################################################################
import uuid
import sys
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
@@ -20,14 +20,16 @@ from regression import trigger_funcs_utils
from regression.python_test_utils import test_utils as utils
from . import utils as event_trigger_utils
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class EventTriggerGetTestCase(BaseTestGenerator):
""" This class will fetch added event trigger under test database. """
scenarios = [
# Fetching default URL for event trigger node.
('Fetch Event Trigger Node URL',
dict(url='/browser/event_trigger/obj/'))
]
scenarios = utils.generate_scenarios('get_event_trigger',
event_trigger_utils.test_cases)
def setUp(self):
self.schema_data = parent_node_dict['schema'][-1]
@@ -57,6 +59,30 @@ class EventTriggerGetTestCase(BaseTestGenerator):
self.server, self.db_name, self.schema_name, self.func_name,
self.trigger_name)
def get_event_trigger(self):
"""
This functions returns the event trigger details
:return: event trigger details
"""
return self.tester.get(
self.url +
str(utils.SERVER_GROUP) + '/' + str(self.server_id) + '/' +
str(self.db_id) + '/' + str(self.event_trigger_id),
content_type='html/json'
)
def get_event_trigger_list(self):
"""
This functions returns the event trigger list
:return: event trigger list
"""
return self.tester.get(
self.url +
str(utils.SERVER_GROUP) + '/' + str(self.server_id) + '/' +
str(self.db_id) + '/',
content_type='html/json'
)
def runTest(self):
""" This function will fetch added event trigger under test database.
"""
@@ -76,13 +102,138 @@ class EventTriggerGetTestCase(BaseTestGenerator):
func_name)
if not func_response:
raise Exception("Could not find the trigger function.")
response = self.tester.get(
actual_response_code = True
expected_response_code = False
if self.is_positive_test:
if hasattr(self, "event_trigger_list"):
response = self.get_event_trigger_list()
else:
response = self.get_event_trigger()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
else:
if hasattr(self, "error_fetching_event_trigger"):
with patch(self.mock_data["function_name"],
return_value=eval(self.mock_data["return_value"])):
if hasattr(self, "event_trigger_list"):
response = self.get_event_trigger_list()
else:
response = self.get_event_trigger()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
if hasattr(self, "wrong_event_trigger_id"):
self.event_trigger_id = 99999
response = self.get_event_trigger()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
self.assertEquals(actual_response_code, expected_response_code)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)
class EventTriggerGetNodesAndNodeTestCase(BaseTestGenerator):
""" This class will fetch added event trigger under test database. """
scenarios = utils.generate_scenarios('get_event_trigger_nodes_and_node',
event_trigger_utils.test_cases)
def setUp(self):
self.schema_data = parent_node_dict['schema'][-1]
self.server_id = self.schema_data['server_id']
self.db_id = self.schema_data['db_id']
self.schema_name = self.schema_data['schema_name']
self.schema_id = self.schema_data['schema_id']
self.extension_name = "postgres_fdw"
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.db_user = self.server["username"]
self.func_name = "trigger_func_%s" % str(uuid.uuid4())[1:8]
self.trigger_name = "event_trigger_get_%s" % (str(uuid.uuid4())[1:8])
server_con = server_utils.connect_server(self, self.server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
server_version = 0
if "type" in server_con["data"]:
if server_con["data"]["version"] < 90300:
message = "Event triggers are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
self.function_info = trigger_funcs_utils.create_trigger_function(
self.server, self.db_name, self.schema_name, self.func_name,
server_version)
self.event_trigger_id = event_trigger_utils.create_event_trigger(
self.server, self.db_name, self.schema_name, self.func_name,
self.trigger_name)
def get_event_trigger_nodes(self):
"""
This function returns the event trigger nodes details
:return: event trigger nodes details
"""
return self.tester.get(
self.url +
str(utils.SERVER_GROUP) + '/' + str(self.server_id) + '/' +
str(self.db_id) + '/',
content_type='html/json'
)
def get_event_trigger_node(self):
"""
This function returns the event trigger node details
:return: event trigger node details
"""
return self.tester.get(
self.url +
str(utils.SERVER_GROUP) + '/' + str(self.server_id) + '/' +
str(self.db_id) + '/' + str(self.event_trigger_id),
content_type='html/json'
)
self.assertEquals(response.status_code, 200)
def runTest(self):
""" This function will fetch added event trigger under test database.
"""
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database.")
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema.")
func_name = self.function_info[1]
func_response = trigger_funcs_utils.verify_trigger_function(
self.server,
self.db_name,
func_name)
if not func_response:
raise Exception("Could not find the trigger function.")
actual_response_code = True
expected_response_code = False
if self.is_positive_test:
if hasattr(self, "node"):
response = self.get_event_trigger_node()
else:
response = self.get_event_trigger_nodes()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
else:
if hasattr(self, "error_fetching_event_trigger"):
with patch(self.mock_data["function_name"],
return_value=eval(self.mock_data["return_value"])):
if hasattr(self, "node"):
response = self.get_event_trigger_node()
else:
response = self.get_event_trigger_nodes()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
self.assertEquals(actual_response_code, expected_response_code)
def tearDown(self):
# Disconnect the database

View File

@@ -9,6 +9,7 @@
import json
import uuid
import sys
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
@@ -21,14 +22,16 @@ from regression import trigger_funcs_utils
from regression.python_test_utils import test_utils as utils
from . import utils as event_trigger_utils
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class EventTriggerPutTestCase(BaseTestGenerator):
""" This class will fetch added event trigger under test database. """
scenarios = [
# Fetching default URL for event trigger node.
('Fetch Event Trigger Node URL',
dict(url='/browser/event_trigger/obj/'))
]
scenarios = utils.generate_scenarios('update_event_trigger',
event_trigger_utils.test_cases)
def setUp(self):
self.schema_data = parent_node_dict['schema'][-1]
@@ -58,6 +61,18 @@ class EventTriggerPutTestCase(BaseTestGenerator):
self.server, self.db_name, self.schema_name, self.func_name,
self.trigger_name)
def update_event_trigger(self):
"""
This functions update event trigger details
:return: Event trigger update request details
"""
return self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/' + str(self.event_trigger_id),
data=json.dumps(self.test_data),
follow_redirects=True)
def runTest(self):
""" This function will update event trigger under test database. """
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
@@ -80,17 +95,33 @@ class EventTriggerPutTestCase(BaseTestGenerator):
self.server, self.db_name, self.trigger_name)
if not trigger_response:
raise Exception("Could not find event trigger.")
data = {
"comment": "This is event trigger update comment",
"id": self.event_trigger_id
}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/' + str(self.event_trigger_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(put_response.status_code, 200)
self.test_data['id'] = self.event_trigger_id
actual_response_code = True
expected_response_code = False
if self.is_positive_test:
response = self.update_event_trigger()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
else:
if hasattr(self, "error_updating_event_trigger"):
with patch(self.mock_data["function_name"],
return_value=eval(self.mock_data["return_value"])):
response = self.update_event_trigger()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
if hasattr(self, "wrong_event_trigger_id"):
self.event_trigger_id = 99999
response = self.update_event_trigger()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
if hasattr(self, "error_in_db"):
with patch(self.mock_data["function_name"],
return_value=eval(self.mock_data["return_value"])):
response = self.update_event_trigger()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
self.assertEquals(actual_response_code, expected_response_code)
def tearDown(self):
# Disconnect the database

View File

@@ -0,0 +1,134 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
import sys
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.utils import server_utils as server_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression import trigger_funcs_utils
from regression.python_test_utils import test_utils as utils
from . import utils as event_trigger_utils
if sys.version_info < (3, 3):
from mock import patch
else:
from unittest.mock import patch
class EventTriggerGetSqlTestCase(BaseTestGenerator):
""" This class will fetch added event trigger sql under test database. """
scenarios = utils.generate_scenarios('get_event_trigger_sql',
event_trigger_utils.test_cases)
def setUp(self):
self.schema_data = parent_node_dict['schema'][-1]
self.server_id = self.schema_data['server_id']
self.db_id = self.schema_data['db_id']
self.schema_name = self.schema_data['schema_name']
self.schema_id = self.schema_data['schema_id']
self.extension_name = "postgres_fdw"
self.db_name = parent_node_dict["database"][-1]["db_name"]
self.db_user = self.server["username"]
self.func_name = "trigger_func_%s" % str(uuid.uuid4())[1:8]
self.trigger_name = "event_trigger_get_%s" % (str(uuid.uuid4())[1:8])
server_con = server_utils.connect_server(self, self.server_id)
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
server_version = 0
if "type" in server_con["data"]:
if server_con["data"]["version"] < 90300:
message = "Event triggers are not supported by PG9.2 " \
"and PPAS9.2 and below."
self.skipTest(message)
self.function_info = trigger_funcs_utils.create_trigger_function(
self.server, self.db_name, self.schema_name, self.func_name,
server_version)
self.event_trigger_id = event_trigger_utils.create_event_trigger(
self.server, self.db_name, self.schema_name, self.func_name,
self.trigger_name)
def get_event_trigger_sql(self):
"""
this function fetch event trigger sql
:return: event trigger sql details
"""
return self.tester.get(
self.url +
str(utils.SERVER_GROUP) + '/' + str(self.server_id) + '/' +
str(self.db_id) + '/' + str(self.event_trigger_id),
content_type='html/json'
)
def get_event_trigger_msql(self):
"""
This function returns the modified sql
:return: event trigger msql details
"""
return self.tester.get(
self.url +
str(utils.SERVER_GROUP) + '/' + str(self.server_id) + '/' +
str(self.db_id) + '/',
content_type='html/json'
)
def runTest(self):
""" This function will fetch added event trigger under test database.
"""
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database.")
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema.")
func_name = self.function_info[1]
func_response = trigger_funcs_utils.verify_trigger_function(
self.server,
self.db_name,
func_name)
if not func_response:
raise Exception("Could not find the trigger function.")
actual_response_code = True,
expected_response_code = False
if self.is_positive_test or hasattr(self, "msql"):
response = self.get_event_trigger_sql()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
else:
if hasattr(self, "error_fetching_event_trigger"):
with patch(self.mock_data["function_name"],
return_value=eval(self.mock_data["return_value"])):
response = self.get_event_trigger_sql()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
if hasattr(self, "error_fetching_event_trigger_db"):
with patch(self.mock_data["function_name"],
return_value=eval(self.mock_data["return_value"])):
response = self.get_event_trigger_sql()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
if hasattr(self, "error_fetching_event_trigger_msql"):
response = self.get_event_trigger_msql()
actual_response_code = response.status_code
expected_response_code = self.expected_data['status_code']
self.assertEquals(actual_response_code, expected_response_code)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@@ -11,8 +11,15 @@ from __future__ import print_function
import sys
import traceback
import os
import json
from regression.python_test_utils.test_utils import get_db_connection
from regression.python_test_utils import test_utils as utils
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
with open(CURRENT_PATH + "/event_triggers_test_data.json") as data_file:
test_cases = json.load(data_file)
def create_event_trigger(server, db_name, schema_name, func_name,
@@ -89,3 +96,34 @@ def verify_event_trigger(server, db_name, trigger_name):
return event_trigger
except Exception:
traceback.print_exc(file=sys.stderr)
def verify_event_trigger_node(self):
"""
This function verifies the event trigger is present in the database
:param self: server details
:return event_trigger: event trigger's expected details
:rtype event_trigger: dict
"""
try:
connection = get_db_connection(self.db_name,
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'],
self.server['sslmode'])
pg_cursor = connection.cursor()
pg_cursor.execute("SELECT evtenabled,"
"evtevent, "
"(select rolname from pg_authid where oid "
"= pl.evtowner) as evtowner,"
" evtname from pg_event_trigger pl "
"WHERE evtname = '%s'" % self.test_data['name'])
event_trigger = pg_cursor.fetchone()
expected_output = utils.create_expected_output(
self.parameters_to_compare, list(event_trigger))
connection.close()
return expected_output
except Exception:
traceback.print_exc(file=sys.stderr)

View File

@@ -14,6 +14,7 @@ import traceback
import os
import json
from regression.python_test_utils.test_utils import get_db_connection
from regression.python_test_utils import test_utils as utils
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
with open(CURRENT_PATH + "/language_test_data.json") as data_file:
@@ -83,8 +84,9 @@ def verify_language(self):
"from pg_language pl where lanname='%s'" %
self.data["name"])
language = pg_cursor.fetchall()
expected_output = make_dict(self.parameters_to_compare,
list(language[0]))
expected_output = utils.create_expected_output(
self.parameters_to_compare,
list(language[0]))
connection.close()