mirror of
https://github.com/pgadmin-org/pgadmin4.git
synced 2025-02-25 18:55:31 -06:00
Improve code coverage and API test cases for Domain and Domain Constraints. Fixes #5326
This commit is contained in:
@@ -488,6 +488,7 @@ class DomainConstraintView(PGChildNodeView):
|
||||
|
||||
if not res['rows']:
|
||||
return make_json_response(
|
||||
status=410,
|
||||
success=0,
|
||||
errormsg=gettext(
|
||||
'Error: Object not found.'
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
|
||||
|
||||
class DomainConstraintTestGenerator(BaseTestGenerator):
|
||||
def runTest(self):
|
||||
return
|
||||
@@ -0,0 +1,550 @@
|
||||
{
|
||||
"domain_constraint_create": [
|
||||
{
|
||||
"name": "Create domain: With valid data",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"is_positive_test": true,
|
||||
"test_data": {
|
||||
"name": "PLACE HOLDER",
|
||||
"consrc": "VALUE > 0",
|
||||
"convalidated": true
|
||||
},
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Create domain: With valid data - bad icon",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"is_positive_test": true,
|
||||
"test_data": {
|
||||
"name": "PLACE HOLDER",
|
||||
"consrc": "VALUE > 0",
|
||||
"convalidated": false
|
||||
},
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error Create domain - Internal Server error 1",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"is_positive_test": false,
|
||||
"internal_server_error": true,
|
||||
"test_data": {
|
||||
"name": "PLACE HOLDER",
|
||||
"consrc": "VALUE > 0",
|
||||
"convalidated": true
|
||||
},
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
|
||||
"return_value": "(False, 'Mocked Internal Server Error')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error Create domain - Internal Server error 2",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"is_positive_test": false,
|
||||
"error_in_db": true,
|
||||
"test_data": {
|
||||
"name": "PLACE HOLDER",
|
||||
"consrc": "VALUE > 0",
|
||||
"convalidated": true
|
||||
},
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.browser.server_groups.servers.databases.schemas.domains.domain_constraints.DomainConstraintView.get_sql",
|
||||
"return_value": "(False, 'Mocked Internal Server Error','')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error Create domain - Internal Server error- coid",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"is_positive_test": false,
|
||||
"error_getting_coid": true,
|
||||
"test_data": {
|
||||
"name": "PLACE HOLDER",
|
||||
"consrc": "VALUE > 0",
|
||||
"convalidated": true
|
||||
},
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
|
||||
"return_value": "(True, 'Mocking the scalar output'), (False, 'Mocked Internal Server Error while creating domain')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error Create domain - wrong domain_id",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"is_positive_test": false,
|
||||
"error_domain_id": true,
|
||||
"test_data": {
|
||||
"name": "PLACE HOLDER",
|
||||
"consrc": "VALUE > 0",
|
||||
"convalidated": true
|
||||
},
|
||||
"mocking_required": false,
|
||||
"mock_data": {
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 410
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_constraint_delete": [
|
||||
{
|
||||
"name": "delete domain constraint",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while deleting a domain - Internal server error - 1",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"error_deleting_domain_constraints": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while deleting a domain')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while deleting a domain - Internal server error - 2",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"error_deleting_domain_constraints": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while deleting a domain')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "delete domain constraint using wrong domain id",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"wrong_domain_constraint_id": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": false,
|
||||
"mock_data": {
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 410
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_constraint_multiple_delete" :[
|
||||
{
|
||||
"name": "Delete multiple domain constraint",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_constraint_dependent_dependency": [
|
||||
{
|
||||
"name": "Get domain constraints dependents",
|
||||
"url": "/browser/domain_constraints/dependent/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Get domain constraints dependencies",
|
||||
"url": "/browser/domain_constraints/dependency/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_constraints_get_nodes": [
|
||||
{
|
||||
"name": "Get domain constraints nodes",
|
||||
"url": "/browser/domain_constraints/nodes/",
|
||||
"is_positive_test": true,
|
||||
"invalid": false,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Get domain constraints - not valid nodes",
|
||||
"url": "/browser/domain_constraints/nodes/",
|
||||
"is_positive_test": true,
|
||||
"invalid": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching domain constraints nodes",
|
||||
"url": "/browser/domain_constraints/nodes/",
|
||||
"error_fetching_domain_constraint": true,
|
||||
"is_positive_test": false,
|
||||
"invalid": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching a domain constraints nodes')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Get domain constraint node",
|
||||
"url": "/browser/domain_constraints/nodes/",
|
||||
"is_positive_test": true,
|
||||
"invalid": false,
|
||||
"node": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Get domain constraint - not valid node",
|
||||
"url": "/browser/domain_constraints/nodes/",
|
||||
"is_positive_test": true,
|
||||
"invalid": true,
|
||||
"node": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching a domain constraint node",
|
||||
"url": "/browser/domain_constraints/nodes/",
|
||||
"error_fetching_domain_constraint": true,
|
||||
"is_positive_test": false,
|
||||
"invalid": false,
|
||||
"node": true,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching a domain nodes')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error - Fetching domain constraint using wrong domain id",
|
||||
"url": "/browser/domain_constraints/nodes/",
|
||||
"wrong_domain_constraint_id": true,
|
||||
"is_positive_test": false,
|
||||
"invalid": false,
|
||||
"mocking_required": false,
|
||||
"mock_data": {
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 410
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_constraints_update": [
|
||||
{
|
||||
"name": "Update domain",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"test_data": {
|
||||
"description": "This is domain constraint update comment",
|
||||
"id": "PLACE_HOLDER"
|
||||
},
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Update domain with convalidate - true",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"test_data": {
|
||||
"description": "This is domain constraint update comment",
|
||||
"id": "PLACE_HOLDER",
|
||||
"convalidated": true
|
||||
},
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Update domain with convalidate - false",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"test_data": {
|
||||
"description": "This is domain constraint update comment",
|
||||
"id": "PLACE_HOLDER",
|
||||
"convalidated": false
|
||||
},
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while updating a domain constraint - Internal server error",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"error_in_db": true,
|
||||
"test_data": {
|
||||
"description": "This is domain update comment",
|
||||
"id": "PLACE_HOLDER"
|
||||
},
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
|
||||
"return_value": "[(False, 'Mocked Internal Server Error while fetching a domain')]"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while updating a domain constraint - Internal server error -SQL",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"error_in_db": true,
|
||||
"test_data": {
|
||||
"description": "This is domain update comment",
|
||||
"id": "PLACE_HOLDER"
|
||||
},
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.browser.server_groups.servers.databases.schemas.domains.domain_constraints.DomainConstraintView.get_sql",
|
||||
"return_value": "[(False, 'Mocked SQL statement', '')]"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while updating a domain constraint - Update not required",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"error_in_db": true,
|
||||
"test_data": {
|
||||
"description": "This is domain update comment",
|
||||
"id": "PLACE_HOLDER"
|
||||
},
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.browser.server_groups.servers.databases.schemas.domains.domain_constraints.DomainConstraintView.get_sql",
|
||||
"return_value": "[(True, '', '')]"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_constraints_get_sql": [
|
||||
{
|
||||
"name": "Get Domain Constraint SQL",
|
||||
"url": "/browser/domain_constraints/sql/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error - Get Domain Constraint SQL - Internal server error",
|
||||
"url": "/browser/domain_constraints/sql/",
|
||||
"is_positive_test": false,
|
||||
"internal_server_error": true,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching a domain')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error - Get Domain Constraint SQL using wrong domain id",
|
||||
"url": "/browser/domain_constraints/sql/",
|
||||
"wrong_domain_constraint_id": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": false,
|
||||
"mock_data": {
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 410
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_constraints_get_msql": [
|
||||
{
|
||||
"name": "Get domain constraint msql",
|
||||
"is_positive_test": true,
|
||||
"url": "/browser/domain_constraints/msql/",
|
||||
"test_data": {
|
||||
"name": "modifying name",
|
||||
"comment": "Comments to test update"
|
||||
},
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.browser.server_groups.servers.databases.schemas.domains.domain_constraints.DomainConstraintView.get_sql",
|
||||
"return_value": "('', 'Mocked get_sql function', '')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Get domain constraint msql: With existing domain constraint id 1.",
|
||||
"is_positive_test": false,
|
||||
"error_in_db": true,
|
||||
"url": "/browser/domain_constraints/msql/",
|
||||
"test_data": {
|
||||
"name": "modifying name",
|
||||
"comment": "Comments to test update"
|
||||
},
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.browser.server_groups.servers.databases.schemas.domains.domain_constraints.DomainConstraintView.get_sql",
|
||||
"return_value": "('', 'Mocked get_sql function', '')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Get domain constraint msql: With existing domain constraint id 2.",
|
||||
"is_positive_test": false,
|
||||
"error_in_db": true,
|
||||
"url": "/browser/domain_constraints/msql/",
|
||||
"test_data": {
|
||||
"name": "modifying name",
|
||||
"comment": "Comments to test update"
|
||||
},
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.browser.server_groups.servers.databases.schemas.domains.domain_constraints.DomainConstraintView.get_sql",
|
||||
"return_value": "('True', 'Mocked get_sql function', '')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_constraints_get": [
|
||||
{
|
||||
"name": "Get domain constraints",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching a domain constraints",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"error_fetching_domain_constraints": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching a domain')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error - Fetching domain constarint with wrong ID",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"wrong_domain_cons_id": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 410
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Get domain constraints list",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"is_positive_test": true,
|
||||
"domain_constraint_list": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching a domain constraint list",
|
||||
"url": "/browser/domain_constraints/obj/",
|
||||
"error_fetching_domain_constraints": true,
|
||||
"is_positive_test": false,
|
||||
"domain_constraint_list": true,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching a domain constraint')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -0,0 +1,104 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_cons_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainConstraintAddTestCase(BaseTestGenerator):
|
||||
""" This class will add new domain constraint under schema node. """
|
||||
|
||||
scenarios = utils.generate_scenarios('domain_constraint_create',
|
||||
domain_cons_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
self.domain_name = "domain_%s" % (str(uuid.uuid4())[1:8])
|
||||
|
||||
self.domain_info = domain_cons_utils.create_domain(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
def create_domain_constraint(self):
|
||||
"""
|
||||
This function create a domain constraint and returns it
|
||||
:return: created domain constraint response
|
||||
"""
|
||||
return self.tester.post(self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) +
|
||||
'/' + str(self.schema_id) + '/' +
|
||||
str(self.domain_id) + '/',
|
||||
data=json.dumps(self.test_data),
|
||||
content_type='html/json',
|
||||
follow_redirects=True)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add domain constraint under test database. """
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
self.test_data['name'] =\
|
||||
"test_domain_con_add_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.domain_id = self.domain_info[0]
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.create_domain_constraint()
|
||||
else:
|
||||
if hasattr(self, "internal_server_error"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.create_domain_constraint()
|
||||
|
||||
if hasattr(self, "error_in_db"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.create_domain_constraint()
|
||||
|
||||
if hasattr(self, "error_getting_coid"):
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=eval(self.mock_data["return_value"])):
|
||||
response = self.create_domain_constraint()
|
||||
|
||||
if hasattr(self, "error_domain_id"):
|
||||
self.domain_id = 99999
|
||||
response = self.create_domain_constraint()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,113 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_cons_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainConstraintDeleteTestCase(BaseTestGenerator):
|
||||
""" This class will add new domain constraint under schema node. """
|
||||
|
||||
scenarios = utils.generate_scenarios('domain_constraint_delete',
|
||||
domain_cons_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
self.domain_name = "domain_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.domain_con_name = \
|
||||
"test_domain_con_delete_%s" % (str(uuid.uuid4())[1:8])
|
||||
|
||||
self.domain_info = domain_cons_utils.create_domain(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
self.domain_constraint_id = \
|
||||
domain_cons_utils.create_domain_constraints(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.domain_name,
|
||||
self.domain_con_name)
|
||||
|
||||
def delete_domain_constraint(self):
|
||||
"""
|
||||
This function returns the domain constraint delete response
|
||||
:return: domain constraint delete response
|
||||
"""
|
||||
return self.tester.delete(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.domain_id) + '/' +
|
||||
str(self.domain_constraint_id),
|
||||
follow_redirects=True)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add domain constraint under test database. """
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
self.domain_id = self.domain_info[0]
|
||||
|
||||
domain_response = domain_cons_utils.verify_domain(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
if not domain_response:
|
||||
raise Exception("Could not find the domain.")
|
||||
|
||||
domain_cons_response = domain_cons_utils.verify_domain_constraint(
|
||||
self.server, self.db_name,
|
||||
self.domain_con_name)
|
||||
if not domain_cons_response:
|
||||
raise Exception("Could not find domain constraint.")
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.delete_domain_constraint()
|
||||
else:
|
||||
if hasattr(self, "error_deleting_domain_constraints"):
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.delete_domain_constraint()
|
||||
if hasattr(self, "wrong_domain_constraint_id"):
|
||||
self.domain_constraint_id = 99999
|
||||
response = self.delete_domain_constraint()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,115 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_cons_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainConstraintDeleteMultipleTestCase(BaseTestGenerator):
|
||||
""" This class will add new domain constraint under schema node. """
|
||||
|
||||
scenarios = utils.generate_scenarios('domain_constraint_multiple_delete',
|
||||
domain_cons_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
self.domain_name = "domain_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.domain_con_names = ["test_domain_con_delete_%s" %
|
||||
(str(uuid.uuid4())[1:8]),
|
||||
"test_domain_con_delete_%s" %
|
||||
(str(uuid.uuid4())[1:8])]
|
||||
|
||||
self.domain_info = domain_cons_utils.create_domain(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
self.domain_constraint_ids = []
|
||||
self.domain_constraint_ids.append(
|
||||
domain_cons_utils.create_domain_constraints(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.domain_name,
|
||||
self.domain_con_names[0]))
|
||||
self.domain_constraint_ids.append(
|
||||
domain_cons_utils.create_domain_constraints(
|
||||
self.server, self.db_name, self.schema_name,
|
||||
self.domain_name, self.domain_con_names[1]))
|
||||
|
||||
def delete_multiple(self, data):
|
||||
"""
|
||||
This function returns multiple domain constraint delete response
|
||||
:param data: domain constraint ids to delete
|
||||
:return: domain constraint delete response
|
||||
"""
|
||||
return self.tester.delete(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' + str(
|
||||
self.server_id) + '/' +
|
||||
str(self.db_id) + '/' + str(self.schema_id) + '/' +
|
||||
str(self.domain_id) + '/',
|
||||
content_type='html/json',
|
||||
data=json.dumps(data),
|
||||
follow_redirects=True,
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add domain constraint under test database. """
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
self.domain_id = self.domain_info[0]
|
||||
domain_name = self.domain_info[1]
|
||||
|
||||
domain_response = domain_cons_utils.verify_domain(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_id,
|
||||
domain_name)
|
||||
if not domain_response:
|
||||
raise Exception("Could not find the domain.")
|
||||
|
||||
domain_cons_response = domain_cons_utils.verify_domain_constraint(
|
||||
self.server, self.db_name,
|
||||
self.domain_con_names[0])
|
||||
if not domain_cons_response:
|
||||
raise Exception("Could not find domain constraint.")
|
||||
data = {'ids': self.domain_constraint_ids}
|
||||
if self.is_positive_test:
|
||||
response = self.delete_multiple(data)
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect database to delete it
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,107 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_cons_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainConstraintDependentAndDependencyTestCase(BaseTestGenerator):
|
||||
""" This class will add new domain constraint under schema node. """
|
||||
|
||||
scenarios = utils.generate_scenarios(
|
||||
'domain_constraint_dependent_dependency', domain_cons_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
self.domain_name = "domain_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.domain_con_name = \
|
||||
"test_domain_con_add_%s" % (str(uuid.uuid4())[1:8])
|
||||
|
||||
self.domain_info = domain_cons_utils.create_domain(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
self.domain_constraint_id = \
|
||||
domain_cons_utils.create_domain_constraints(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.domain_name,
|
||||
self.domain_con_name)
|
||||
|
||||
def dependents_domain_constraint(self):
|
||||
"""
|
||||
This function returns the domain constraint dependents response
|
||||
:return: domain constraint dependents response
|
||||
"""
|
||||
return self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.domain_id) + '/' +
|
||||
str(self.domain_constraint_id),
|
||||
follow_redirects=True)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add domain constraint under test database. """
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
self.domain_id = self.domain_info[0]
|
||||
domain_name = self.domain_info[1]
|
||||
|
||||
domain_response = domain_cons_utils.verify_domain(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_id,
|
||||
domain_name)
|
||||
if not domain_response:
|
||||
raise Exception("Could not find the domain.")
|
||||
|
||||
domain_cons_response = domain_cons_utils.verify_domain_constraint(
|
||||
self.server, self.db_name,
|
||||
self.domain_con_name)
|
||||
if not domain_cons_response:
|
||||
raise Exception("Could not find domain constraint.")
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.dependents_domain_constraint()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,135 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_cons_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainConstraintGetTestCase(BaseTestGenerator):
|
||||
""" This class will fetch new domain constraint under schema node. """
|
||||
scenarios = utils.generate_scenarios('domain_constraints_get',
|
||||
domain_cons_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
self.domain_name = "domain_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.domain_con_name = \
|
||||
"test_domain_con_add_%s" % (str(uuid.uuid4())[1:8])
|
||||
|
||||
self.domain_info = domain_cons_utils.create_domain(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
self.domain_constraint_id = \
|
||||
domain_cons_utils.create_domain_constraints(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.domain_name,
|
||||
self.domain_con_name)
|
||||
|
||||
def get_domain_constraint(self):
|
||||
"""
|
||||
This function returns the domain constraint get response
|
||||
:return: domain constraint get response
|
||||
"""
|
||||
return self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.domain_id) + '/' +
|
||||
str(self.domain_constraint_id),
|
||||
follow_redirects=True)
|
||||
|
||||
def get_domain_constraint_list(self):
|
||||
"""
|
||||
This functions returns the domain constraint list
|
||||
:return: domain constraint list
|
||||
"""
|
||||
return self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + "/" +
|
||||
str(self.domain_id) + "/",
|
||||
content_type='html/json'
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add domain constraint under test database. """
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
self.domain_id = self.domain_info[0]
|
||||
domain_name = self.domain_info[1]
|
||||
|
||||
domain_response = domain_cons_utils.verify_domain(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_id,
|
||||
domain_name)
|
||||
if not domain_response:
|
||||
raise Exception("Could not find the domain.")
|
||||
|
||||
domain_cons_response = domain_cons_utils.verify_domain_constraint(
|
||||
self.server, self.db_name,
|
||||
self.domain_con_name)
|
||||
if not domain_cons_response:
|
||||
raise Exception("Could not find domain constraint.")
|
||||
|
||||
if self.is_positive_test:
|
||||
if hasattr(self, "domain_constraint_list"):
|
||||
response = self.get_domain_constraint_list()
|
||||
else:
|
||||
response = self.get_domain_constraint()
|
||||
else:
|
||||
if hasattr(self, "error_fetching_domain_constraints"):
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
if hasattr(self, "domain_constraint_list"):
|
||||
response = self.get_domain_constraint_list()
|
||||
else:
|
||||
response = self.get_domain_constraint()
|
||||
|
||||
if hasattr(self, "wrong_domain_cons_id"):
|
||||
self.domain_constraint_id = 99999
|
||||
response = self.get_domain_constraint()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,114 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_cons_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainConstraintMsqlTestCase(BaseTestGenerator):
|
||||
""" This class will add new domain constraint under schema node. """
|
||||
|
||||
scenarios = utils.generate_scenarios('domain_constraints_get_msql',
|
||||
domain_cons_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
self.domain_name = "domain_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.domain_con_name = \
|
||||
"test_domain_con_msql_%s" % (str(uuid.uuid4())[1:8])
|
||||
|
||||
self.domain_info = \
|
||||
domain_cons_utils.create_domain(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
self.domain_constraint_id = \
|
||||
domain_cons_utils.create_domain_constraints(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.domain_name,
|
||||
self.domain_con_name)
|
||||
|
||||
def msql_domain_constraint(self):
|
||||
"""
|
||||
This function returns the domain constraint msql response
|
||||
:return: domain constraint msql response
|
||||
"""
|
||||
return self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.domain_id) + '/' +
|
||||
str(self.domain_constraint_id),
|
||||
follow_redirects=True)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add domain constraint under test database. """
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
self.domain_id = self.domain_info[0]
|
||||
domain_name = self.domain_info[1]
|
||||
|
||||
domain_response = domain_cons_utils.verify_domain(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_id,
|
||||
domain_name)
|
||||
if not domain_response:
|
||||
raise Exception("Could not find the domain.")
|
||||
|
||||
domain_cons_response = domain_cons_utils.verify_domain_constraint(
|
||||
self.server, self.db_name,
|
||||
self.domain_con_name)
|
||||
if not domain_cons_response:
|
||||
raise Exception("Could not find domain constraint.")
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.msql_domain_constraint()
|
||||
else:
|
||||
if hasattr(self, "error_in_db"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.msql_domain_constraint()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,139 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_cons_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainConstraintNodeAndNodesTestCase(BaseTestGenerator):
|
||||
""" This class will add new domain constraint under schema node. """
|
||||
|
||||
scenarios = utils.generate_scenarios('domain_constraints_get_nodes',
|
||||
domain_cons_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
self.domain_name = "domain_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.domain_con_name = \
|
||||
"test_domain_con_add_%s" % (str(uuid.uuid4())[1:8])
|
||||
|
||||
self.domain_info = \
|
||||
domain_cons_utils.create_domain(self.server, self.db_name,
|
||||
self.schema_name, self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
self.domain_constraint_id = \
|
||||
domain_cons_utils.create_domain_constraints(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.domain_name,
|
||||
self.domain_con_name)
|
||||
self.domain_con_name = \
|
||||
"test_domain_con_node_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.domain_constraint_id_invalid = \
|
||||
domain_cons_utils.create_domain_constraints_invalid(
|
||||
self.server, self.db_name, self.schema_name,
|
||||
self.domain_name, self.domain_con_name)
|
||||
|
||||
def get_domain_constraint_node(self, domain_con_id):
|
||||
"""
|
||||
This function returns the domain constraint node response
|
||||
:return: domain constraint node response
|
||||
"""
|
||||
return self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.domain_id) + '/' +
|
||||
str(domain_con_id),
|
||||
follow_redirects=True)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add domain constraint under test database. """
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
self.domain_id = self.domain_info[0]
|
||||
domain_name = self.domain_info[1]
|
||||
|
||||
domain_response = domain_cons_utils.verify_domain(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_id,
|
||||
domain_name)
|
||||
if not domain_response:
|
||||
raise Exception("Could not find the domain.")
|
||||
|
||||
domain_cons_response = domain_cons_utils.verify_domain_constraint(
|
||||
self.server, self.db_name,
|
||||
self.domain_con_name)
|
||||
if not domain_cons_response:
|
||||
raise Exception("Could not find domain constraint.")
|
||||
|
||||
if self.is_positive_test and self.invalid:
|
||||
if hasattr(self, "node"):
|
||||
|
||||
response = self.get_domain_constraint_node(
|
||||
self.domain_constraint_id_invalid)
|
||||
else:
|
||||
response = self.get_domain_constraint_node("")
|
||||
|
||||
elif self.is_positive_test:
|
||||
if hasattr(self, "node"):
|
||||
response = self.get_domain_constraint_node(
|
||||
self.domain_constraint_id)
|
||||
else:
|
||||
response = self.get_domain_constraint_node("")
|
||||
|
||||
else:
|
||||
if hasattr(self, "error_fetching_domain_constraint"):
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
if hasattr(self, "node"):
|
||||
response = self.get_domain_constraint_node(
|
||||
self.domain_constraint_id)
|
||||
else:
|
||||
response = self.get_domain_constraint_node("")
|
||||
|
||||
if hasattr(self, "wrong_domain_constraint_id"):
|
||||
self.domain_constraint_id = 99999
|
||||
response = \
|
||||
self.get_domain_constraint_node(self.domain_constraint_id)
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,116 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_cons_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainConstraintPutTestCase(BaseTestGenerator):
|
||||
""" This class will add new domain constraint under schema node. """
|
||||
|
||||
scenarios = utils.generate_scenarios('domain_constraints_update',
|
||||
domain_cons_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
self.domain_name = "domain_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.domain_con_name = \
|
||||
"test_domain_con_upd_%s" % (str(uuid.uuid4())[1:8])
|
||||
|
||||
self.domain_info = \
|
||||
domain_cons_utils.create_domain(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
self.domain_constraint_id = \
|
||||
domain_cons_utils.create_domain_constraints(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.domain_name,
|
||||
self.domain_con_name)
|
||||
|
||||
def update_domain_constraint(self):
|
||||
"""
|
||||
This functions update domain constraint
|
||||
:return: Domain Constraint update request details
|
||||
"""
|
||||
return self.tester.put(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.domain_id) + '/' +
|
||||
str(self.domain_constraint_id),
|
||||
data=json.dumps(self.test_data),
|
||||
follow_redirects=True)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will update domain under schema node. """
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
self.domain_id = self.domain_info[0]
|
||||
domain_name = self.domain_info[1]
|
||||
|
||||
domain_response = domain_cons_utils.verify_domain(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_id,
|
||||
domain_name)
|
||||
if not domain_response:
|
||||
raise Exception("Could not find the domain.")
|
||||
|
||||
domain_cons_response = domain_cons_utils.verify_domain_constraint(
|
||||
self.server, self.db_name,
|
||||
self.domain_con_name)
|
||||
if not domain_cons_response:
|
||||
raise Exception("Could not find domain constraint.")
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.update_domain_constraint()
|
||||
|
||||
else:
|
||||
if hasattr(self, "error_in_db"):
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=eval(self.mock_data["return_value"])):
|
||||
response = self.update_domain_constraint()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,118 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_cons_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainConstraintGetSqlTestCase(BaseTestGenerator):
|
||||
""" This class will add new domain constraint under schema node. """
|
||||
|
||||
scenarios = utils.generate_scenarios('domain_constraints_get_sql',
|
||||
domain_cons_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
self.domain_name = "domain_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.domain_con_name = \
|
||||
"test_domain_con_sql_%s" % (str(uuid.uuid4())[1:8])
|
||||
|
||||
self.domain_info = \
|
||||
domain_cons_utils.create_domain(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
self.domain_constraint_id = \
|
||||
domain_cons_utils.create_domain_constraints(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.domain_name,
|
||||
self.domain_con_name)
|
||||
|
||||
def sql_domain_constraint(self):
|
||||
"""
|
||||
This function returns the domain constraint sql response
|
||||
:return: domain constraint sql response
|
||||
"""
|
||||
return self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.domain_id) + '/' +
|
||||
str(self.domain_constraint_id),
|
||||
follow_redirects=True)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add domain constraint under test database. """
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
|
||||
self.domain_id = self.domain_info[0]
|
||||
domain_name = self.domain_info[1]
|
||||
|
||||
domain_response = domain_cons_utils.verify_domain(
|
||||
self.server,
|
||||
self.db_name,
|
||||
self.schema_id,
|
||||
domain_name)
|
||||
if not domain_response:
|
||||
raise Exception("Could not find the domain.")
|
||||
|
||||
domain_cons_response = domain_cons_utils.verify_domain_constraint(
|
||||
self.server, self.db_name,
|
||||
self.domain_con_name)
|
||||
if not domain_cons_response:
|
||||
raise Exception("Could not find domain constraint.")
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.sql_domain_constraint()
|
||||
else:
|
||||
if hasattr(self, "internal_server_error"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.sql_domain_constraint()
|
||||
|
||||
if hasattr(self, "wrong_domain_constraint_id"):
|
||||
self.domain_constraint_id = 99999
|
||||
response = self.sql_domain_constraint()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,221 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import sys
|
||||
import traceback
|
||||
import os
|
||||
import json
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
|
||||
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
|
||||
with open(CURRENT_PATH + "/domain_constraints_test_data.json") as data_file:
|
||||
test_cases = json.load(data_file)
|
||||
|
||||
|
||||
def create_domain_constraints(server, db_name, schema_name,
|
||||
domain_name, domain_constraint_name,
|
||||
domain_sql=None):
|
||||
"""
|
||||
This function is used to add the domain to existing schema
|
||||
:param server: server details
|
||||
:type server: dict
|
||||
:param db_name: database name
|
||||
:type db_name: str
|
||||
:param schema_name: schema name
|
||||
:type schema_name: str
|
||||
:param schema_id: schema id
|
||||
:type schema_id: int
|
||||
:param domain_name: domain name
|
||||
:type domain_name: str
|
||||
:return: None
|
||||
"""
|
||||
try:
|
||||
connection = utils.get_db_connection(db_name,
|
||||
server['username'],
|
||||
server['db_password'],
|
||||
server['host'],
|
||||
server['port'],
|
||||
server['sslmode'])
|
||||
pg_cursor = connection.cursor()
|
||||
|
||||
if domain_sql is None:
|
||||
query = 'ALTER DOMAIN ' + schema_name + '.' + domain_name + \
|
||||
' ADD CONSTRAINT ' + domain_constraint_name + \
|
||||
' CHECK (VALUE > 0)'
|
||||
|
||||
else:
|
||||
query = 'ALTER DOMAIN ' + schema_name + '.' +\
|
||||
domain_name + ' ' + domain_sql
|
||||
|
||||
pg_cursor.execute(query)
|
||||
connection.commit()
|
||||
# Get 'oid' from newly created domain
|
||||
pg_cursor.execute("SELECT oid FROM pg_constraint WHERE"
|
||||
" conname='%s'" %
|
||||
domain_constraint_name)
|
||||
oid = pg_cursor.fetchone()
|
||||
domain_con_id = ''
|
||||
if oid:
|
||||
domain_con_id = oid[0]
|
||||
connection.close()
|
||||
return domain_con_id
|
||||
except Exception:
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
|
||||
|
||||
def verify_domain_constraint(server, db_name, domain_constraint_name):
|
||||
"""
|
||||
This function verifies the domain constraint is present in the database
|
||||
:param server: server details
|
||||
:type server: dict
|
||||
:param db_name: database name
|
||||
:type db_name: str
|
||||
:param domain_constraint_name: domain_constraint_name to be verified
|
||||
:type domain_constraint_name: str
|
||||
:return domain_con_id: domain constraint's details
|
||||
:rtype event_trigger: tuple
|
||||
"""
|
||||
try:
|
||||
connection = utils.get_db_connection(db_name, server['username'],
|
||||
server['db_password'],
|
||||
server['host'], server['port'],
|
||||
server['sslmode'])
|
||||
pg_cursor = connection.cursor()
|
||||
pg_cursor.execute("SELECT oid FROM pg_constraint WHERE"
|
||||
" conname='%s'" %
|
||||
domain_constraint_name)
|
||||
domain_con_id = pg_cursor.fetchone()
|
||||
connection.close()
|
||||
return domain_con_id
|
||||
except Exception:
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
|
||||
|
||||
def create_domain(server, db_name, schema_name,
|
||||
schema_id, domain_name, domain_sql=None):
|
||||
"""
|
||||
This function is used to add the domain to existing schema
|
||||
:param server: server details
|
||||
:type server: dict
|
||||
:param db_name: database name
|
||||
:type db_name: str
|
||||
:param schema_name: schema name
|
||||
:type schema_name: str
|
||||
:param schema_id: schema id
|
||||
:type schema_id: int
|
||||
:param domain_name: domain name
|
||||
:type domain_name: str
|
||||
:return: None
|
||||
"""
|
||||
try:
|
||||
connection = utils.get_db_connection(db_name,
|
||||
server['username'],
|
||||
server['db_password'],
|
||||
server['host'],
|
||||
server['port'])
|
||||
pg_cursor = connection.cursor()
|
||||
|
||||
if domain_sql is None:
|
||||
query = 'CREATE DOMAIN ' + schema_name + '.' + domain_name + \
|
||||
' AS numeric(500,4) DEFAULT 1000'
|
||||
else:
|
||||
query = 'CREATE DOMAIN ' + schema_name + '.' +\
|
||||
domain_name + ' ' + domain_sql
|
||||
|
||||
pg_cursor.execute(query)
|
||||
connection.commit()
|
||||
# Get 'oid' from newly created domain
|
||||
pg_cursor.execute("SELECT d.oid, d.typname FROM pg_type d WHERE"
|
||||
" d.typname='%s' AND d.typnamespace='%s'" %
|
||||
(domain_name, schema_id))
|
||||
domains = pg_cursor.fetchone()
|
||||
connection.close()
|
||||
return domains
|
||||
except Exception:
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
|
||||
|
||||
def verify_domain(server, db_name, schema_id, domain_name):
|
||||
"""
|
||||
This function get the oid & name of the domain
|
||||
:param server: server details
|
||||
:type server: dict
|
||||
:param db_name: db name
|
||||
:type db_name: str
|
||||
:param schema_id: schema id
|
||||
:type schema_id: int
|
||||
:param domain_name: domain name
|
||||
:type domain_name: str
|
||||
:return:
|
||||
"""
|
||||
connection = utils.get_db_connection(db_name,
|
||||
server['username'],
|
||||
server['db_password'],
|
||||
server['host'],
|
||||
server['port'])
|
||||
pg_cursor = connection.cursor()
|
||||
pg_cursor.execute("SELECT d.oid, d.typname FROM pg_type d WHERE"
|
||||
" d.typname='%s' AND d.typnamespace='%s'" %
|
||||
(domain_name, schema_id))
|
||||
domains = pg_cursor.fetchone()
|
||||
connection.close()
|
||||
return domains
|
||||
|
||||
|
||||
def create_domain_constraints_invalid(server, db_name, schema_name,
|
||||
domain_name, domain_constraint_name,
|
||||
domain_sql=None):
|
||||
"""
|
||||
This function is used to add the domain to existing schema
|
||||
:param server: server details
|
||||
:type server: dict
|
||||
:param db_name: database name
|
||||
:type db_name: str
|
||||
:param schema_name: schema name
|
||||
:type schema_name: str
|
||||
:param schema_id: schema id
|
||||
:type schema_id: int
|
||||
:param domain_name: domain name
|
||||
:type domain_name: str
|
||||
:return: None
|
||||
"""
|
||||
try:
|
||||
connection = utils.get_db_connection(db_name,
|
||||
server['username'],
|
||||
server['db_password'],
|
||||
server['host'],
|
||||
server['port'],
|
||||
server['sslmode'])
|
||||
pg_cursor = connection.cursor()
|
||||
|
||||
if domain_sql is None:
|
||||
query = 'ALTER DOMAIN ' + schema_name + '.' + domain_name + \
|
||||
' ADD CONSTRAINT ' + domain_constraint_name + \
|
||||
' CHECK (VALUE > 0) NOT VALID'
|
||||
|
||||
else:
|
||||
query = 'ALTER DOMAIN ' + schema_name + '.' + \
|
||||
domain_name + ' ' + domain_sql
|
||||
|
||||
pg_cursor.execute(query)
|
||||
connection.commit()
|
||||
# Get 'oid' from newly created domain
|
||||
pg_cursor.execute("SELECT oid FROM pg_constraint WHERE"
|
||||
" conname='%s'" %
|
||||
domain_constraint_name)
|
||||
oid = pg_cursor.fetchone()
|
||||
domain_con_id = ''
|
||||
if oid:
|
||||
domain_con_id = oid[0]
|
||||
connection.close()
|
||||
return domain_con_id
|
||||
except Exception:
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
@@ -0,0 +1,567 @@
|
||||
{
|
||||
"domain_create": [
|
||||
{
|
||||
"name": "Create domain: With valid data",
|
||||
"url": "/browser/domain/obj/",
|
||||
"is_positive_test": true,
|
||||
"test_data": {
|
||||
"basensp": "PLACE HOLDER",
|
||||
"basetype": "character",
|
||||
"constraints": [{
|
||||
"conname": "num",
|
||||
"convalidated": true
|
||||
}],
|
||||
"is_tlength": true,
|
||||
"max_val": 2147483647,
|
||||
"min_val": 1,
|
||||
"name": "domain_add",
|
||||
"owner": "PLACE HOLDER",
|
||||
"seclabels": [],
|
||||
"typdefault": "1",
|
||||
"typlen": "10"
|
||||
},
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
}
|
||||
},
|
||||
{"name": "Error while creating a domain - Internal server error",
|
||||
"url": "/browser/domain/obj/",
|
||||
"is_positive_test": false,
|
||||
"internal_server_error": true,
|
||||
"test_data": {
|
||||
"basensp": "PLACE HOLDER",
|
||||
"basetype": "character",
|
||||
"constraints": [
|
||||
{
|
||||
"conname": "num",
|
||||
"convalidated": true
|
||||
}
|
||||
],
|
||||
"is_tlength": true,
|
||||
"max_val": 2147483647,
|
||||
"min_val": 1,
|
||||
"name": "domain_add",
|
||||
"owner": "PLACE HOLDER",
|
||||
"seclabels": [],
|
||||
"typdefault": "1",
|
||||
"typlen": "10"
|
||||
},
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
|
||||
"return_value": "(False, 'Mocked Internal Server Error')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while creating a domain - Internal server error doid",
|
||||
"url": "/browser/domain/obj/",
|
||||
"is_positive_test": false,
|
||||
"error_getting_doid": true,
|
||||
"test_data": {
|
||||
"basensp": "PLACE HOLDER",
|
||||
"basetype": "character",
|
||||
"constraints": [
|
||||
{
|
||||
"conname": "num",
|
||||
"convalidated": true
|
||||
}
|
||||
],
|
||||
"is_tlength": true,
|
||||
"max_val": 2147483647,
|
||||
"min_val": 1,
|
||||
"name": "domain_add",
|
||||
"owner": "PLACE HOLDER",
|
||||
"seclabels": [],
|
||||
"typdefault": "1",
|
||||
"typlen": "10"
|
||||
},
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
|
||||
"return_value": "(True, 'Mocking the scalar output'), (False, 'Mocked Internal Server Error while creating domain')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error getting scid while creating a domain - Internal server error",
|
||||
"url": "/browser/domain/obj/",
|
||||
"is_positive_test": false,
|
||||
"error_getting_scid": true,
|
||||
"test_data": {
|
||||
"basensp": "PLACE HOLDER",
|
||||
"basetype": "character",
|
||||
"constraints": [
|
||||
{
|
||||
"conname": "num",
|
||||
"convalidated": true
|
||||
}
|
||||
],
|
||||
"is_tlength": true,
|
||||
"max_val": 2147483647,
|
||||
"min_val": 1,
|
||||
"name": "domain_add",
|
||||
"owner": "PLACE HOLDER",
|
||||
"seclabels": [],
|
||||
"typdefault": "1",
|
||||
"typlen": "10"
|
||||
},
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
|
||||
"return_value": "(True, True),(True, True), (False, 'Mocked Internal Server Error while creating domain')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_delete": [
|
||||
{
|
||||
"name": "delete domain",
|
||||
"url": "/browser/domain/obj/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while deleting a domain - Internal server error - 1",
|
||||
"url": "/browser/domain/obj/",
|
||||
"error_deleting_domain": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while deleting a domain')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while deleting a domain - Internal server error - 2",
|
||||
"url": "/browser/domain/obj/",
|
||||
"error_deleting_domain": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while deleting a domain')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "delete domain using wrong domain id",
|
||||
"url": "/browser/domain/obj/",
|
||||
"wrong_domain_id": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": false,
|
||||
"mock_data": {
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 410
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_update": [
|
||||
{
|
||||
"name": "Update domain",
|
||||
"url": "/browser/domain/obj/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"test_data": {
|
||||
"description": "This is domain update comment",
|
||||
"id": "PLACE_HOLDER"
|
||||
},
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while updating a domain",
|
||||
"url": "/browser/domain/obj/",
|
||||
"error_updating_domain": true,
|
||||
"test_data": {
|
||||
"description": "This is domain update comment",
|
||||
"id": "PLACE_HOLDER"
|
||||
},
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching a domain')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while updating a domain - Internal server error",
|
||||
"url": "/browser/domain/obj/",
|
||||
"error_in_db": true,
|
||||
"test_data": {
|
||||
"description": "This is domain update comment",
|
||||
"id": "PLACE_HOLDER"
|
||||
},
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
|
||||
"return_value": "[(True, True),(False, 'Mocked Internal Server Error while fetching a domain')]"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_get_dependencies_dependants": [
|
||||
{
|
||||
"name": "Get domain dependents with existing domain id",
|
||||
"url": "/browser/domain/dependent/",
|
||||
"is_positive_test": true,
|
||||
"test_data": {},
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
},
|
||||
"is_dependant": true
|
||||
},
|
||||
{
|
||||
"name": "Get domain dependency with existing domain id",
|
||||
"url": "/browser/domain/dependency/",
|
||||
"is_positive_test": true,
|
||||
"test_data": {},
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
},
|
||||
"is_dependant": true
|
||||
}
|
||||
],
|
||||
"domain_get_collations": [
|
||||
{
|
||||
"name": "Create domain get collations: With valid data.",
|
||||
"url": "/browser/domain/get_collations/",
|
||||
"is_positive_test": true,
|
||||
"inventory_data": {},
|
||||
"test_data": {},
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Create domain get domain: With valid data while server down.",
|
||||
"url": "/browser/domain/get_collations/",
|
||||
"is_positive_test": false,
|
||||
"inventory_data": {},
|
||||
"test_data": {
|
||||
},
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
|
||||
"return_value": "(False, 'Mocked Internal Server Error')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500,
|
||||
"error_msg": "Mocked Internal Server Error",
|
||||
"test_result_data": {}
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_get_nodes": [
|
||||
{
|
||||
"name": "Get domain nodes",
|
||||
"url": "/browser/domain/nodes/",
|
||||
"is_positive_test": true,
|
||||
"test_data": {},
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching domain nodes",
|
||||
"url": "/browser/domain/nodes/",
|
||||
"error_fetching_domain": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching a domain nodes')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Get domain node",
|
||||
"url": "/browser/domain/nodes/",
|
||||
"is_positive_test": true,
|
||||
"node": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching a domain node",
|
||||
"url": "/browser/domain/nodes/",
|
||||
"error_fetching_domain": true,
|
||||
"is_positive_test": false,
|
||||
"node": true,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching a domain nodes')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_multiple_delete" :[
|
||||
{
|
||||
"name": "Delete multiple domain",
|
||||
"url": "/browser/domain/obj/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_get": [
|
||||
{
|
||||
"name": "Get domain",
|
||||
"url": "/browser/domain/obj/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching a domain",
|
||||
"url": "/browser/domain/obj/",
|
||||
"error_fetching_domain": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching a domain')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Get domain list",
|
||||
"url": "/browser/domain/obj/",
|
||||
"is_positive_test": true,
|
||||
"domain_list": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching a domain list",
|
||||
"url": "/browser/domain/obj/",
|
||||
"error_fetching_domain": true,
|
||||
"is_positive_test": false,
|
||||
"domain_list": true,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching a domain')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Fetch domain using wrong domain id",
|
||||
"url": "/browser/domain/obj/",
|
||||
"wrong_domain_id": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": false,
|
||||
"mock_data": {
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 410
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_get_types": [
|
||||
{
|
||||
"name": "Domain get types",
|
||||
"url": "/browser/domain/get_types/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Domain get types",
|
||||
"url": "/browser/domain/get_types/",
|
||||
"is_positive_test": false,
|
||||
"internal_server_error": true,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.browser.server_groups.servers.databases.schemas.domains.DomainView.get_types",
|
||||
"return_value": "(False, 'Mocked internal server error')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_get_sql": [
|
||||
{
|
||||
"name": "Get domain sql - char: With existing domain id.",
|
||||
"url": "/browser/domain/sql/",
|
||||
"is_positive_test": true,
|
||||
"Domain_Reverse_Engineered_SQL_with_char" : true,
|
||||
"test_data": {
|
||||
"domain_name": "PLACE HOLDER",
|
||||
"domain_sql": "PLACE HOLDER"
|
||||
},
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Get domain sql - length precision: With existing domain id.",
|
||||
"url": "/browser/domain/sql/",
|
||||
"is_positive_test": true,
|
||||
"Domain_Reverse_Engineered_SQL_with_Length_Precision_and_Default" : true,
|
||||
"test_data": {
|
||||
"domain_name": "PLACE HOLDER",
|
||||
"domain_sql": "PLACE HOLDER"
|
||||
},
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Get domain sql - length: With existing domain id.",
|
||||
"url": "/browser/domain/sql/",
|
||||
"is_positive_test": true,
|
||||
"Domain_Reverse_Engineered_SQL_with_Length" : true,
|
||||
"test_data": {
|
||||
"domain_name": "PLACE HOLDER",
|
||||
"domain_sql": "PLACE HOLDER"
|
||||
},
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching the reverse engineered SQL properties - Internal server error",
|
||||
"url": "/browser/domain/sql/",
|
||||
"is_positive_test": false,
|
||||
"internal_server_error": true,
|
||||
"test_data": {
|
||||
"domain_name": "PLACE HOLDER",
|
||||
"domain_sql": "PLACE HOLDER"
|
||||
},
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||
"return_value": "(False, 'Mocked Internal Server Error')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching the reverse engineered SQL - wrong domain id",
|
||||
"url": "/browser/domain/sql/",
|
||||
"is_positive_test": false,
|
||||
"wrong_domain_id": true,
|
||||
"test_data": {
|
||||
"domain_name": "PLACE HOLDER",
|
||||
"domain_sql": "PLACE HOLDER"
|
||||
},
|
||||
"mocking_required": true,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 410
|
||||
}
|
||||
}
|
||||
],
|
||||
"domain_get_msql": [
|
||||
{
|
||||
"name": "Get domain msql: With existing domain id.",
|
||||
"is_positive_test": true,
|
||||
"url": "/browser/domain/msql/",
|
||||
"test_data": {
|
||||
"name": "modifying name",
|
||||
"comment": "Comments to test update"
|
||||
},
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.browser.server_groups.servers.databases.schemas.domains.DomainView.get_sql",
|
||||
"return_value": "('', 'Mocked get_sql function')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 200,
|
||||
"error_msg": null,
|
||||
"test_result_data": {}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
@@ -17,61 +17,74 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainAddTestCase(BaseTestGenerator):
|
||||
""" This class will add new domain under schema node. """
|
||||
|
||||
scenarios = [
|
||||
# Fetching default URL for domain node.
|
||||
('Fetch domain Node URL', dict(url='/browser/domain/obj/'))
|
||||
]
|
||||
scenarios = utils.generate_scenarios('domain_create',
|
||||
domain_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
pass
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
|
||||
def create_domain(self):
|
||||
"""
|
||||
This function create a domain and returns the created domain
|
||||
response
|
||||
:return: created domain response
|
||||
"""
|
||||
return self.tester.post(self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) +
|
||||
'/' + str(self.schema_id) + '/',
|
||||
data=json.dumps(self.test_data),
|
||||
content_type='html/json')
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add domain under schema node. """
|
||||
db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database to add collation.")
|
||||
schema_id = schema_info["schema_id"]
|
||||
schema_name = schema_info["schema_name"]
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
db_name,
|
||||
schema_name)
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema to add the collation.")
|
||||
|
||||
data = {
|
||||
"basensp": schema_name,
|
||||
"basetype": "character",
|
||||
"constraints": [{
|
||||
"conname": "num",
|
||||
"convalidated": True
|
||||
}],
|
||||
"is_tlength": True,
|
||||
"max_val": 2147483647,
|
||||
"min_val": 1,
|
||||
"name": "domain_add_%s" % (str(uuid.uuid4())[1:8]),
|
||||
"owner": self.server["username"],
|
||||
"seclabels": [],
|
||||
"typdefault": "1",
|
||||
"typlen": "10"
|
||||
}
|
||||
self.test_data['basensp'] = self.schema_name
|
||||
self.test_data['owner'] = self.server["username"]
|
||||
# Call POST API to add domain
|
||||
response = self.tester.post(self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) +
|
||||
'/' + str(schema_id) + '/',
|
||||
data=json.dumps(data),
|
||||
content_type='html/json')
|
||||
self.assertEquals(response.status_code, 200)
|
||||
if self.is_positive_test:
|
||||
response = self.create_domain()
|
||||
else:
|
||||
if hasattr(self, "internal_server_error"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.create_domain()
|
||||
|
||||
if hasattr(self, "error_getting_doid"):
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=eval(self.mock_data["return_value"])):
|
||||
response = self.create_domain()
|
||||
|
||||
if hasattr(self, "error_getting_scid"):
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=eval(self.mock_data["return_value"])):
|
||||
response = self.create_domain()
|
||||
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
|
||||
self.assertEquals(response.status_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
|
||||
@@ -17,18 +17,19 @@ from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainDeleteTestCase(BaseTestGenerator):
|
||||
""" This class will delete new domain under schema node. """
|
||||
scenarios = [
|
||||
# Fetching default URL for domain node.
|
||||
('Fetch domain Node URL', dict(url='/browser/domain/obj/'))
|
||||
]
|
||||
scenarios = utils.generate_scenarios('domain_delete',
|
||||
domain_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.database_info = parent_node_dict["database"][-1]
|
||||
self.db_name = self.database_info["db_name"]
|
||||
self.db_id = self.database_info["db_id"]
|
||||
self.server_id = self.database_info["server_id"]
|
||||
self.schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_name = self.schema_info["schema_name"]
|
||||
self.schema_id = self.schema_info["schema_id"]
|
||||
@@ -39,36 +40,51 @@ class DomainDeleteTestCase(BaseTestGenerator):
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add domain under schema node. """
|
||||
db_id = self.database_info["db_id"]
|
||||
server_id = self.database_info["server_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
server_id, db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database to get the domain.")
|
||||
db_name = self.database_info["db_name"]
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema to get the domain.")
|
||||
domain_id = self.domain_info[0]
|
||||
# Call GET API to verify the domain
|
||||
get_response = self.tester.delete(
|
||||
def delete_domain(self):
|
||||
"""
|
||||
This function returns the domain delete response
|
||||
:return: domain delete response
|
||||
"""
|
||||
return self.tester.delete(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(server_id) + '/' +
|
||||
str(db_id) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(domain_id),
|
||||
str(self.domain_id),
|
||||
content_type='html/json',
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
self.assertEquals(get_response.status_code, 200)
|
||||
def runTest(self):
|
||||
""" This function will delete domain under schema node. """
|
||||
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, server_id, db_id)
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database to get the domain.")
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema to get the domain.")
|
||||
self.domain_id = self.domain_info[0]
|
||||
# Call GET API to verify the domain
|
||||
if self.is_positive_test:
|
||||
response = self.delete_domain()
|
||||
else:
|
||||
if hasattr(self, "error_deleting_domain"):
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.delete_domain()
|
||||
|
||||
if hasattr(self, "wrong_domain_id"):
|
||||
self.domain_id = 99999
|
||||
response = self.delete_domain()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
|
||||
@@ -22,14 +22,14 @@ from . import utils as domain_utils
|
||||
|
||||
class DomainDeleteMultipleTestCase(BaseTestGenerator):
|
||||
""" This class will delete new domains under schema node. """
|
||||
scenarios = [
|
||||
# Fetching default URL for domain node.
|
||||
('Fetch domain Node URL', dict(url='/browser/domain/obj/'))
|
||||
]
|
||||
scenarios = utils.generate_scenarios('domain_multiple_delete',
|
||||
domain_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DomainDeleteMultipleTestCase, self).setUp()
|
||||
self.database_info = parent_node_dict["database"][-1]
|
||||
self.db_id = self.database_info["db_id"]
|
||||
self.server_id = self.database_info["server_id"]
|
||||
self.db_name = self.database_info["db_name"]
|
||||
self.schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_name = self.schema_info["schema_name"]
|
||||
@@ -47,12 +47,22 @@ class DomainDeleteMultipleTestCase(BaseTestGenerator):
|
||||
self.schema_id,
|
||||
self.domain_names[1])]
|
||||
|
||||
def delete_multiple(self, data):
|
||||
"""
|
||||
This function returns multiple domain delete response
|
||||
:param data: domain ids to delete
|
||||
:return: domain delete response
|
||||
"""
|
||||
return self.tester.delete(
|
||||
self.url,
|
||||
content_type='html/json',
|
||||
follow_redirects=True,
|
||||
data=json.dumps(data))
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add domain under schema node. """
|
||||
db_id = self.database_info["db_id"]
|
||||
server_id = self.database_info["server_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
server_id, db_id)
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database to get the domain.")
|
||||
db_name = self.database_info["db_name"]
|
||||
@@ -62,18 +72,16 @@ class DomainDeleteMultipleTestCase(BaseTestGenerator):
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema to get the domain.")
|
||||
data = {'ids': [self.domain_infos[0][0], self.domain_infos[1][0]]}
|
||||
url = self.url + str(utils.SERVER_GROUP) + '/' + str(
|
||||
server_id) + '/' + str(db_id) + '/' + str(self.schema_id) + "/"
|
||||
self.url = self.url + str(utils.SERVER_GROUP) + '/' + str(
|
||||
self.server_id) + '/' + str(self.db_id) + '/' +\
|
||||
str(self.schema_id) + "/"
|
||||
# Call GET API to verify the domain
|
||||
get_response = self.tester.delete(
|
||||
url,
|
||||
content_type='html/json',
|
||||
follow_redirects=True,
|
||||
data=json.dumps(data))
|
||||
get_response = self.delete_multiple(data)
|
||||
|
||||
self.assertEquals(get_response.status_code, 200)
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, server_id, db_id)
|
||||
actual_response_code = get_response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_utils
|
||||
|
||||
|
||||
class DomainGetDependentsAndDependencyTestCase(BaseTestGenerator):
|
||||
""" This class will add new domain under schema node. """
|
||||
skip_on_database = ['gpdb']
|
||||
scenarios = utils.generate_scenarios('domain_get_dependencies_dependants',
|
||||
domain_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DomainGetDependentsAndDependencyTestCase, self).setUp()
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
self.domain_name = "domain_get_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.domain_info = domain_utils.create_domain(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
def get_dependency_dependent(self):
|
||||
"""
|
||||
This function returns the domain dependency and dependent
|
||||
:return: domain dependency and dependent
|
||||
"""
|
||||
return self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.domain_id),
|
||||
follow_redirects=True)
|
||||
|
||||
def runTest(self):
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database to add domain.")
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema to add the domain.")
|
||||
self.domain_id = self.domain_info[0]
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.get_dependency_dependent()
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -17,18 +17,19 @@ from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainGetTestCase(BaseTestGenerator):
|
||||
""" This class will fetch new collation under schema node. """
|
||||
scenarios = [
|
||||
# Fetching default URL for domain node.
|
||||
('Fetch domain Node URL', dict(url='/browser/domain/obj/'))
|
||||
]
|
||||
scenarios = utils.generate_scenarios('domain_get',
|
||||
domain_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.database_info = parent_node_dict["database"][-1]
|
||||
self.db_name = self.database_info["db_name"]
|
||||
self.db_id = self.database_info["db_id"]
|
||||
self.server_id = self.database_info["server_id"]
|
||||
self.schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_name = self.schema_info["schema_name"]
|
||||
self.schema_id = self.schema_info["schema_id"]
|
||||
@@ -39,12 +40,35 @@ class DomainGetTestCase(BaseTestGenerator):
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
def get_domain(self):
|
||||
"""
|
||||
This functions returns the domain details
|
||||
:return: domain details
|
||||
"""
|
||||
return self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.domain_id),
|
||||
content_type='html/json')
|
||||
|
||||
def get_domain_list(self):
|
||||
"""
|
||||
This functions returns the domain list
|
||||
:return: domain list
|
||||
"""
|
||||
return self.tester.get(
|
||||
self.url +
|
||||
str(utils.SERVER_GROUP) + '/' + str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' + str(self.schema_id) + "/",
|
||||
content_type='html/json'
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add domain under schema node. """
|
||||
db_id = self.database_info["db_id"]
|
||||
server_id = self.database_info["server_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
server_id, db_id)
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database to get the domain.")
|
||||
db_name = self.database_info["db_name"]
|
||||
@@ -53,18 +77,29 @@ class DomainGetTestCase(BaseTestGenerator):
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema to get the domain.")
|
||||
domain_id = self.domain_info[0]
|
||||
self.domain_id = self.domain_info[0]
|
||||
# Call GET API to verify the domain
|
||||
get_response = self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(server_id) + '/' +
|
||||
str(db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(domain_id),
|
||||
content_type='html/json')
|
||||
self.assertEquals(get_response.status_code, 200)
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, server_id, db_id)
|
||||
if self.is_positive_test:
|
||||
if hasattr(self, "domain_list"):
|
||||
response = self.get_domain_list()
|
||||
else:
|
||||
response = self.get_domain()
|
||||
else:
|
||||
if hasattr(self, "error_fetching_domain"):
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
if hasattr(self, "domain_list"):
|
||||
response = self.get_domain_list()
|
||||
else:
|
||||
response = self.get_domain()
|
||||
if hasattr(self, "wrong_domain_id"):
|
||||
self.domain_id = 99999
|
||||
response = self.get_domain()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
|
||||
@@ -0,0 +1,83 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainGetCollationsTestCase(BaseTestGenerator):
|
||||
""" This class will add new domain under schema node. """
|
||||
skip_on_database = ['gpdb']
|
||||
scenarios = utils.generate_scenarios('domain_get_collations',
|
||||
domain_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DomainGetCollationsTestCase, self).setUp()
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
self.domain_name = "domain_get_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.domain_info = domain_utils.create_domain(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
def get_collations(self):
|
||||
"""
|
||||
This function returns the domain collation details
|
||||
:return: domain collation details
|
||||
"""
|
||||
return self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/', follow_redirects=True,
|
||||
content_type='html/json'
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database to add domain.")
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema to add the domain.")
|
||||
self.domain_id = self.domain_info[0]
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.get_collations()
|
||||
else:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.get_collations()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,93 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainGetNodeTestCase(BaseTestGenerator):
|
||||
""" This class will add new domain under schema node. """
|
||||
skip_on_database = ['gpdb']
|
||||
scenarios = utils.generate_scenarios('domain_get_nodes',
|
||||
domain_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DomainGetNodeTestCase, self).setUp()
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
self.domain_name = "domain_get_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.domain_info = domain_utils.create_domain(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
def get_domain_node(self, domain_id):
|
||||
"""
|
||||
This function returns the domain node or nodes
|
||||
:return: domain node or nodes
|
||||
"""
|
||||
return self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' + str(domain_id),
|
||||
content_type='html/json')
|
||||
|
||||
def runTest(self):
|
||||
""" This function will fetch added domain."""
|
||||
db_con = database_utils.connect_database(self,
|
||||
utils.SERVER_GROUP,
|
||||
self.server_id,
|
||||
self.db_id)
|
||||
if not db_con["info"] == "Database connected.":
|
||||
raise Exception("Could not connect to database.")
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema to add the domain.")
|
||||
self.domain_id = self.domain_info[0]
|
||||
|
||||
if self.is_positive_test:
|
||||
if hasattr(self, "node"):
|
||||
response = self.get_domain_node(self.domain_id)
|
||||
else:
|
||||
response = self.get_domain_node("")
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
else:
|
||||
if hasattr(self, "error_fetching_domain"):
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
if hasattr(self, "node"):
|
||||
response = self.get_domain_node(self.domain_id)
|
||||
else:
|
||||
response = self.get_domain_node("")
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,89 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainGetTypesTestCase(BaseTestGenerator):
|
||||
""" This class will add new domain under schema node. """
|
||||
|
||||
# scenarios = [
|
||||
# # Fetching default URL for domain node.
|
||||
# ('Fetch domain Node URL', dict(url='/browser/domain/obj/'))
|
||||
# ]
|
||||
scenarios = utils.generate_scenarios('domain_get_types',
|
||||
domain_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
super(DomainGetTypesTestCase, self).setUp()
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
self.domain_name = "domain_get_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.domain_info = domain_utils.create_domain(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
def get_types(self):
|
||||
"""
|
||||
This function returns domain types
|
||||
:return: created domain response
|
||||
"""
|
||||
return self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/',
|
||||
follow_redirects=True)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add domain under schema node. """
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database to add domain.")
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema to add the domain.")
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.get_types()
|
||||
|
||||
else:
|
||||
if hasattr(self, "internal_server_error"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.get_types()
|
||||
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
|
||||
self.assertEquals(response.status_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,81 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import uuid
|
||||
import json
|
||||
import re
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainMsqlTestCase(BaseTestGenerator):
|
||||
""" This class will delete new domain under schema node. """
|
||||
scenarios = utils.generate_scenarios('domain_get_msql',
|
||||
domain_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.database_info = parent_node_dict["database"][-1]
|
||||
self.db_name = self.database_info["db_name"]
|
||||
self.db_id = self.database_info["db_id"]
|
||||
self.server_id = self.database_info["server_id"]
|
||||
self.schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_name = self.schema_info["schema_name"]
|
||||
self.schema_id = self.schema_info["schema_id"]
|
||||
self.domain_name = "domain_delete_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.domain_info = domain_utils.create_domain(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
def msql_domain(self):
|
||||
"""
|
||||
This function returns the domain msql response
|
||||
:return: domain msql response
|
||||
"""
|
||||
return self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.domain_id),
|
||||
follow_redirects=True)
|
||||
|
||||
def runTest(self):
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database to add domain.")
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema to add the domain.")
|
||||
self.domain_id = self.domain_info[0]
|
||||
|
||||
if self.is_positive_test:
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.msql_domain()
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -18,17 +18,18 @@ from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainPutTestCase(BaseTestGenerator):
|
||||
""" This class will fetch new collation under schema node. """
|
||||
scenarios = [
|
||||
# Fetching default URL for domain node.
|
||||
('Fetch domain Node URL', dict(url='/browser/domain/obj/'))
|
||||
]
|
||||
""" This class will fetch new domain under schema node. """
|
||||
scenarios = utils.generate_scenarios('domain_update',
|
||||
domain_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.database_info = parent_node_dict["database"][-1]
|
||||
self.db_id = self.database_info["db_id"]
|
||||
self.server_id = self.database_info["server_id"]
|
||||
self.db_name = self.database_info["db_name"]
|
||||
self.schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_name = self.schema_info["schema_name"]
|
||||
@@ -40,12 +41,24 @@ class DomainPutTestCase(BaseTestGenerator):
|
||||
self.schema_id,
|
||||
self.domain_name)
|
||||
|
||||
def update_domain(self):
|
||||
"""
|
||||
This functions update domain details
|
||||
:return: Domain update request details
|
||||
"""
|
||||
return self.tester.put(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.domain_id),
|
||||
data=json.dumps(self.test_data),
|
||||
follow_redirects=True)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will update domain under schema node. """
|
||||
db_id = self.database_info["db_id"]
|
||||
server_id = self.database_info["server_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
server_id, db_id)
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database to get the domain.")
|
||||
db_name = self.database_info["db_name"]
|
||||
@@ -60,21 +73,27 @@ class DomainPutTestCase(BaseTestGenerator):
|
||||
self.domain_name)
|
||||
if not domain_response:
|
||||
raise Exception("Could not find the domain to update.")
|
||||
domain_id = self.domain_info[0]
|
||||
data = {"description": "This is domain update comment",
|
||||
"id": domain_id,
|
||||
}
|
||||
response = self.tester.put(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(server_id) + '/' +
|
||||
str(db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(domain_id),
|
||||
data=json.dumps(data),
|
||||
follow_redirects=True)
|
||||
self.assertEquals(response.status_code, 200)
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, server_id, db_id)
|
||||
self.domain_id = self.domain_info[0]
|
||||
self.test_data['id'] = self.domain_id
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.update_domain()
|
||||
else:
|
||||
if hasattr(self, "error_updating_domain"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.update_domain()
|
||||
|
||||
if hasattr(self, "error_in_db"):
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=eval(self.mock_data["return_value"])):
|
||||
response = self.update_domain()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
|
||||
@@ -19,32 +19,14 @@ from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as domain_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class DomainReverseEngineeredSQLTestCase(BaseTestGenerator):
|
||||
""" This class will verify reverse engineered sql for domain
|
||||
under schema node. """
|
||||
scenarios = [
|
||||
# Fetching default URL for domain node.
|
||||
('Domain Reverse Engineered SQL with char',
|
||||
dict(url='/browser/domain/sql/',
|
||||
domain_name='domain_get_%s' % (str(uuid.uuid4())[1:8]),
|
||||
domain_sql='AS "char";'
|
||||
)
|
||||
),
|
||||
('Domain Reverse Engineered SQL with Length, Precision and Default',
|
||||
dict(url='/browser/domain/sql/',
|
||||
domain_name='domain_get_%s' % (str(uuid.uuid4())[1:8]),
|
||||
domain_sql='AS numeric(12,2) DEFAULT 12 NOT NULL;'
|
||||
)
|
||||
),
|
||||
('Domain Reverse Engineered SQL with Length',
|
||||
dict(url='/browser/domain/sql/',
|
||||
domain_name='domain_get_%s' % (str(uuid.uuid4())[1:8]),
|
||||
domain_sql='AS interval(6);'
|
||||
)
|
||||
),
|
||||
]
|
||||
scenarios = utils.generate_scenarios('domain_get_sql',
|
||||
domain_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.database_info = parent_node_dict["database"][-1]
|
||||
@@ -52,20 +34,51 @@ class DomainReverseEngineeredSQLTestCase(BaseTestGenerator):
|
||||
self.schema_info = parent_node_dict["schema"][-1]
|
||||
self.schema_name = self.schema_info["schema_name"]
|
||||
self.schema_id = self.schema_info["schema_id"]
|
||||
self.domain_info = domain_utils.create_domain(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.schema_id,
|
||||
self.domain_name,
|
||||
self.domain_sql)
|
||||
self.test_data['domain_name'] = 'domain_get_%s' % (
|
||||
str(uuid.uuid4())[1:8])
|
||||
if hasattr(self, "Domain_Reverse_Engineered_SQL_with_char"):
|
||||
self.test_data['domain_sql'] = 'AS "char";'
|
||||
|
||||
if hasattr(self,
|
||||
"Domain_Reverse_Engineered_SQL_with_Length_Precision_and_Default"):
|
||||
self.test_data['domain_sql'] =\
|
||||
'AS numeric(12,2) DEFAULT 12 NOT NULL;'
|
||||
|
||||
if hasattr(self, "Domain_Reverse_Engineered_SQL_with_Length"):
|
||||
self.test_data['domain_sql'] = 'AS interval(6);'
|
||||
if hasattr(self, "internal_server_error"):
|
||||
self.test_data['domain_sql'] = 'AS "char";'
|
||||
if hasattr(self, "wrong_domain_id"):
|
||||
self.test_data['domain_sql'] = 'AS "char";'
|
||||
|
||||
self.domain_info =\
|
||||
domain_utils.create_domain(self.server,
|
||||
self.db_name,
|
||||
self.schema_name,
|
||||
self.schema_id,
|
||||
self.test_data['domain_name'],
|
||||
self.test_data['domain_sql'])
|
||||
|
||||
def get_sql(self):
|
||||
"""
|
||||
This function returns the doamin sql
|
||||
:return: domain sql
|
||||
"""
|
||||
return self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' +
|
||||
str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(self.domain_id),
|
||||
content_type='html/json')
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add domain and verify the
|
||||
reverse engineered sql. """
|
||||
db_id = self.database_info["db_id"]
|
||||
server_id = self.database_info["server_id"]
|
||||
self.db_id = self.database_info["db_id"]
|
||||
self.server_id = self.database_info["server_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
server_id, db_id)
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database to get the domain.")
|
||||
|
||||
@@ -75,44 +88,57 @@ class DomainReverseEngineeredSQLTestCase(BaseTestGenerator):
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema to get the domain.")
|
||||
domain_id = self.domain_info[0]
|
||||
self.domain_id = self.domain_info[0]
|
||||
|
||||
# Call GET API to fetch the domain sql
|
||||
get_response = self.tester.get(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(server_id) + '/' +
|
||||
str(db_id) + '/' +
|
||||
str(self.schema_id) + '/' +
|
||||
str(domain_id),
|
||||
content_type='html/json')
|
||||
if self.is_positive_test:
|
||||
get_response = self.get_sql()
|
||||
|
||||
self.assertEquals(get_response.status_code, 200)
|
||||
orig_sql = json.loads(get_response.data.decode('utf-8'))
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEquals(get_response.status_code, expected_response_code)
|
||||
orig_sql = json.loads(get_response.data.decode('utf-8'))
|
||||
|
||||
# Replace multiple spaces with one space and check the expected sql
|
||||
sql = re.sub('\s+', ' ', orig_sql).strip()
|
||||
expected_sql = '-- DOMAIN: {0}.{1} -- DROP DOMAIN {0}.{1}; ' \
|
||||
'CREATE DOMAIN {0}.{1} {2} ' \
|
||||
'ALTER DOMAIN {0}.{1} OWNER' \
|
||||
' TO {3};'.format(self.schema_name,
|
||||
self.domain_name,
|
||||
self.domain_sql,
|
||||
self.server["username"])
|
||||
# Replace multiple spaces with one space and check the expected sql
|
||||
sql = re.sub('\s+', ' ', orig_sql).strip()
|
||||
expected_sql = '-- DOMAIN: {0}.{1} -- DROP DOMAIN {0}.{1}; ' \
|
||||
'CREATE DOMAIN {0}.{1} {2} ' \
|
||||
'ALTER DOMAIN {0}.{1} OWNER' \
|
||||
' TO {3};'.format(self.schema_name,
|
||||
self.test_data['domain_name'],
|
||||
self.test_data['domain_sql'],
|
||||
self.server["username"])
|
||||
|
||||
self.assertEquals(sql, expected_sql)
|
||||
self.assertEquals(sql, expected_sql)
|
||||
|
||||
domain_utils.delete_domain(self.server, db_name,
|
||||
self.schema_name, self.domain_name)
|
||||
domain_utils.delete_domain(self.server,
|
||||
db_name,
|
||||
self.schema_name,
|
||||
self.test_data['domain_name'])
|
||||
|
||||
# Verify the reverse engineered sql with creating domain with
|
||||
# the sql we get from the server
|
||||
domain_utils.create_domain_from_sql(self.server, db_name, orig_sql)
|
||||
# Verify the reverse engineered sql with creating domain with
|
||||
# the sql we get from the server
|
||||
domain_utils.create_domain_from_sql(self.server, db_name, orig_sql)
|
||||
|
||||
domain_utils.delete_domain(self.server, db_name,
|
||||
self.schema_name, self.domain_name)
|
||||
domain_utils.delete_domain(self.server, db_name,
|
||||
self.schema_name,
|
||||
self.test_data['domain_name'])
|
||||
else:
|
||||
if hasattr(self, "internal_server_error"):
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
get_response = self.get_sql()
|
||||
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, server_id, db_id)
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEquals(get_response.status_code,
|
||||
expected_response_code)
|
||||
|
||||
if hasattr(self, "wrong_domain_id"):
|
||||
self.domain_id = 99999
|
||||
get_response = self.get_sql()
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEquals(get_response.status_code,
|
||||
expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
|
||||
@@ -11,9 +11,14 @@ from __future__ import print_function
|
||||
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
import os
|
||||
import json
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
|
||||
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
|
||||
with open(CURRENT_PATH + "/domain_test_data.json") as data_file:
|
||||
test_cases = json.load(data_file)
|
||||
|
||||
|
||||
def create_domain(server, db_name, schema_name, schema_id, domain_name,
|
||||
domain_sql=None):
|
||||
|
||||
Reference in New Issue
Block a user