mirror of
https://github.com/pgadmin-org/pgadmin4.git
synced 2025-02-25 18:55:31 -06:00
Improve code coverage and API test cases for Types. Fixes #5336
This commit is contained in:
@@ -1108,6 +1108,7 @@ class TypeView(PGChildNodeView, DataTypeReader, SchemaDiffObjectCompare):
|
||||
|
||||
if not res['rows']:
|
||||
return make_json_response(
|
||||
status=410,
|
||||
success=0,
|
||||
errormsg=gettext(
|
||||
'Error: Object not found.'
|
||||
|
||||
@@ -17,13 +17,14 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as types_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class TypesAddTestCase(BaseTestGenerator):
|
||||
""" This class will add type under schema node. """
|
||||
scenarios = [
|
||||
('Add type under schema node', dict(url='/browser/type/obj/'))
|
||||
]
|
||||
scenarios = utils.generate_scenarios('types_create',
|
||||
types_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
@@ -42,27 +43,45 @@ class TypesAddTestCase(BaseTestGenerator):
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema to add a type.")
|
||||
|
||||
def create_types(self):
|
||||
"""
|
||||
This function create a type and returns the created type response
|
||||
:return: created types response
|
||||
"""
|
||||
return self.tester.post(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' + str(self.db_id) +
|
||||
'/' + str(self.schema_id) + '/',
|
||||
data=json.dumps(self.data),
|
||||
content_type='html/json')
|
||||
|
||||
def runTest(self):
|
||||
""" This function will add type under schema node. """
|
||||
db_user = self.server["username"]
|
||||
self.type_name = "test_type_add_%s" % (str(uuid.uuid4())[1:8])
|
||||
data = {"name": self.type_name,
|
||||
"is_sys_type": False,
|
||||
"typtype": "c",
|
||||
"typeowner": db_user,
|
||||
"schema": self.schema_name,
|
||||
"composite": [{"member_name": "one", "type": "bigint",
|
||||
"is_tlength": False, "is_precision": False},
|
||||
{"member_name": "two", "type": "\"char\"[]",
|
||||
"is_tlength": False, "is_precision": False}],
|
||||
"enum": [], "typacl": [], "seclabels": []}
|
||||
response = self.tester.post(
|
||||
self.url + str(utils.SERVER_GROUP) + '/' +
|
||||
str(self.server_id) + '/' + str(self.db_id) +
|
||||
'/' + str(self.schema_id) + '/',
|
||||
data=json.dumps(data),
|
||||
content_type='html/json')
|
||||
self.assertEquals(response.status_code, 200)
|
||||
self.data = types_utils.get_types_data(self.type_name,
|
||||
self.schema_name, db_user)
|
||||
if self.is_positive_test:
|
||||
response = self.create_types()
|
||||
else:
|
||||
if hasattr(self, "missing_parameter"):
|
||||
del self.data['name']
|
||||
response = self.create_types()
|
||||
|
||||
if hasattr(self, "internal_server_error"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.create_types()
|
||||
|
||||
if hasattr(self, "error_in_db"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.create_types()
|
||||
|
||||
self.assertEquals(response.status_code,
|
||||
self.expected_data['status_code'])
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
|
||||
@@ -17,13 +17,13 @@ from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as types_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class TypesDeleteTestCase(BaseTestGenerator):
|
||||
""" This class will delete type under schema node. """
|
||||
scenarios = [
|
||||
('Delete type under schema node', dict(url='/browser/type/obj/'))
|
||||
]
|
||||
scenarios = utils.generate_scenarios('types_delete',
|
||||
types_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
@@ -46,20 +46,48 @@ class TypesDeleteTestCase(BaseTestGenerator):
|
||||
self.schema_name, self.type_name
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will delete type under schema node. """
|
||||
type_response = types_utils.verify_type(self.server, self.db_name,
|
||||
self.type_name)
|
||||
if not type_response:
|
||||
raise Exception("Could not find the type to delete.")
|
||||
response = self.tester.delete(
|
||||
def delete_type(self):
|
||||
"""
|
||||
This function deletes type
|
||||
:return: type delete
|
||||
"""
|
||||
return self.tester.delete(
|
||||
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id, self.type_id
|
||||
),
|
||||
follow_redirects=True
|
||||
)
|
||||
self.assertEquals(response.status_code, 200)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will delete type under schema node. """
|
||||
type_response = types_utils.verify_type(self.server, self.db_name,
|
||||
self.type_name)
|
||||
if not type_response:
|
||||
raise Exception("Could not find the type to delete.")
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.delete_type()
|
||||
|
||||
else:
|
||||
if hasattr(self, "internal_server_error"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.delete_type()
|
||||
|
||||
if hasattr(self, "error_in_db"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.delete_type()
|
||||
|
||||
if hasattr(self, "wrong_type_id"):
|
||||
self.type_id = 99999
|
||||
response = self.delete_type()
|
||||
|
||||
self.assertEquals(response.status_code,
|
||||
self.expected_data['status_code'])
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
|
||||
@@ -0,0 +1,89 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import uuid
|
||||
import json
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as types_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class TypesDeleteMultipleTestCase(BaseTestGenerator):
|
||||
""" This class will delete type under schema node. """
|
||||
scenarios = utils.generate_scenarios('types_delete_multiple',
|
||||
types_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database to delete a type.")
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema to delete a type.")
|
||||
self.type_names = ["test_type_{0}".format(str(uuid.uuid4())[1:8]),
|
||||
"test_type_{0}".format(str(uuid.uuid4())[1:8])]
|
||||
self.type_ids = [types_utils.create_type(self.server, self.db_name,
|
||||
self.schema_name,
|
||||
self.type_names[0]),
|
||||
types_utils.create_type(self.server, self.db_name,
|
||||
self.schema_name,
|
||||
self.type_names[1])]
|
||||
|
||||
def delete_multiple(self, data):
|
||||
"""
|
||||
This function returns multiple type delete response
|
||||
:param data: type ids to delete
|
||||
:return: type delete response
|
||||
"""
|
||||
return self.tester.delete(self.url + str(utils.SERVER_GROUP) +
|
||||
'/' + str(self.server_id) +
|
||||
'/' + str(self.db_id) + '/' +
|
||||
str(self.schema_id) + '/',
|
||||
follow_redirects=True,
|
||||
data=json.dumps(data),
|
||||
content_type='html/json')
|
||||
|
||||
def runTest(self):
|
||||
""" This function will delete type under schema node. """
|
||||
type_response = types_utils.verify_type(self.server, self.db_name,
|
||||
self.type_names[0])
|
||||
if not type_response:
|
||||
raise Exception("Could not find the type to delete.")
|
||||
type_response = types_utils.verify_type(self.server, self.db_name,
|
||||
self.type_names[1])
|
||||
if not type_response:
|
||||
raise Exception("Could not find the type to delete.")
|
||||
|
||||
data = {'ids': self.type_ids}
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.delete_multiple(data)
|
||||
|
||||
self.assertEquals(response.status_code,
|
||||
self.expected_data['status_code'])
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,68 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as types_utils
|
||||
|
||||
|
||||
class TypesDependenciesTestCase(BaseTestGenerator):
|
||||
""" This class will test type dependency and dependent
|
||||
under schema node. """
|
||||
scenarios = utils.generate_scenarios('types_dependency_dependent',
|
||||
types_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database to get a type.")
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema to get a type.")
|
||||
self.type_name = "test_type_dep_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.type_id = types_utils.create_type(self.server, self.db_name,
|
||||
self.schema_name, self.type_name
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will get type under schema node. """
|
||||
type_response = types_utils.verify_type(self.server, self.db_name,
|
||||
self.type_name)
|
||||
if not type_response:
|
||||
raise Exception("Could not find the type.")
|
||||
response = self.tester.get(
|
||||
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id, self.type_id
|
||||
),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEquals(response.status_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -17,13 +17,13 @@ from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as types_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class TypesGetTestCase(BaseTestGenerator):
|
||||
""" This class will get the type under schema node. """
|
||||
scenarios = [
|
||||
('Get type under schema node', dict(url='/browser/type/obj/'))
|
||||
]
|
||||
scenarios = utils.generate_scenarios('types_get',
|
||||
types_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
@@ -46,16 +46,60 @@ class TypesGetTestCase(BaseTestGenerator):
|
||||
self.schema_name, self.type_name
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will get a type under schema node. """
|
||||
response = self.tester.get(
|
||||
def get_type(self):
|
||||
"""
|
||||
This functions returns the type properties
|
||||
:return: type properties
|
||||
"""
|
||||
return self.tester.get(
|
||||
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id, self.type_id
|
||||
),
|
||||
follow_redirects=True
|
||||
)
|
||||
self.assertEquals(response.status_code, 200)
|
||||
|
||||
def get_type_list(self):
|
||||
"""
|
||||
This functions returns the list all types
|
||||
:return: list all types
|
||||
"""
|
||||
return self.tester.get(
|
||||
"{0}{1}/{2}/{3}/{4}/".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will get a type under schema node. """
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database to get a type.")
|
||||
|
||||
if self.is_positive_test:
|
||||
if hasattr(self, "type_list"):
|
||||
response = self.get_type_list()
|
||||
else:
|
||||
response = self.get_type()
|
||||
|
||||
else:
|
||||
if hasattr(self, "error_fetching_type"):
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
if hasattr(self, "type_list"):
|
||||
response = self.get_type_list()
|
||||
else:
|
||||
response = self.get_type()
|
||||
|
||||
if hasattr(self, "wrong_type_id"):
|
||||
self.type_id = 99999
|
||||
response = self.get_type()
|
||||
|
||||
actual_response_code = response.status_code
|
||||
expected_response_code = self.expected_data['status_code']
|
||||
self.assertEquals(actual_response_code, expected_response_code)
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
|
||||
@@ -0,0 +1,83 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as types_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class TypesGetCollationsTestCase(BaseTestGenerator):
|
||||
""" This class will get type collations under schema node. """
|
||||
scenarios = utils.generate_scenarios('types_get_collations',
|
||||
types_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
self.type_name = "test_type_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.type_id = types_utils.create_type(self.server, self.db_name,
|
||||
self.schema_name, self.type_name
|
||||
)
|
||||
|
||||
def get_collations(self):
|
||||
"""
|
||||
This function returns collations
|
||||
:return: collations
|
||||
"""
|
||||
return self.tester.get(
|
||||
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id, self.type_id
|
||||
),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will check type-collation under schema node. """
|
||||
type_response = types_utils.verify_type(self.server, self.db_name,
|
||||
self.type_name)
|
||||
if not type_response:
|
||||
raise Exception("Could not find the type.")
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.get_collations()
|
||||
|
||||
else:
|
||||
if hasattr(self, "internal_server_error"):
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
response = self.get_collations()
|
||||
|
||||
self.assertEquals(response.status_code,
|
||||
self.expected_data['status_code'])
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,85 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as types_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class TypesExternalFunctionTestCase(BaseTestGenerator):
|
||||
""" This class will get type external function under schema node. """
|
||||
scenarios = utils.generate_scenarios('types_get_external_functions',
|
||||
types_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
self.type_name = "test_type_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.type_id = types_utils.create_type(self.server, self.db_name,
|
||||
self.schema_name, self.type_name
|
||||
)
|
||||
|
||||
def get_external_function(self):
|
||||
"""
|
||||
This function returns external function
|
||||
:return: external function
|
||||
"""
|
||||
return self.tester.get(
|
||||
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id, self.type_id
|
||||
),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will get type-external function
|
||||
under schema node. """
|
||||
type_response = types_utils.verify_type(self.server, self.db_name,
|
||||
self.type_name)
|
||||
if not type_response:
|
||||
raise Exception("Could not find the type.")
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.get_external_function()
|
||||
|
||||
else:
|
||||
if hasattr(self, "internal_server_error"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.get_external_function()
|
||||
|
||||
self.assertEquals(response.status_code,
|
||||
self.expected_data['status_code'])
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,84 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as types_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class TypesSubTypesTestCase(BaseTestGenerator):
|
||||
""" This class will get type's subtype under schema node. """
|
||||
scenarios = utils.generate_scenarios('types_get_subtypes',
|
||||
types_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
self.type_name = "test_type_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.type_id = types_utils.create_type(self.server, self.db_name,
|
||||
self.schema_name, self.type_name
|
||||
)
|
||||
|
||||
def get_subtypes(self):
|
||||
"""
|
||||
This function returns subtypes
|
||||
:return: subtypes
|
||||
"""
|
||||
return self.tester.get(
|
||||
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id, self.type_id
|
||||
),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will check type-subtype under schema node. """
|
||||
type_response = types_utils.verify_type(self.server, self.db_name,
|
||||
self.type_name)
|
||||
if not type_response:
|
||||
raise Exception("Could not find the type.")
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.get_subtypes()
|
||||
|
||||
else:
|
||||
if hasattr(self, "internal_server_error"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.get_subtypes()
|
||||
|
||||
self.assertEquals(response.status_code,
|
||||
self.expected_data['status_code'])
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,76 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as types_utils
|
||||
|
||||
|
||||
class TypesSubTypesOpClassTestCase(BaseTestGenerator):
|
||||
""" This class will get types operator class under schema node. """
|
||||
scenarios = utils.generate_scenarios('types_get_subtypes_opclass',
|
||||
types_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
self.type_name = "test_type_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.type_id = types_utils.create_type(self.server, self.db_name,
|
||||
self.schema_name, self.type_name
|
||||
)
|
||||
|
||||
def get_subtypes_opclass(self):
|
||||
"""
|
||||
This function returns subtypes opclass
|
||||
:return: subtypes opclass
|
||||
"""
|
||||
return self.tester.get(
|
||||
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id, self.type_id
|
||||
),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will check type operator class under schema node. """
|
||||
type_response = types_utils.verify_type(self.server, self.db_name,
|
||||
self.type_name)
|
||||
if not type_response:
|
||||
raise Exception("Could not find the type.")
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.get_subtypes_opclass()
|
||||
|
||||
self.assertEquals(response.status_code,
|
||||
self.expected_data['status_code'])
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,76 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as types_utils
|
||||
|
||||
|
||||
class TypesSubTypeDiffTestCase(BaseTestGenerator):
|
||||
""" This class will get type subtype difference under schema node. """
|
||||
scenarios = utils.generate_scenarios('types_get_subtype_diff',
|
||||
types_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
self.type_name = "test_type_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.type_id = types_utils.create_type(self.server, self.db_name,
|
||||
self.schema_name, self.type_name
|
||||
)
|
||||
|
||||
def get_subtype_diff(self):
|
||||
"""
|
||||
This function fetch type subtype diff
|
||||
:return: type subtype diff
|
||||
"""
|
||||
return self.tester.get(
|
||||
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id, self.type_id,
|
||||
),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will get type subtype diff under schema node. """
|
||||
type_response = types_utils.verify_type(self.server, self.db_name,
|
||||
self.type_name)
|
||||
if not type_response:
|
||||
raise Exception("Could not find the type in the schema.")
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.get_subtype_diff()
|
||||
|
||||
self.assertEquals(response.status_code,
|
||||
self.expected_data['status_code'])
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,84 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as types_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class TypesGetTypesTestCase(BaseTestGenerator):
|
||||
""" This class will get type's get_type function under schema node. """
|
||||
scenarios = utils.generate_scenarios('types_get_types',
|
||||
types_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
self.type_name = "test_type_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.type_id = types_utils.create_type(self.server, self.db_name,
|
||||
self.schema_name, self.type_name
|
||||
)
|
||||
|
||||
def get_types(self):
|
||||
"""
|
||||
This function get type-types
|
||||
:return: type-types
|
||||
"""
|
||||
return self.tester.get(
|
||||
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id, self.type_id
|
||||
),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will get type-types under schema node. """
|
||||
type_response = types_utils.verify_type(self.server, self.db_name,
|
||||
self.type_name)
|
||||
if not type_response:
|
||||
raise Exception("Could not find the type.")
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.get_types()
|
||||
|
||||
else:
|
||||
if hasattr(self, "internal_server_error"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.get_types()
|
||||
|
||||
self.assertEquals(response.status_code,
|
||||
self.expected_data['status_code'])
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,76 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as types_utils
|
||||
|
||||
|
||||
class TypesMSQLTestCase(BaseTestGenerator):
|
||||
""" This class will get type msql under schema node. """
|
||||
scenarios = utils.generate_scenarios('types_get_msql',
|
||||
types_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
self.type_name = "test_type_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.type_id = types_utils.create_type(self.server, self.db_name,
|
||||
self.schema_name, self.type_name
|
||||
)
|
||||
|
||||
def msql_type(self):
|
||||
"""
|
||||
This function fetch type msql
|
||||
:return: type msql
|
||||
"""
|
||||
return self.tester.get(
|
||||
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id, self.type_id,
|
||||
),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will get type msql under schema node. """
|
||||
type_response = types_utils.verify_type(self.server, self.db_name,
|
||||
self.type_name)
|
||||
if not type_response:
|
||||
raise Exception("Could not find the type in the schema.")
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.msql_type()
|
||||
|
||||
self.assertEquals(response.status_code,
|
||||
self.expected_data['status_code'])
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,103 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as types_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class TypesNodesTestCase(BaseTestGenerator):
|
||||
""" This class will get type nodes under schema node. """
|
||||
scenarios = utils.generate_scenarios('types_get_nodes_and_node',
|
||||
types_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
self.type_name = "test_type_node_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.type_id = types_utils.create_type(self.server, self.db_name,
|
||||
self.schema_name, self.type_name
|
||||
)
|
||||
|
||||
def get_type_nodes(self):
|
||||
"""
|
||||
This function returns type nodes
|
||||
:return: type nodes
|
||||
"""
|
||||
return self.tester.get(
|
||||
"{0}{1}/{2}/{3}/{4}/".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id),
|
||||
content_type='html/json')
|
||||
|
||||
def get_type_node(self):
|
||||
"""
|
||||
This functions returns the type node
|
||||
:return: type node
|
||||
"""
|
||||
return self.tester.get(
|
||||
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id, self.type_id
|
||||
),
|
||||
content_type='html/json')
|
||||
|
||||
def runTest(self):
|
||||
""" This function will check type nodes under schema node. """
|
||||
type_response = types_utils.verify_type(self.server, self.db_name,
|
||||
self.type_name)
|
||||
if not type_response:
|
||||
raise Exception("Could not find the type.")
|
||||
|
||||
if self.is_positive_test:
|
||||
if hasattr(self, "node"):
|
||||
response = self.get_type_node()
|
||||
else:
|
||||
response = self.get_type_nodes()
|
||||
|
||||
else:
|
||||
if hasattr(self, "error_fetching_type"):
|
||||
with patch(self.mock_data["function_name"],
|
||||
return_value=eval(self.mock_data["return_value"])):
|
||||
if hasattr(self, "node"):
|
||||
response = self.get_type_node()
|
||||
else:
|
||||
response = self.get_type_nodes()
|
||||
|
||||
if hasattr(self, "wrong_id"):
|
||||
self.type_id = 99999
|
||||
response = self.get_type_node()
|
||||
|
||||
self.assertEquals(response.status_code,
|
||||
self.expected_data['status_code'])
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -18,13 +18,13 @@ from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as types_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class TypesUpdateTestCase(BaseTestGenerator):
|
||||
""" This class will update type under schema node. """
|
||||
scenarios = [
|
||||
('Update type under schema node', dict(url='/browser/type/obj/'))
|
||||
]
|
||||
scenarios = utils.generate_scenarios('types_update',
|
||||
types_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
@@ -47,22 +47,40 @@ class TypesUpdateTestCase(BaseTestGenerator):
|
||||
self.schema_name, self.type_name
|
||||
)
|
||||
|
||||
def update_type(self):
|
||||
"""
|
||||
This function returns the type update response
|
||||
:return: type update response
|
||||
"""
|
||||
return self.tester.put(
|
||||
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id, self.type_id
|
||||
),
|
||||
data=json.dumps(self.test_data),
|
||||
follow_redirects=True)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will update type under schema node. """
|
||||
type_response = types_utils.verify_type(self.server, self.db_name,
|
||||
self.type_name)
|
||||
if not type_response:
|
||||
raise Exception("Could not find the type to update.")
|
||||
data = {"id": self.type_id,
|
||||
"description": "this is test comment."}
|
||||
response = self.tester.put(
|
||||
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id, self.type_id
|
||||
),
|
||||
data=json.dumps(data),
|
||||
follow_redirects=True)
|
||||
self.assertEquals(response.status_code, 200)
|
||||
|
||||
self.test_data['id'] = self.type_id
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.update_type()
|
||||
|
||||
else:
|
||||
if hasattr(self, "internal_server_error"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.update_type()
|
||||
|
||||
self.assertEquals(response.status_code,
|
||||
self.expected_data['status_code'])
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
##########################################################################
|
||||
#
|
||||
# pgAdmin 4 - PostgreSQL Tools
|
||||
#
|
||||
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
|
||||
# This software is released under the PostgreSQL Licence
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
import uuid
|
||||
|
||||
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
|
||||
utils as schema_utils
|
||||
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
|
||||
database_utils
|
||||
from pgadmin.utils.route import BaseTestGenerator
|
||||
from regression import parent_node_dict
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
from . import utils as types_utils
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class TypesSQLTestCase(BaseTestGenerator):
|
||||
""" This class will get type sql function under schema node. """
|
||||
scenarios = utils.generate_scenarios('types_get_sql',
|
||||
types_utils.test_cases)
|
||||
|
||||
def setUp(self):
|
||||
self.db_name = parent_node_dict["database"][-1]["db_name"]
|
||||
schema_info = parent_node_dict["schema"][-1]
|
||||
self.server_id = schema_info["server_id"]
|
||||
self.db_id = schema_info["db_id"]
|
||||
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id)
|
||||
if not db_con['data']["connected"]:
|
||||
raise Exception("Could not connect to database.")
|
||||
self.schema_id = schema_info["schema_id"]
|
||||
self.schema_name = schema_info["schema_name"]
|
||||
schema_response = schema_utils.verify_schemas(self.server,
|
||||
self.db_name,
|
||||
self.schema_name)
|
||||
if not schema_response:
|
||||
raise Exception("Could not find the schema.")
|
||||
self.type_name = "test_type_%s" % (str(uuid.uuid4())[1:8])
|
||||
self.type_id = types_utils.create_type(self.server, self.db_name,
|
||||
self.schema_name, self.type_name
|
||||
)
|
||||
|
||||
def sql_type(self):
|
||||
"""
|
||||
This function fetch type sql
|
||||
:return: type sql
|
||||
"""
|
||||
return self.tester.get(
|
||||
"{0}{1}/{2}/{3}/{4}/{5}".format(self.url, utils.SERVER_GROUP,
|
||||
self.server_id, self.db_id,
|
||||
self.schema_id, self.type_id,
|
||||
),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
def runTest(self):
|
||||
""" This function will check type sql under schema node. """
|
||||
type_response = types_utils.verify_type(self.server, self.db_name,
|
||||
self.type_name)
|
||||
if not type_response:
|
||||
raise Exception("Could not find the type in the schema.")
|
||||
|
||||
if self.is_positive_test:
|
||||
response = self.sql_type()
|
||||
|
||||
else:
|
||||
if hasattr(self, "internal_server_error"):
|
||||
return_value_object = eval(self.mock_data["return_value"])
|
||||
with patch(self.mock_data["function_name"],
|
||||
side_effect=[return_value_object]):
|
||||
response = self.sql_type()
|
||||
|
||||
if hasattr(self, "wrong_type_id"):
|
||||
self.type_id = 99999
|
||||
response = self.sql_type()
|
||||
|
||||
self.assertEquals(response.status_code,
|
||||
self.expected_data['status_code'])
|
||||
|
||||
def tearDown(self):
|
||||
# Disconnect the database
|
||||
database_utils.disconnect_database(self, self.server_id, self.db_id)
|
||||
@@ -0,0 +1,482 @@
|
||||
{
|
||||
"types_create": [
|
||||
{
|
||||
"name": "Create types: With valid data.",
|
||||
"url": "/browser/type/obj/",
|
||||
"is_positive_test": true,
|
||||
"test_data": {},
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while creating types - missing parameter",
|
||||
"url": "/browser/type/obj/",
|
||||
"is_positive_test": false,
|
||||
"missing_parameter": true,
|
||||
"test_data": {},
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 410
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while creating types - internal server error",
|
||||
"url": "/browser/type/obj/",
|
||||
"is_positive_test": false,
|
||||
"internal_server_error": true,
|
||||
"test_data": {},
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||
"return_value": "(False, 'Mocked Internal Server Error')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while creating types - Error in db",
|
||||
"url": "/browser/type/obj/",
|
||||
"is_positive_test": false,
|
||||
"error_in_db": true,
|
||||
"test_data": {},
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
|
||||
"return_value": "(False, 'Mocked Internal Server Error')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
}
|
||||
],
|
||||
"types_delete": [
|
||||
{
|
||||
"name": "delete type",
|
||||
"url": "/browser/type/obj/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while deleting a type - internal server error",
|
||||
"url": "/browser/type/obj/",
|
||||
"internal_server_error": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while deleting a type')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "delete type using wrong type id",
|
||||
"url": "/browser/type/obj/",
|
||||
"wrong_type_id": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": false,
|
||||
"mock_data": {
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 410
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while deleting a type - Error in db",
|
||||
"url": "/browser/type/obj/",
|
||||
"error_in_db": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while deleting a type')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
}
|
||||
],
|
||||
"types_delete_multiple": [
|
||||
{
|
||||
"name": "Delete multiple type",
|
||||
"url": "/browser/type/obj/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
}
|
||||
],
|
||||
"types_get_nodes_and_node": [
|
||||
{
|
||||
"name": "Get type nodes",
|
||||
"url": "/browser/type/nodes/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching type nodes",
|
||||
"url": "/browser/type/nodes/",
|
||||
"error_fetching_type": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching type nodes')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Get type node",
|
||||
"url": "/browser/type/nodes/",
|
||||
"is_positive_test": true,
|
||||
"node": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching type node",
|
||||
"url": "/browser/type/nodes/",
|
||||
"error_fetching_type": true,
|
||||
"is_positive_test": false,
|
||||
"node": true,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching type node')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Fetching type node with wrong id",
|
||||
"url": "/browser/type/nodes/",
|
||||
"wrong_id": true,
|
||||
"is_positive_test": false,
|
||||
"node": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 410
|
||||
}
|
||||
}
|
||||
],
|
||||
"types_dependency_dependent": [
|
||||
{
|
||||
"name": "Get types dependency",
|
||||
"url": "/browser/type/dependency/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Get types dependent",
|
||||
"url": "/browser/type/dependent/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
}
|
||||
],
|
||||
"types_update": [
|
||||
{
|
||||
"name": "Update type",
|
||||
"url": "/browser/type/obj/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"test_data": {
|
||||
"description": "This is type update comment",
|
||||
"id": "PLACE_HOLDER"
|
||||
},
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while updating a type",
|
||||
"url": "/browser/type/obj/",
|
||||
"internal_server_error": true,
|
||||
"test_data": {
|
||||
"description": "This is type update comment",
|
||||
"id": "PLACE_HOLDER"
|
||||
},
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while updating type')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
}
|
||||
],
|
||||
"types_get": [
|
||||
{
|
||||
"name": "Get type",
|
||||
"url": "/browser/type/obj/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching a type",
|
||||
"url": "/browser/type/obj/",
|
||||
"error_fetching_type": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching a type')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Fetch type using wrong type id",
|
||||
"url": "/browser/type/obj/",
|
||||
"wrong_type_id": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": false,
|
||||
"mock_data": {
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 410
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Get type list",
|
||||
"url": "/browser/type/obj/",
|
||||
"is_positive_test": true,
|
||||
"type_list": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching type list",
|
||||
"url": "/browser/type/obj/",
|
||||
"error_fetching_type": true,
|
||||
"is_positive_test": false,
|
||||
"type_list": true,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching type')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
}
|
||||
],
|
||||
"types_get_types": [
|
||||
{
|
||||
"name": "Get types",
|
||||
"url": "/browser/type/get_types/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while getting types",
|
||||
"url": "/browser/type/get_types/",
|
||||
"internal_server_error": true,
|
||||
"is_positive_test": false,
|
||||
"type_list": true,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching types')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
}
|
||||
],
|
||||
"types_get_subtypes": [
|
||||
{
|
||||
"name": "Get subtypes",
|
||||
"url": "/browser/type/get_stypes/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while getting subtypes",
|
||||
"url": "/browser/type/get_stypes/",
|
||||
"internal_server_error": true,
|
||||
"is_positive_test": false,
|
||||
"type_list": true,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching subtypes')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
}
|
||||
],
|
||||
"types_get_subtypes_opclass": [
|
||||
{
|
||||
"name": "Get subtypes opclass",
|
||||
"url": "/browser/type/get_subopclass/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
}
|
||||
],
|
||||
"types_get_collations": [
|
||||
{
|
||||
"name": "Get collations",
|
||||
"url": "/browser/type/get_collations/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while getting collations",
|
||||
"url": "/browser/type/get_collations/",
|
||||
"internal_server_error": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching collation')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
}
|
||||
],
|
||||
"types_get_external_functions": [
|
||||
{
|
||||
"name": "Get external functions",
|
||||
"url": "/browser/type/get_external_functions/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while getting external functions",
|
||||
"url": "/browser/type/get_external_functions/",
|
||||
"internal_server_error": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching external function')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
}
|
||||
],
|
||||
"types_get_sql": [
|
||||
{
|
||||
"name": "Get Types SQL",
|
||||
"url": "/browser/type/sql/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching a created type sql",
|
||||
"url": "/browser/type/sql/",
|
||||
"internal_server_error": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {
|
||||
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
|
||||
"return_value": "(False, 'Mocked Internal Server Error while fetching a type sql')"
|
||||
},
|
||||
"expected_data": {
|
||||
"status_code": 500
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "Error while fetching a created type sql - wrong type id",
|
||||
"url": "/browser/type/sql/",
|
||||
"wrong_type_id": true,
|
||||
"is_positive_test": false,
|
||||
"mocking_required": true,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 410
|
||||
}
|
||||
}
|
||||
],
|
||||
"types_get_msql": [
|
||||
{
|
||||
"name": "Get Types MSQL",
|
||||
"url": "/browser/type/msql/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
}],
|
||||
"types_get_subtype_diff": [
|
||||
{
|
||||
"name": "Get Types subtype diff",
|
||||
"url": "/browser/type/get_stypediff/",
|
||||
"is_positive_test": true,
|
||||
"mocking_required": false,
|
||||
"mock_data": {},
|
||||
"expected_data": {
|
||||
"status_code": 200
|
||||
}
|
||||
}]
|
||||
}
|
||||
@@ -10,9 +10,29 @@
|
||||
|
||||
import sys
|
||||
import traceback
|
||||
import os
|
||||
import json
|
||||
|
||||
from regression.python_test_utils import test_utils as utils
|
||||
|
||||
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
|
||||
with open(CURRENT_PATH + "/types_test_data.json") as data_file:
|
||||
test_cases = json.load(data_file)
|
||||
|
||||
|
||||
def get_types_data(type_name, schema_name, db_user):
|
||||
data = {"name": type_name,
|
||||
"is_sys_type": False,
|
||||
"typtype": "c",
|
||||
"typeowner": db_user,
|
||||
"schema": schema_name,
|
||||
"composite": [{"member_name": "one", "type": "bigint",
|
||||
"is_tlength": False, "is_precision": False},
|
||||
{"member_name": "two", "type": "\"char\"[]",
|
||||
"is_tlength": False, "is_precision": False}],
|
||||
"enum": [], "typacl": [], "seclabels": []}
|
||||
return data
|
||||
|
||||
|
||||
def create_type(server, db_name, schema_name, type_name):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user