mirror of
https://github.com/pgadmin-org/pgadmin4.git
synced 2025-02-25 18:55:31 -06:00
Improve code coverage and API test cases for Schemas. Fixes #5327
This commit is contained in:
parent
32d904058a
commit
318d712c4f
@ -16,6 +16,7 @@ Housekeeping
|
|||||||
************
|
************
|
||||||
|
|
||||||
| `Issue #5324 <https://redmine.postgresql.org/issues/5324>`_ - Improve code coverage and API test cases for Foreign Servers and User Mappings.
|
| `Issue #5324 <https://redmine.postgresql.org/issues/5324>`_ - Improve code coverage and API test cases for Foreign Servers and User Mappings.
|
||||||
|
| `Issue #5327 <https://redmine.postgresql.org/issues/5327>`_ - Improve code coverage and API test cases for Schemas.
|
||||||
| `Issue #5336 <https://redmine.postgresql.org/issues/5336>`_ - Improve code coverage and API test cases for Types.
|
| `Issue #5336 <https://redmine.postgresql.org/issues/5336>`_ - Improve code coverage and API test cases for Types.
|
||||||
| `Issue #5700 <https://redmine.postgresql.org/issues/5700>`_ - Remove old Python 2 compatibility code.
|
| `Issue #5700 <https://redmine.postgresql.org/issues/5700>`_ - Remove old Python 2 compatibility code.
|
||||||
|
|
||||||
|
@ -439,10 +439,7 @@
|
|||||||
"comment": "Comments to test update"
|
"comment": "Comments to test update"
|
||||||
},
|
},
|
||||||
"mocking_required": true,
|
"mocking_required": true,
|
||||||
"mock_data": {
|
"mock_data": {},
|
||||||
"function_name": "pgadmin.browser.server_groups.servers.databases.schemas.domains.domain_constraints.DomainConstraintView.get_sql",
|
|
||||||
"return_value": "('', 'Mocked get_sql function', '')"
|
|
||||||
},
|
|
||||||
"expected_data": {
|
"expected_data": {
|
||||||
"status_code": 200
|
"status_code": 200
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,296 @@
|
|||||||
|
{
|
||||||
|
"schema_create": [
|
||||||
|
{
|
||||||
|
"name": "Create schema: With valid data",
|
||||||
|
"url": "/browser/schema/obj/",
|
||||||
|
"is_positive_test": true,
|
||||||
|
"test_data": {},
|
||||||
|
"mocking_required": false,
|
||||||
|
"mock_data": {},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 200
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Error Creating schema: missing parameter",
|
||||||
|
"url": "/browser/schema/obj/",
|
||||||
|
"is_positive_test": false,
|
||||||
|
"missing_param": true,
|
||||||
|
"test_data": {},
|
||||||
|
"mocking_required": false,
|
||||||
|
"mock_data": {},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 410
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Error Creating schema: invalid data",
|
||||||
|
"url": "/browser/schema/obj/",
|
||||||
|
"is_positive_test": false,
|
||||||
|
"error_db_id": true,
|
||||||
|
"test_data": {},
|
||||||
|
"mocking_required": false,
|
||||||
|
"mock_data": {},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 410
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"schema_delete": [
|
||||||
|
{
|
||||||
|
"name": "Delete schema",
|
||||||
|
"url": "/browser/schema/obj/",
|
||||||
|
"is_positive_test": true,
|
||||||
|
"mocking_required": false,
|
||||||
|
"mock_data": {},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 200
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Error while deleting a schema - Internal server error",
|
||||||
|
"url": "/browser/schema/obj/",
|
||||||
|
"error_deleting_schema": true,
|
||||||
|
"is_positive_test": false,
|
||||||
|
"mocking_required": true,
|
||||||
|
"mock_data": {
|
||||||
|
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
|
||||||
|
"return_value": "(False, 'Mocked Internal Server Error while deleting a schema')"
|
||||||
|
},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 500
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Delete schema using wrong schema id",
|
||||||
|
"url": "/browser/schema/obj/",
|
||||||
|
"wrong_schema_id": true,
|
||||||
|
"is_positive_test": false,
|
||||||
|
"mocking_required": false,
|
||||||
|
"mock_data": {
|
||||||
|
},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 410
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"schema_multiple_delete" :[
|
||||||
|
{
|
||||||
|
"name": "Delete multiple schema",
|
||||||
|
"url": "/browser/schema/obj/",
|
||||||
|
"is_positive_test": true,
|
||||||
|
"mocking_required": false,
|
||||||
|
"mock_data": {},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 200
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"schema_dependent_dependency": [
|
||||||
|
{
|
||||||
|
"name": "Get schema dependents",
|
||||||
|
"url": "/browser/schema/dependent/",
|
||||||
|
"is_positive_test": true,
|
||||||
|
"mocking_required": false,
|
||||||
|
"mock_data": {},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 200
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Get schema dependencies",
|
||||||
|
"url": "/browser/schema/dependency/",
|
||||||
|
"is_positive_test": true,
|
||||||
|
"mocking_required": false,
|
||||||
|
"mock_data": {},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 200
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"schema_get_nodes": [
|
||||||
|
{
|
||||||
|
"name": "Get schema nodes",
|
||||||
|
"url": "/browser/schema/nodes/",
|
||||||
|
"is_positive_test": true,
|
||||||
|
"mocking_required": false,
|
||||||
|
"mock_data": {},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 200
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Error while fetching schema nodes",
|
||||||
|
"url": "/browser/schema/nodes/",
|
||||||
|
"error_fetching_schema": true,
|
||||||
|
"is_positive_test": false,
|
||||||
|
"invalid": false,
|
||||||
|
"mocking_required": true,
|
||||||
|
"mock_data": {
|
||||||
|
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
|
||||||
|
"return_value": "(False, 'Mocked Internal Server Error while fetching schema nodes')"
|
||||||
|
},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 500
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Error - Fetching schema using wrong schema id",
|
||||||
|
"url": "/browser/schema/nodes/",
|
||||||
|
"wrong_schema_id": true,
|
||||||
|
"is_positive_test": false,
|
||||||
|
"invalid": false,
|
||||||
|
"mocking_required": false,
|
||||||
|
"mock_data": {
|
||||||
|
},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 410
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"schema_update": [
|
||||||
|
{
|
||||||
|
"name": "Update schema",
|
||||||
|
"url": "/browser/schema/obj/",
|
||||||
|
"is_positive_test": true,
|
||||||
|
"mocking_required": false,
|
||||||
|
"test_data": {},
|
||||||
|
"mock_data": {},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 200
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Error while updating a schema - Internal server error",
|
||||||
|
"url": "/browser/schema/obj/",
|
||||||
|
"error_in_db": true,
|
||||||
|
"test_data": {},
|
||||||
|
"is_positive_test": false,
|
||||||
|
"mocking_required": true,
|
||||||
|
"mock_data": {
|
||||||
|
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
|
||||||
|
"return_value": "[(False, 'Mocked Internal Server Error while updating a schema')]"
|
||||||
|
},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 500
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"schema_get_sql": [
|
||||||
|
{
|
||||||
|
"name": "Get Schema SQL",
|
||||||
|
"url": "/browser/schema/sql/",
|
||||||
|
"is_positive_test": true,
|
||||||
|
"mocking_required": false,
|
||||||
|
"mock_data": {},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 200
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Error - Get schema SQL - Internal server error",
|
||||||
|
"url": "/browser/schema/sql/",
|
||||||
|
"is_positive_test": false,
|
||||||
|
"internal_server_error": true,
|
||||||
|
"mocking_required": true,
|
||||||
|
"mock_data": {
|
||||||
|
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||||
|
"return_value": "(False, 'Mocked Internal Server Error while fetching a schema')"
|
||||||
|
},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 500
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Error - Get schema SQL using wrong schema id",
|
||||||
|
"url": "/browser/schema/sql/",
|
||||||
|
"wrong_schema_id": true,
|
||||||
|
"is_positive_test": false,
|
||||||
|
"mocking_required": false,
|
||||||
|
"mock_data": {
|
||||||
|
},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 410
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"schema_get": [
|
||||||
|
{
|
||||||
|
"name": "Get Schema Properties",
|
||||||
|
"url": "/browser/schema/obj/",
|
||||||
|
"is_positive_test": true,
|
||||||
|
"mocking_required": false,
|
||||||
|
"mock_data": {},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 200
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Error while fetching Schema Properties",
|
||||||
|
"url": "/browser/schema/obj/",
|
||||||
|
"error_fetching_schema": true,
|
||||||
|
"is_positive_test": false,
|
||||||
|
"mocking_required": true,
|
||||||
|
"mock_data": {
|
||||||
|
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||||
|
"return_value": "(False, 'Mocked Internal Server Error while fetching schema properties')"
|
||||||
|
},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 500
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Get schema list",
|
||||||
|
"url": "/browser/schema/obj/",
|
||||||
|
"is_positive_test": true,
|
||||||
|
"schema_list": true,
|
||||||
|
"mocking_required": false,
|
||||||
|
"mock_data": {},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 200
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Error while fetching Schema list",
|
||||||
|
"url": "/browser/schema/obj/",
|
||||||
|
"error_fetching_schema": true,
|
||||||
|
"is_positive_test": false,
|
||||||
|
"schema_list": true,
|
||||||
|
"mocking_required": true,
|
||||||
|
"mock_data": {
|
||||||
|
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||||
|
"return_value": "(False, 'Mocked Internal Server Error while fetching schema list')"
|
||||||
|
},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 500
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"schema_get_children": [
|
||||||
|
{
|
||||||
|
"name": "Get Schema Children",
|
||||||
|
"url": "/browser/schema/children/",
|
||||||
|
"is_positive_test": true,
|
||||||
|
"mocking_required": false,
|
||||||
|
"mock_data": {},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 200
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Error while fetching Schema children",
|
||||||
|
"url": "/browser/schema/children/",
|
||||||
|
"error_in_db": true,
|
||||||
|
"is_positive_test": false,
|
||||||
|
"schema_list": true,
|
||||||
|
"mocking_required": true,
|
||||||
|
"mock_data": {
|
||||||
|
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||||
|
"return_value": "(False, 'Mocked Internal Server Error while fetching schema children')"
|
||||||
|
},
|
||||||
|
"expected_data": {
|
||||||
|
"status_code": 500
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
@ -15,29 +15,45 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
|||||||
from pgadmin.utils.route import BaseTestGenerator
|
from pgadmin.utils.route import BaseTestGenerator
|
||||||
from regression import parent_node_dict
|
from regression import parent_node_dict
|
||||||
from regression.python_test_utils import test_utils as utils
|
from regression.python_test_utils import test_utils as utils
|
||||||
|
from . import utils as schema_utils
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
|
||||||
class SchemaAddTestCase(BaseTestGenerator):
|
class SchemaAddTestCase(BaseTestGenerator):
|
||||||
""" This class will add new schema under database node. """
|
""" This class will add new schema under database node. """
|
||||||
scenarios = [
|
scenarios = utils.generate_scenarios('schema_create',
|
||||||
# Fetching default URL for schema node.
|
schema_utils.test_cases)
|
||||||
('Check Schema Node URL', dict(url='/browser/schema/obj/'))
|
|
||||||
]
|
def setUp(self):
|
||||||
|
database_info = parent_node_dict["database"][-1]
|
||||||
|
self.server_id = database_info["server_id"]
|
||||||
|
self.db_id = database_info["db_id"]
|
||||||
|
|
||||||
|
def create_schema(self, db_id):
|
||||||
|
"""
|
||||||
|
This function create a schema and returns it
|
||||||
|
:return: created schema response
|
||||||
|
"""
|
||||||
|
is_nice = True
|
||||||
|
state = "nice" if is_nice else "not nice"
|
||||||
|
|
||||||
|
db_id = db_id or self.db_id
|
||||||
|
return self.tester.post(self.url + str(utils.SERVER_GROUP) + '/' +
|
||||||
|
str(self.server_id) + '/' +
|
||||||
|
str(db_id) + '/',
|
||||||
|
data=json.dumps(self.data),
|
||||||
|
content_type='html/json')
|
||||||
|
|
||||||
def runTest(self):
|
def runTest(self):
|
||||||
""" This function will add schema under database node. """
|
""" This function will add schema under database node. """
|
||||||
database_info = parent_node_dict["database"][-1]
|
|
||||||
server_id = database_info["server_id"]
|
|
||||||
|
|
||||||
db_id = database_info["db_id"]
|
|
||||||
db_con = database_utils.connect_database(self,
|
db_con = database_utils.connect_database(self,
|
||||||
utils.SERVER_GROUP,
|
utils.SERVER_GROUP,
|
||||||
server_id,
|
self.server_id,
|
||||||
db_id)
|
self.db_id)
|
||||||
if not db_con["info"] == "Database connected.":
|
if not db_con["info"] == "Database connected.":
|
||||||
raise Exception("Could not connect to database to add the schema.")
|
raise Exception("Could not connect to database to add the schema.")
|
||||||
db_user = self.server["username"]
|
db_user = self.server["username"]
|
||||||
data = {
|
self.data = {
|
||||||
"deffuncacl": [],
|
"deffuncacl": [],
|
||||||
"defseqacl": [],
|
"defseqacl": [],
|
||||||
"deftblacl": [],
|
"deftblacl": [],
|
||||||
@ -65,8 +81,22 @@ class SchemaAddTestCase(BaseTestGenerator):
|
|||||||
],
|
],
|
||||||
"seclabels": []
|
"seclabels": []
|
||||||
}
|
}
|
||||||
response = self.tester.post(self.url + str(utils.SERVER_GROUP) + '/' +
|
|
||||||
str(server_id) + '/' + str(db_id) +
|
if self.is_positive_test:
|
||||||
'/', data=json.dumps(data),
|
response = self.create_schema("")
|
||||||
content_type='html/json')
|
else:
|
||||||
self.assertEquals(response.status_code, 200)
|
if hasattr(self, "error_db_id"):
|
||||||
|
wrong_db_id = 99999
|
||||||
|
response = self.create_schema(wrong_db_id)
|
||||||
|
|
||||||
|
if hasattr(self, "missing_param"):
|
||||||
|
del self.data['name']
|
||||||
|
response = self.create_schema("")
|
||||||
|
|
||||||
|
actual_response_code = response.status_code
|
||||||
|
expected_response_code = self.expected_data['status_code']
|
||||||
|
self.assertEquals(actual_response_code, expected_response_code)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
# Disconnect the database
|
||||||
|
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||||
|
@ -15,15 +15,14 @@ from pgadmin.utils.route import BaseTestGenerator
|
|||||||
from regression import parent_node_dict
|
from regression import parent_node_dict
|
||||||
from regression.python_test_utils import test_utils as utils
|
from regression.python_test_utils import test_utils as utils
|
||||||
from . import utils as schema_utils
|
from . import utils as schema_utils
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
|
||||||
class SchemaDeleteTestCase(BaseTestGenerator):
|
class SchemaDeleteTestCase(BaseTestGenerator):
|
||||||
""" This class will add new schema under database node. """
|
""" This class will add new schema under database node. """
|
||||||
|
|
||||||
scenarios = [
|
scenarios = utils.generate_scenarios('schema_delete',
|
||||||
# Fetching default URL for extension node.
|
schema_utils.test_cases)
|
||||||
('Check Schema Node URL', dict(url='/browser/schema/obj/'))
|
|
||||||
]
|
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.database_info = parent_node_dict["database"][-1]
|
self.database_info = parent_node_dict["database"][-1]
|
||||||
@ -38,17 +37,27 @@ class SchemaDeleteTestCase(BaseTestGenerator):
|
|||||||
self.schema_details = schema_utils.create_schema(connection,
|
self.schema_details = schema_utils.create_schema(connection,
|
||||||
self.schema_name)
|
self.schema_name)
|
||||||
|
|
||||||
|
def delete_schema(self):
|
||||||
|
"""
|
||||||
|
This function returns the schema delete response
|
||||||
|
:return: schema delete response
|
||||||
|
"""
|
||||||
|
return self.tester.delete(
|
||||||
|
self.url + str(utils.SERVER_GROUP) + '/' + str(self.server_id) +
|
||||||
|
'/' + str(self.db_id) + '/' + str(self.schema_id),
|
||||||
|
follow_redirects=True)
|
||||||
|
|
||||||
def runTest(self):
|
def runTest(self):
|
||||||
""" This function will delete schema under database node. """
|
""" This function will delete schema under database node. """
|
||||||
server_id = self.database_info["server_id"]
|
self.server_id = self.database_info["server_id"]
|
||||||
db_id = self.database_info["db_id"]
|
self.db_id = self.database_info["db_id"]
|
||||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||||
server_id, db_id)
|
self.server_id, self.db_id)
|
||||||
if not db_con['data']["connected"]:
|
if not db_con['data']["connected"]:
|
||||||
raise Exception("Could not connect to database to delete the"
|
raise Exception("Could not connect to database to delete the"
|
||||||
" schema.")
|
" schema.")
|
||||||
|
|
||||||
schema_id = self.schema_details[0]
|
self.schema_id = self.schema_details[0]
|
||||||
schema_name = self.schema_details[1]
|
schema_name = self.schema_details[1]
|
||||||
schema_response = schema_utils.verify_schemas(self.server,
|
schema_response = schema_utils.verify_schemas(self.server,
|
||||||
self.db_name,
|
self.db_name,
|
||||||
@ -56,11 +65,22 @@ class SchemaDeleteTestCase(BaseTestGenerator):
|
|||||||
if not schema_response:
|
if not schema_response:
|
||||||
raise Exception("Could not find the schema to delete.")
|
raise Exception("Could not find the schema to delete.")
|
||||||
|
|
||||||
response = self.tester.delete(self.url + str(utils.SERVER_GROUP) +
|
if self.is_positive_test:
|
||||||
'/' + str(server_id) + '/' +
|
response = self.delete_schema()
|
||||||
str(db_id) + '/' + str(schema_id),
|
else:
|
||||||
follow_redirects=True)
|
if hasattr(self, "error_deleting_schema"):
|
||||||
self.assertEquals(response.status_code, 200)
|
with patch(self.mock_data["function_name"],
|
||||||
|
return_value=eval(self.mock_data["return_value"])):
|
||||||
|
response = self.delete_schema()
|
||||||
|
|
||||||
|
if hasattr(self, "wrong_schema_id"):
|
||||||
|
self.schema_id = 99999
|
||||||
|
response = self.delete_schema()
|
||||||
|
|
||||||
|
actual_response_code = response.status_code
|
||||||
|
expected_response_code = self.expected_data['status_code']
|
||||||
|
self.assertEquals(actual_response_code, expected_response_code)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
pass
|
# Disconnect the database
|
||||||
|
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||||
|
@ -21,10 +21,8 @@ from . import utils as schema_utils
|
|||||||
class SchemaDeleteMultipleTestCase(BaseTestGenerator):
|
class SchemaDeleteMultipleTestCase(BaseTestGenerator):
|
||||||
""" This class will add new schema under database node. """
|
""" This class will add new schema under database node. """
|
||||||
|
|
||||||
scenarios = [
|
scenarios = utils.generate_scenarios('schema_multiple_delete',
|
||||||
# Fetching default URL for extension node.
|
schema_utils.test_cases)
|
||||||
('Check Schema Node URL', dict(url='/browser/schema/obj/'))
|
|
||||||
]
|
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.database_info = parent_node_dict["database"][-1]
|
self.database_info = parent_node_dict["database"][-1]
|
||||||
@ -51,10 +49,10 @@ class SchemaDeleteMultipleTestCase(BaseTestGenerator):
|
|||||||
|
|
||||||
def runTest(self):
|
def runTest(self):
|
||||||
""" This function will delete schema under database node. """
|
""" This function will delete schema under database node. """
|
||||||
server_id = self.database_info["server_id"]
|
self.server_id = self.database_info["server_id"]
|
||||||
db_id = self.database_info["db_id"]
|
self.db_id = self.database_info["db_id"]
|
||||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||||
server_id, db_id)
|
self.server_id, self.db_id)
|
||||||
if not db_con['data']["connected"]:
|
if not db_con['data']["connected"]:
|
||||||
raise Exception("Could not connect to database to delete the"
|
raise Exception("Could not connect to database to delete the"
|
||||||
" schema.")
|
" schema.")
|
||||||
@ -75,12 +73,16 @@ class SchemaDeleteMultipleTestCase(BaseTestGenerator):
|
|||||||
|
|
||||||
data = {'ids': [self.schema_details[0], self.schema_details_1[0]]}
|
data = {'ids': [self.schema_details[0], self.schema_details_1[0]]}
|
||||||
response = self.tester.delete(self.url + str(utils.SERVER_GROUP) +
|
response = self.tester.delete(self.url + str(utils.SERVER_GROUP) +
|
||||||
'/' + str(server_id) + '/' +
|
'/' + str(self.server_id) + '/' +
|
||||||
str(db_id) + '/',
|
str(self.db_id) + '/',
|
||||||
follow_redirects=True,
|
follow_redirects=True,
|
||||||
data=json.dumps(data),
|
data=json.dumps(data),
|
||||||
content_type='html/json')
|
content_type='html/json')
|
||||||
self.assertEquals(response.status_code, 200)
|
|
||||||
|
actual_response_code = response.status_code
|
||||||
|
expected_response_code = self.expected_data['status_code']
|
||||||
|
self.assertEquals(actual_response_code, expected_response_code)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
pass
|
# Disconnect the database
|
||||||
|
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||||
|
@ -0,0 +1,77 @@
|
|||||||
|
##########################################################################
|
||||||
|
#
|
||||||
|
# pgAdmin 4 - PostgreSQL Tools
|
||||||
|
#
|
||||||
|
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||||
|
# This software is released under the PostgreSQL Licence
|
||||||
|
#
|
||||||
|
##########################################################################
|
||||||
|
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||||
|
database_utils
|
||||||
|
from pgadmin.utils.route import BaseTestGenerator
|
||||||
|
from regression import parent_node_dict
|
||||||
|
from regression.python_test_utils import test_utils as utils
|
||||||
|
from . import utils as schema_utils
|
||||||
|
|
||||||
|
|
||||||
|
class SchemaDependentAndDependencyTestCase(BaseTestGenerator):
|
||||||
|
""" This class will check dependents and dependencies of
|
||||||
|
schema under database node. """
|
||||||
|
|
||||||
|
scenarios = utils.generate_scenarios('schema_dependent_dependency',
|
||||||
|
schema_utils.test_cases)
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.database_info = parent_node_dict["database"][-1]
|
||||||
|
self.db_name = self.database_info["db_name"]
|
||||||
|
# Change the db name, so that schema will create in newly created db
|
||||||
|
self.schema_name = "schema_get_%s" % str(uuid.uuid4())[1:8]
|
||||||
|
connection = utils.get_db_connection(self.db_name,
|
||||||
|
self.server['username'],
|
||||||
|
self.server['db_password'],
|
||||||
|
self.server['host'],
|
||||||
|
self.server['port'])
|
||||||
|
self.schema_details = schema_utils.create_schema(connection,
|
||||||
|
self.schema_name)
|
||||||
|
|
||||||
|
def dependents_schema(self):
|
||||||
|
"""
|
||||||
|
This function returns the schema dependents response
|
||||||
|
:return: schema delete response
|
||||||
|
"""
|
||||||
|
return self.tester.get(
|
||||||
|
self.url + str(utils.SERVER_GROUP) + '/' + str(self.server_id) +
|
||||||
|
'/' + str(self.db_id) + '/' + str(self.schema_id),
|
||||||
|
follow_redirects=True)
|
||||||
|
|
||||||
|
def runTest(self):
|
||||||
|
""" This function will check dependents and dependencies
|
||||||
|
of schema under database node. """
|
||||||
|
self.server_id = self.database_info["server_id"]
|
||||||
|
self.db_id = self.database_info["db_id"]
|
||||||
|
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||||
|
self.server_id, self.db_id)
|
||||||
|
if not db_con['data']["connected"]:
|
||||||
|
raise Exception("Could not connect to database.")
|
||||||
|
|
||||||
|
self.schema_id = self.schema_details[0]
|
||||||
|
schema_name = self.schema_details[1]
|
||||||
|
schema_response = schema_utils.verify_schemas(self.server,
|
||||||
|
self.db_name,
|
||||||
|
schema_name)
|
||||||
|
if not schema_response:
|
||||||
|
raise Exception("Could not find the schema.")
|
||||||
|
|
||||||
|
if self.is_positive_test:
|
||||||
|
response = self.dependents_schema()
|
||||||
|
|
||||||
|
actual_response_code = response.status_code
|
||||||
|
expected_response_code = self.expected_data['status_code']
|
||||||
|
self.assertEquals(actual_response_code, expected_response_code)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
# Disconnect the database
|
||||||
|
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
@ -13,39 +13,75 @@ from pgadmin.utils import server_utils as server_utils
|
|||||||
from pgadmin.utils.route import BaseTestGenerator
|
from pgadmin.utils.route import BaseTestGenerator
|
||||||
from regression import parent_node_dict
|
from regression import parent_node_dict
|
||||||
from regression.python_test_utils import test_utils as utils
|
from regression.python_test_utils import test_utils as utils
|
||||||
|
from unittest.mock import patch
|
||||||
|
from . import utils as schema_utils
|
||||||
|
|
||||||
|
|
||||||
class SchemaGetTestCase(BaseTestGenerator):
|
class SchemaGetTestCase(BaseTestGenerator):
|
||||||
""" This class will add new schema under database node. """
|
""" This class will add new schema under database node. """
|
||||||
scenarios = [
|
scenarios = utils.generate_scenarios('schema_get',
|
||||||
# Fetching default URL for extension node.
|
schema_utils.test_cases)
|
||||||
('Check Schema Node URL', dict(url='/browser/schema/obj/'))
|
|
||||||
]
|
def get_schema(self):
|
||||||
|
"""
|
||||||
|
This function returns the schema get response
|
||||||
|
:return: schema get response
|
||||||
|
"""
|
||||||
|
return self.tester.get(
|
||||||
|
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||||
|
str(self.server_id) + '/' + str(self.db_id) +
|
||||||
|
'/' + str(self.schema_id),
|
||||||
|
content_type='html/json')
|
||||||
|
|
||||||
|
def get_schema_list(self):
|
||||||
|
"""
|
||||||
|
This functions returns the schema list
|
||||||
|
:return: schema list
|
||||||
|
"""
|
||||||
|
return self.tester.get(
|
||||||
|
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||||
|
str(self.server_id) + '/' + str(self.db_id) +
|
||||||
|
'/',
|
||||||
|
content_type='html/json')
|
||||||
|
|
||||||
def runTest(self):
|
def runTest(self):
|
||||||
""" This function will delete schema under database node. """
|
""" This function will delete schema under database node. """
|
||||||
schema = parent_node_dict["schema"][-1]
|
schema = parent_node_dict["schema"][-1]
|
||||||
db_id = schema["db_id"]
|
self.db_id = schema["db_id"]
|
||||||
server_id = schema["server_id"]
|
self.server_id = schema["server_id"]
|
||||||
|
|
||||||
server_response = server_utils.connect_server(self, server_id)
|
server_response = server_utils.connect_server(self, self.server_id)
|
||||||
if not server_response["data"]["connected"]:
|
if not server_response["data"]["connected"]:
|
||||||
raise Exception("Could not connect to server to connect the"
|
raise Exception("Could not connect to server to connect the"
|
||||||
" database.")
|
" database.")
|
||||||
|
|
||||||
db_con = database_utils.connect_database(self,
|
db_con = database_utils.connect_database(self,
|
||||||
utils.SERVER_GROUP,
|
utils.SERVER_GROUP,
|
||||||
server_id,
|
self.server_id,
|
||||||
db_id)
|
self.db_id)
|
||||||
if not db_con["info"] == "Database connected.":
|
if not db_con["info"] == "Database connected.":
|
||||||
raise Exception("Could not connect to database to get the schema.")
|
raise Exception("Could not connect to database to get the schema.")
|
||||||
|
|
||||||
schema_id = schema["schema_id"]
|
self.schema_id = schema["schema_id"]
|
||||||
schema_response = self.tester.get(
|
if self.is_positive_test:
|
||||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
if hasattr(self, "schema_list"):
|
||||||
str(server_id) + '/' + str(db_id) +
|
response = self.get_schema_list()
|
||||||
'/' + str(schema_id),
|
else:
|
||||||
content_type='html/json')
|
response = self.get_schema()
|
||||||
self.assertEquals(schema_response.status_code, 200)
|
|
||||||
|
else:
|
||||||
|
if hasattr(self, "error_fetching_schema"):
|
||||||
|
with patch(self.mock_data["function_name"],
|
||||||
|
return_value=eval(self.mock_data["return_value"])):
|
||||||
|
if hasattr(self, "schema_list"):
|
||||||
|
response = self.get_schema_list()
|
||||||
|
else:
|
||||||
|
response = self.get_schema()
|
||||||
|
|
||||||
|
actual_response_code = response.status_code
|
||||||
|
expected_response_code = self.expected_data['status_code']
|
||||||
|
self.assertEquals(actual_response_code, expected_response_code)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
# Disconnect the database
|
# Disconnect the database
|
||||||
database_utils.disconnect_database(self, server_id, db_id)
|
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||||
|
@ -0,0 +1,84 @@
|
|||||||
|
##########################################################################
|
||||||
|
#
|
||||||
|
# pgAdmin 4 - PostgreSQL Tools
|
||||||
|
#
|
||||||
|
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||||
|
# This software is released under the PostgreSQL Licence
|
||||||
|
#
|
||||||
|
##########################################################################
|
||||||
|
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||||
|
database_utils
|
||||||
|
from pgadmin.utils.route import BaseTestGenerator
|
||||||
|
from regression import parent_node_dict
|
||||||
|
from regression.python_test_utils import test_utils as utils
|
||||||
|
from . import utils as schema_utils
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
|
||||||
|
class SchemaChildrenTestCase(BaseTestGenerator):
|
||||||
|
""" This class will add new schema under database node. """
|
||||||
|
|
||||||
|
scenarios = utils.generate_scenarios('schema_get_children',
|
||||||
|
schema_utils.test_cases)
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.database_info = parent_node_dict["database"][-1]
|
||||||
|
self.db_name = self.database_info["db_name"]
|
||||||
|
# Change the db name, so that schema will create in newly created db
|
||||||
|
self.schema_name = "schema_get_%s" % str(uuid.uuid4())[1:8]
|
||||||
|
connection = utils.get_db_connection(self.db_name,
|
||||||
|
self.server['username'],
|
||||||
|
self.server['db_password'],
|
||||||
|
self.server['host'],
|
||||||
|
self.server['port'])
|
||||||
|
self.schema_details = schema_utils.create_schema(connection,
|
||||||
|
self.schema_name)
|
||||||
|
|
||||||
|
def get_children(self):
|
||||||
|
"""
|
||||||
|
This function returns the schema children response
|
||||||
|
:return: schema children response
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self.tester.get(
|
||||||
|
self.url + str(utils.SERVER_GROUP) + '/' + str(self.server_id) +
|
||||||
|
'/' + str(self.db_id) + '/' + str(self.schema_id),
|
||||||
|
follow_redirects=True)
|
||||||
|
|
||||||
|
def runTest(self):
|
||||||
|
""" This function will check children
|
||||||
|
of schema under database node. """
|
||||||
|
self.server_id = self.database_info["server_id"]
|
||||||
|
self.db_id = self.database_info["db_id"]
|
||||||
|
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||||
|
self.server_id, self.db_id)
|
||||||
|
if not db_con['data']["connected"]:
|
||||||
|
raise Exception("Could not connect to database.")
|
||||||
|
|
||||||
|
self.schema_id = self.schema_details[0]
|
||||||
|
schema_name = self.schema_details[1]
|
||||||
|
schema_response = schema_utils.verify_schemas(self.server,
|
||||||
|
self.db_name,
|
||||||
|
schema_name)
|
||||||
|
if not schema_response:
|
||||||
|
raise Exception("Could not find the schema.")
|
||||||
|
|
||||||
|
if self.is_positive_test:
|
||||||
|
response = self.get_children()
|
||||||
|
else:
|
||||||
|
if hasattr(self, "error_in_db"):
|
||||||
|
return_value_object = eval(self.mock_data["return_value"])
|
||||||
|
with patch(self.mock_data["function_name"],
|
||||||
|
side_effect=[return_value_object]):
|
||||||
|
response = self.get_children()
|
||||||
|
|
||||||
|
actual_response_code = response.status_code
|
||||||
|
expected_response_code = self.expected_data['status_code']
|
||||||
|
self.assertEquals(actual_response_code, expected_response_code)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
# Disconnect the database
|
||||||
|
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
@ -0,0 +1,86 @@
|
|||||||
|
##########################################################################
|
||||||
|
#
|
||||||
|
# pgAdmin 4 - PostgreSQL Tools
|
||||||
|
#
|
||||||
|
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||||
|
# This software is released under the PostgreSQL Licence
|
||||||
|
#
|
||||||
|
##########################################################################
|
||||||
|
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||||
|
database_utils
|
||||||
|
from pgadmin.utils.route import BaseTestGenerator
|
||||||
|
from regression import parent_node_dict
|
||||||
|
from regression.python_test_utils import test_utils as utils
|
||||||
|
from . import utils as schema_utils
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
|
||||||
|
class SchemaNodeAndNodesTestCase(BaseTestGenerator):
|
||||||
|
""" This class will add new schema under database node. """
|
||||||
|
|
||||||
|
scenarios = utils.generate_scenarios('schema_get_nodes',
|
||||||
|
schema_utils.test_cases)
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.database_info = parent_node_dict["database"][-1]
|
||||||
|
self.db_name = self.database_info["db_name"]
|
||||||
|
# Change the db name, so that schema will create in newly created db
|
||||||
|
self.schema_name = "schema_get_%s" % str(uuid.uuid4())[1:8]
|
||||||
|
connection = utils.get_db_connection(self.db_name,
|
||||||
|
self.server['username'],
|
||||||
|
self.server['db_password'],
|
||||||
|
self.server['host'],
|
||||||
|
self.server['port'])
|
||||||
|
self.schema_details = schema_utils.create_schema(connection,
|
||||||
|
self.schema_name)
|
||||||
|
|
||||||
|
def get_node_schema(self):
|
||||||
|
"""
|
||||||
|
This function returns the schema node response
|
||||||
|
:return: schema node response
|
||||||
|
"""
|
||||||
|
return self.tester.get(
|
||||||
|
self.url + str(utils.SERVER_GROUP) + '/' + str(self.server_id) +
|
||||||
|
'/' + str(self.db_id) + '/' + str(self.schema_id),
|
||||||
|
follow_redirects=True)
|
||||||
|
|
||||||
|
def runTest(self):
|
||||||
|
""" This function will check node and nodes
|
||||||
|
of schema under database node. """
|
||||||
|
self.server_id = self.database_info["server_id"]
|
||||||
|
self.db_id = self.database_info["db_id"]
|
||||||
|
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||||
|
self.server_id, self.db_id)
|
||||||
|
if not db_con['data']["connected"]:
|
||||||
|
raise Exception("Could not connect to database.")
|
||||||
|
|
||||||
|
self.schema_id = self.schema_details[0]
|
||||||
|
schema_name = self.schema_details[1]
|
||||||
|
schema_response = schema_utils.verify_schemas(self.server,
|
||||||
|
self.db_name,
|
||||||
|
schema_name)
|
||||||
|
if not schema_response:
|
||||||
|
raise Exception("Could not find the schema.")
|
||||||
|
|
||||||
|
if self.is_positive_test:
|
||||||
|
response = self.get_node_schema()
|
||||||
|
else:
|
||||||
|
if hasattr(self, "error_fetching_schema"):
|
||||||
|
with patch(self.mock_data["function_name"],
|
||||||
|
return_value=eval(self.mock_data["return_value"])):
|
||||||
|
response = self.get_node_schema()
|
||||||
|
|
||||||
|
if hasattr(self, "wrong_schema_id"):
|
||||||
|
self.schema_id = 99999
|
||||||
|
response = self.get_node_schema()
|
||||||
|
|
||||||
|
actual_response_code = response.status_code
|
||||||
|
expected_response_code = self.expected_data['status_code']
|
||||||
|
self.assertEquals(actual_response_code, expected_response_code)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
# Disconnect the database
|
||||||
|
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
@ -16,15 +16,15 @@ from pgadmin.utils.route import BaseTestGenerator
|
|||||||
from regression import parent_node_dict
|
from regression import parent_node_dict
|
||||||
from regression.python_test_utils import test_utils as utils
|
from regression.python_test_utils import test_utils as utils
|
||||||
from . import utils as schema_utils
|
from . import utils as schema_utils
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
|
||||||
class SchemaPutTestCase(BaseTestGenerator):
|
class SchemaPutTestCase(BaseTestGenerator):
|
||||||
""" This class will update the schema under database node. """
|
""" This class will update the schema under database node. """
|
||||||
skip_on_database = ['gpdb']
|
skip_on_database = ['gpdb']
|
||||||
scenarios = [
|
|
||||||
# Fetching default URL for extension node.
|
scenarios = utils.generate_scenarios('schema_update',
|
||||||
('Check Schema Node URL', dict(url='/browser/schema/obj/'))
|
schema_utils.test_cases)
|
||||||
]
|
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(SchemaPutTestCase, self).setUp()
|
super(SchemaPutTestCase, self).setUp()
|
||||||
@ -41,17 +41,27 @@ class SchemaPutTestCase(BaseTestGenerator):
|
|||||||
self.schema_details = schema_utils.create_schema(connection,
|
self.schema_details = schema_utils.create_schema(connection,
|
||||||
self.schema_name)
|
self.schema_name)
|
||||||
|
|
||||||
def runTest(self):
|
def update_schema(self):
|
||||||
""" This function will delete schema under database node. """
|
"""
|
||||||
|
This functions update schema
|
||||||
|
:return: schema update request details
|
||||||
|
"""
|
||||||
|
return self.tester.put(
|
||||||
|
self.url + str(utils.SERVER_GROUP) + '/' + str(self.server_id) +
|
||||||
|
'/' + str(self.db_id) + '/' + str(self.schema_id),
|
||||||
|
data=json.dumps(self.data), follow_redirects=True)
|
||||||
|
|
||||||
server_id = self.database_info["server_id"]
|
def runTest(self):
|
||||||
db_id = self.database_info["db_id"]
|
""" This function will check update schema under database node. """
|
||||||
|
|
||||||
|
self.server_id = self.database_info["server_id"]
|
||||||
|
self.db_id = self.database_info["db_id"]
|
||||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||||
server_id, db_id)
|
self.server_id, self.db_id)
|
||||||
if not db_con['data']["connected"]:
|
if not db_con['data']["connected"]:
|
||||||
raise Exception("Could not connect to database to delete the"
|
raise Exception("Could not connect to database to delete the"
|
||||||
" schema.")
|
" schema.")
|
||||||
schema_id = self.schema_details[0]
|
self.schema_id = self.schema_details[0]
|
||||||
schema_name = self.schema_details[1]
|
schema_name = self.schema_details[1]
|
||||||
schema_response = schema_utils.verify_schemas(self.server,
|
schema_response = schema_utils.verify_schemas(self.server,
|
||||||
self.db_name,
|
self.db_name,
|
||||||
@ -60,7 +70,7 @@ class SchemaPutTestCase(BaseTestGenerator):
|
|||||||
raise Exception("Could not find the schema to update.")
|
raise Exception("Could not find the schema to update.")
|
||||||
|
|
||||||
db_user = self.server["username"]
|
db_user = self.server["username"]
|
||||||
data = {
|
self.data = {
|
||||||
"deffuncacl": {
|
"deffuncacl": {
|
||||||
"added":
|
"added":
|
||||||
[
|
[
|
||||||
@ -127,16 +137,22 @@ class SchemaPutTestCase(BaseTestGenerator):
|
|||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
"id": schema_id
|
"id": self.schema_id
|
||||||
}
|
}
|
||||||
put_response = self.tester.put(
|
|
||||||
self.url + str(utils.SERVER_GROUP) + '/' + str(server_id) +
|
|
||||||
'/' + str(db_id) + '/' + str(schema_id),
|
|
||||||
data=json.dumps(data), follow_redirects=True)
|
|
||||||
|
|
||||||
self.assertEquals(put_response.status_code, 200)
|
if self.is_positive_test:
|
||||||
# Disconnect the database
|
response = self.update_schema()
|
||||||
database_utils.disconnect_database(self, server_id, db_id)
|
|
||||||
|
else:
|
||||||
|
if hasattr(self, "error_in_db"):
|
||||||
|
with patch(self.mock_data["function_name"],
|
||||||
|
side_effect=eval(self.mock_data["return_value"])):
|
||||||
|
response = self.update_schema()
|
||||||
|
|
||||||
|
actual_response_code = response.status_code
|
||||||
|
expected_response_code = self.expected_data['status_code']
|
||||||
|
self.assertEquals(actual_response_code, expected_response_code)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
pass
|
# Disconnect the database
|
||||||
|
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||||
|
@ -0,0 +1,88 @@
|
|||||||
|
##########################################################################
|
||||||
|
#
|
||||||
|
# pgAdmin 4 - PostgreSQL Tools
|
||||||
|
#
|
||||||
|
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||||
|
# This software is released under the PostgreSQL Licence
|
||||||
|
#
|
||||||
|
##########################################################################
|
||||||
|
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||||
|
database_utils
|
||||||
|
from pgadmin.utils.route import BaseTestGenerator
|
||||||
|
from regression import parent_node_dict
|
||||||
|
from regression.python_test_utils import test_utils as utils
|
||||||
|
from . import utils as schema_utils
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
|
||||||
|
class SchemaSQLTestCase(BaseTestGenerator):
|
||||||
|
""" This class will add new schema under database node. """
|
||||||
|
|
||||||
|
scenarios = utils.generate_scenarios('schema_get_sql',
|
||||||
|
schema_utils.test_cases)
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.database_info = parent_node_dict["database"][-1]
|
||||||
|
self.db_name = self.database_info["db_name"]
|
||||||
|
# Change the db name, so that schema will create in newly created db
|
||||||
|
self.schema_name = "schema_get_%s" % str(uuid.uuid4())[1:8]
|
||||||
|
connection = utils.get_db_connection(self.db_name,
|
||||||
|
self.server['username'],
|
||||||
|
self.server['db_password'],
|
||||||
|
self.server['host'],
|
||||||
|
self.server['port'])
|
||||||
|
self.schema_details = schema_utils.create_schema(connection,
|
||||||
|
self.schema_name)
|
||||||
|
|
||||||
|
def get_sql_node_schema(self):
|
||||||
|
"""
|
||||||
|
This function returns the schema sql response
|
||||||
|
:return: schema sql response
|
||||||
|
"""
|
||||||
|
return self.tester.get(
|
||||||
|
self.url + str(utils.SERVER_GROUP) + '/' + str(self.server_id) +
|
||||||
|
'/' + str(self.db_id) + '/' + str(self.schema_id),
|
||||||
|
follow_redirects=True)
|
||||||
|
|
||||||
|
def runTest(self):
|
||||||
|
""" This function will check sql
|
||||||
|
of schema under database node. """
|
||||||
|
self.server_id = self.database_info["server_id"]
|
||||||
|
self.db_id = self.database_info["db_id"]
|
||||||
|
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||||
|
self.server_id, self.db_id)
|
||||||
|
if not db_con['data']["connected"]:
|
||||||
|
raise Exception("Could not connect to database.")
|
||||||
|
|
||||||
|
self.schema_id = self.schema_details[0]
|
||||||
|
schema_name = self.schema_details[1]
|
||||||
|
schema_response = schema_utils.verify_schemas(self.server,
|
||||||
|
self.db_name,
|
||||||
|
schema_name)
|
||||||
|
if not schema_response:
|
||||||
|
raise Exception("Could not find the schema.")
|
||||||
|
|
||||||
|
if self.is_positive_test:
|
||||||
|
response = self.get_sql_node_schema()
|
||||||
|
|
||||||
|
else:
|
||||||
|
if hasattr(self, "internal_server_error"):
|
||||||
|
return_value_object = eval(self.mock_data["return_value"])
|
||||||
|
with patch(self.mock_data["function_name"],
|
||||||
|
side_effect=[return_value_object]):
|
||||||
|
response = self.get_sql_node_schema()
|
||||||
|
|
||||||
|
if hasattr(self, "wrong_schema_id"):
|
||||||
|
self.schema_id = 99999
|
||||||
|
response = self.get_sql_node_schema()
|
||||||
|
|
||||||
|
actual_response_code = response.status_code
|
||||||
|
expected_response_code = self.expected_data['status_code']
|
||||||
|
self.assertEquals(actual_response_code, expected_response_code)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
# Disconnect the database
|
||||||
|
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
@ -11,9 +11,15 @@
|
|||||||
import sys
|
import sys
|
||||||
import traceback
|
import traceback
|
||||||
import uuid
|
import uuid
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
|
||||||
from regression.python_test_utils import test_utils as utils
|
from regression.python_test_utils import test_utils as utils
|
||||||
|
|
||||||
|
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
with open(CURRENT_PATH + "/schema_test_data.json") as data_file:
|
||||||
|
test_cases = json.load(data_file)
|
||||||
|
|
||||||
|
|
||||||
def get_schema_config_data(db_user):
|
def get_schema_config_data(db_user):
|
||||||
"""This function is used to get advance config test data for schema"""
|
"""This function is used to get advance config test data for schema"""
|
||||||
|
Loading…
Reference in New Issue
Block a user