Improve code coverage and API test cases for Indexes. Fixes #5333

This commit is contained in:
Yogesh Mahajan
2020-05-18 12:29:33 +05:30
committed by Akshay Joshi
parent d58c33dec0
commit 9f445dc052
16 changed files with 1561 additions and 69 deletions

View File

@@ -15,6 +15,7 @@ New features
Housekeeping
************
| `Issue #5255 <https://redmine.postgresql.org/issues/5255>`_ - Implement Selenium Grid to run multiple tests across different browsers, operating systems, and machines in parallel.
| `Issue #5333 <https://redmine.postgresql.org/issues/5333>`_ - Improve code coverage and API test cases for Indexes.
| `Issue #5334 <https://redmine.postgresql.org/issues/5334>`_ - Improve code coverage and API test cases for the Rules module.
| `Issue #5335 <https://redmine.postgresql.org/issues/5335>`_ - Improve code coverage and API test cases for Triggers and Compound Triggers.
| `Issue #5443 <https://redmine.postgresql.org/issues/5443>`_ - Remove support for Python 2.

View File

@@ -0,0 +1,640 @@
{
"index_create": [
{
"name": "Create index: With valid data.",
"is_positive_test": true,
"inventory_data": {},
"test_data": {
"name": "test_index_add",
"spcname": "pg_default",
"amname": "btree",
"columns": [
{
"colname": "id",
"sort_order": false,
"nulls": false
}
],
"include": [
"name"
]
},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
}
},
{
"name": "Create index: With valid data mumtiple.",
"is_positive_test": true,
"inventory_data": {},
"test_data": {
"name": "test_index_add",
"spcname": "pg_default",
"amname": "btree",
"columns": [
{
"colname": "id",
"sort_order": false,
"nulls": false
},
{
"colname": "id",
"sort_order": true,
"nulls": false
}
],
"include": [
"name"
]
},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
}
},
{
"name": "Create index: With invalid data - No column name.",
"is_positive_test": false,
"inventory_data": {},
"test_data": {
"name": "test_index_add",
"spcname": "pg_default",
"amname": "btree",
"columns": [],
"include": [
"name"
]
},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 410,
"error_msg": "You must provide one or more column to create index.",
"test_result_data": {}
}
},
{
"name": "Create index: With invalid data - Missing Parameter.",
"is_positive_test": false,
"inventory_data": {},
"test_data": {
"spcname": "pg_default",
"amname": "btree",
"columns": [
{
"colname": "id",
"sort_order": false,
"nulls": false
}
],
"include": [
"name"
]
},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 410,
"error_msg": "Could not find the required parameter (Name).",
"test_result_data": {}
}
},
{
"name": "Create index: With valid data while server is down.",
"is_positive_test": false,
"inventory_data": {},
"test_data": {
"name": "test_index_add",
"spcname": "pg_default",
"amname": "btree",
"columns": [
{
"colname": "id",
"sort_order": false,
"nulls": false
}
],
"include": [
"name"
]
},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "[(True, True),(False, 'Mocked Internal Server Error'),(True,True)]"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
}
},
{
"name": "Create index: With valid data while server is down-2.",
"is_positive_test": false,
"inventory_data": {},
"test_data": {
"name": "test_index_add",
"spcname": "pg_default",
"amname": "btree",
"columns": [
{
"colname": "id",
"sort_order": false,
"nulls": false
}
],
"include": [
"name"
]
},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "[(True, True),(True, True),(False, 'Mocked Internal Server Error'),(False, 'Mocked Internal Server Error')]"
},
"expected_data": {
"status_code": 500,
"error_msg": "table_id",
"test_result_data": {}
}
}
],
"index_get": [
{
"name": "Get index details: With existing index id.",
"is_positive_test": true,
"inventory_data": {
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_list": false
},
{
"name": "Get indexes list: With existing indexes.",
"is_positive_test": true,
"inventory_data": {
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_list": true
},
{
"name": "Get index details: With Non-existing index id.",
"is_positive_test": false,
"inventory_data": {
},
"test_data": {
},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 410,
"error_msg": "Could not find the index in the table.",
"test_result_data": {}
},
"is_list": false
},
{
"name": "Get index details: With existing index id while server is down.",
"is_positive_test": false,
"inventory_data": {
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
},
"is_list": false
},
{
"name": "Get index lists : With existing index id while server is down.",
"is_positive_test": false,
"inventory_data": {
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
},
"is_list": true
}
],
"index_delete": [
{
"name": "Delete index : With existing index id.",
"is_positive_test": true,
"inventory_data": {
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_list": false
},
{
"name": "Delete index : With Non-existing index id.",
"is_positive_test": true,
"inventory_data": {
},
"test_data": {
"index_id": 90123
},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": "Error: Object not found",
"test_result_data": {}
},
"is_list": false
},
{
"name": "Delete index : With existing index id while server down.",
"is_positive_test": false,
"inventory_data": {
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
},
"is_list": false
},
{
"name": "Delete index : With existing index id while server down.",
"is_positive_test": false,
"inventory_data": {
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
},
"is_list": false
}
],
"index_delete_multiple": [
{
"name": "Delete multiple indexes : With existing index id.",
"is_positive_test": true,
"inventory_data": {
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_list": false
}
],
"index_put": [
{
"name": "Put index : With existing index id.",
"is_positive_test": true,
"inventory_data": {
},
"test_data": {
"description": "This is test comment for index",
"name": "updating name for index using api tests"
},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_list": false
},
{
"name": "Put index : With existing index id while server is down.",
"is_positive_test": false,
"inventory_data": {
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_scalar",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
},
"is_list": false
}
],
"index_get_sql": [
{
"name": "Get index sql: With existing index id.",
"is_positive_test": true,
"inventory_data": {
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_list": false
}
],
"index_create_get_collations": [
{
"name": "Create index get collations: With valid data.",
"is_positive_test": true,
"inventory_data": {},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
}
},
{
"name": "Create index get collations: With valid data while server down.",
"is_positive_test": false,
"inventory_data": {},
"test_data": {
},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
"return_value": "[(True,''),(False, 'Mocked Internal Server Error')]"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
}
}
],
"index_create_get_op_class": [
{
"name": "Create index get op_class: With valid data.",
"is_positive_test": true,
"inventory_data": {},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
}
},
{
"name": "Create index get op_class: With valid data while server down.",
"is_positive_test": false,
"inventory_data": {},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
"return_value": "[(True,''),(False, 'Mocked Internal Server Error')]"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
}
}
],
"index_create_get_access_methods": [
{
"name": "Create index get collations: With valid data.",
"is_positive_test": true,
"inventory_data": {},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
}
},
{
"name": "Create index get collations: With valid data while server down.",
"is_positive_test": false,
"inventory_data": {},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_2darray",
"return_value": "[(True,''),(False, 'Mocked Internal Server Error')]"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
}
}
],
"index_get_nodes": [
{
"name": "Get index node: With existing index id.",
"is_positive_test": true,
"inventory_data": {},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_list": false
},
{
"name": "Get index nodes: With existing indexes.",
"is_positive_test": true,
"inventory_data": {
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_list": true
},
{
"name": "Get index node: With Non-existing index id.",
"is_positive_test": false,
"inventory_data": {
},
"test_data": {
},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 410,
"error_msg": "Could not find the index in the table.",
"test_result_data": {
}
},
"is_list": false
}
],
"index_get_dependencies_dependents": [
{
"name": "Get index dependents: With existing index id.",
"is_positive_test": true,
"inventory_data": {
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_dependent": true
},
{
"name": "Get indexes dependencies: With existing index id.",
"is_positive_test": true,
"inventory_data": {
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_dependent": false
}
],
"index_get_statistics": [
{
"name": "Get index statistics: With existing index id.",
"is_positive_test": true,
"inventory_data": {
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_list": false
},
{
"name": "Get index statistics: With existing indexes .",
"is_positive_test": true,
"inventory_data": {
},
"test_data": {},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_list": true
},
{
"name": "Get index details: With existing index id while server is down.",
"is_positive_test": false,
"inventory_data": {
},
"test_data": {},
"mocking_required": true,
"mock_data": {
"function_name": "pgadmin.utils.driver.psycopg2.connection.Connection.execute_dict",
"return_value": "(False,'Mocked Internal Server Error')"
},
"expected_data": {
"status_code": 500,
"error_msg": "Mocked Internal Server Error",
"test_result_data": {}
},
"is_list": false
}
],
"index_get_index_msql": [
{
"name": "Get index msql: With existing index id.",
"is_positive_test": true,
"inventory_data": {
},
"test_data": {
"name": "modifying name",
"description": "Comments to test update"
},
"mocking_required": false,
"mock_data": {},
"expected_data": {
"status_code": 200,
"error_msg": null,
"test_result_data": {}
},
"is_list": false
}
]
}

View File

@@ -9,6 +9,7 @@
import json
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
@@ -19,13 +20,14 @@ from pgadmin.browser.server_groups.servers.databases.tests import utils as \
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as indexes_utils
class IndexesAddTestCase(BaseTestGenerator):
"""This class will add new index to existing table column"""
scenarios = [
('Add index Node URL', dict(url='/browser/index/obj/'))
]
url = "/browser/index/obj/"
scenarios = utils.generate_scenarios("index_create",
indexes_utils.test_cases)
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
@@ -50,21 +52,34 @@ class IndexesAddTestCase(BaseTestGenerator):
def runTest(self):
"""This function will add index to existing table column."""
self.index_name = "test_index_add_%s" % (str(uuid.uuid4())[1:8])
data = {"name": self.index_name,
"spcname": "pg_default",
"amname": "btree",
"columns": [
{"colname": "id", "sort_order": False, "nulls": False}],
"include": ["name"]
}
response = self.tester.post(
self.url + str(utils.SERVER_GROUP) + '/' +
str(self.server_id) + '/' + str(self.db_id) +
'/' + str(self.schema_id) + '/' + str(self.table_id) + '/',
data=json.dumps(data),
content_type='html/json')
self.assertEquals(response.status_code, 200)
self.data = self.test_data
if "name" in self.data:
self.index_name = self.data["name"] + (str(uuid.uuid4())[1:8])
self.data["name"] = self.index_name
if self.is_positive_test:
response = indexes_utils.api_create_index(self)
indexes_utils.assert_status_code(self, response)
index_response = indexes_utils.verify_index(self.server,
self.db_name,
self.index_name)
self.assertIsNot(index_response, "Could not find the newly "
"created index.")
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
response = indexes_utils.api_create_index(self)
else:
response = indexes_utils.api_create_index(self)
indexes_utils.assert_status_code(self, response)
if self.expected_data["error_msg"] == "table_id":
indexes_utils.assert_error_message(self, response,
self.table_id)
else:
indexes_utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database

View File

@@ -0,0 +1,71 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import json
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as indexes_utils
class IndexesAddTestCase(BaseTestGenerator):
""" This class will get list access_methods available."""
# Get list of test cases
url = "/browser/index/get_access_methods/"
scenarios = utils.generate_scenarios("index_create_get_access_methods",
indexes_utils.test_cases)
def setUp(self):
""" This function will set up pre-requisite """
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_for_column_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
""" This function will call api providing list of access methods"""
if self.is_positive_test:
response = indexes_utils.api_create_index_get_access_methods(self)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
response = indexes_utils.\
api_create_index_get_access_methods(self)
indexes_utils.assert_status_code(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@@ -0,0 +1,69 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import json
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as indexes_utils
class IndexesAddTestCase(BaseTestGenerator):
""" This class will get list collations available."""
# Get list of test cases
url = "/browser/index/get_collations/"
scenarios = utils.generate_scenarios("index_create_get_collations",
indexes_utils.test_cases)
def setUp(self):
""" This function will set up pre-requisite """
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_for_column_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
""" This function will call api providing list of collations"""
if self.is_positive_test:
response = indexes_utils.api_create_index_get_collations(self)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
response = indexes_utils.api_create_index_get_collations(
self)
indexes_utils.assert_status_code(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@@ -0,0 +1,69 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import json
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as indexes_utils
class IndexesAddTestCase(BaseTestGenerator):
""" This class will get list op_class available."""
# Get list of test cases
url = "/browser/index/get_op_class/"
scenarios = utils.generate_scenarios("index_create_get_op_class",
indexes_utils.test_cases)
def setUp(self):
""" This function will set up pre-requisite """
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_for_column_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
def runTest(self):
""" This function will call api providing list of op_class"""
if self.is_positive_test:
response = indexes_utils.api_create_index_get_op_class(self)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=eval(self.mock_data["return_value"])):
response = indexes_utils.\
api_create_index_get_op_class(self)
indexes_utils.assert_status_code(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@@ -8,6 +8,7 @@
##########################################################################
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tables.columns. \
tests import utils as columns_utils
@@ -25,11 +26,15 @@ from . import utils as indexes_utils
class IndexesDeleteTestCase(BaseTestGenerator):
"""This class will delete the existing index of column."""
scenarios = [
('Delete index Node URL', dict(url='/browser/index/obj/'))
]
url = "/browser/index/obj/"
# Get test cases
scenarios = utils.generate_scenarios("index_delete",
indexes_utils.test_cases)
def setUp(self):
""" This function will set up pre-requisite
creating index to delete."""
self.data = self.test_data
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
@@ -68,14 +73,24 @@ class IndexesDeleteTestCase(BaseTestGenerator):
self.index_name)
if not index_response:
raise Exception("Could not find the index to delete.")
response = self.tester.delete(self.url + str(utils.SERVER_GROUP) +
'/' + str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(self.table_id) + '/' +
str(self.index_id),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
if self.is_positive_test:
if 'index_id' in self.data:
self.index_id = self.data["index_id"]
response = indexes_utils.api_delete_index(self)
indexes_utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
response = indexes_utils.api_delete_index(self)
else:
self.index_id = 99099
response = indexes_utils.api_delete_index(self)
indexes_utils.assert_status_code(self, response)
indexes_utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database

View File

@@ -26,11 +26,14 @@ from . import utils as indexes_utils
class IndexesDeleteMultipleTestCase(BaseTestGenerator):
"""This class will delete the existing index of column."""
scenarios = [
('Delete index Node URL', dict(url='/browser/index/obj/'))
]
# Get test cases
url = "/browser/index/obj/"
scenarios = utils.generate_scenarios("index_delete_multiple",
indexes_utils.test_cases)
def setUp(self):
""" This function will set up pre-requisite
creating index to delete."""
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
@@ -82,16 +85,9 @@ class IndexesDeleteMultipleTestCase(BaseTestGenerator):
if not index_response:
raise Exception("Could not find the index to delete.")
data = {'ids': self.index_ids}
response = self.tester.delete(self.url + str(utils.SERVER_GROUP) +
'/' + str(self.server_id) + '/' +
str(self.db_id) + '/' +
str(self.schema_id) + '/' +
str(self.table_id) + '/',
data=json.dumps(data),
content_type='html/json',
follow_redirects=True)
self.assertEquals(response.status_code, 200)
if self.is_positive_test:
response = indexes_utils.api_delete_indexes(self, self.index_ids)
indexes_utils.assert_status_code(self, response)
def tearDown(self):
# Disconnect the database

View File

@@ -8,6 +8,7 @@
##########################################################################
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tables.columns. \
tests import utils as columns_utils
@@ -24,12 +25,14 @@ from . import utils as indexes_utils
class IndexesGetTestCase(BaseTestGenerator):
"""This class will fetch the existing index of column."""
scenarios = [
('Fetch index Node URL', dict(url='/browser/index/obj/'))
]
"""This class will get information about existing index/indexes"""
url = "/browser/index/obj/"
# Get list of test cases
scenarios = utils.generate_scenarios("index_get",
indexes_utils.test_cases)
def setUp(self):
"""Creating index/indexes """
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
@@ -62,15 +65,40 @@ class IndexesGetTestCase(BaseTestGenerator):
self.index_name,
self.column_name)
if self.is_list:
self.index_name_1 = "test_index_delete_%s" % \
(str(uuid.uuid4())[1:8])
self.index_ids = [self.index_id, indexes_utils.create_index(
self.server, self.db_name, self.schema_name, self.table_name,
self.index_name_1, self.column_name)]
def runTest(self):
"""This function will fetch the existing column index."""
response = self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
self.index_id),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
""" Function will do get api call using index id or
empty index id for list of indexes"""
if self.is_positive_test:
if self.is_list:
response = indexes_utils.api_get_index(self, "")
else:
response = indexes_utils.api_get_index(self, self.index_id)
indexes_utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
if self.is_list:
response = indexes_utils.api_get_index(self, "")
else:
response = indexes_utils.api_get_index(self,
self.index_id)
else:
# Non-existing index id
self.index_id = 2341
response = indexes_utils.api_get_index(self, self.index_id)
indexes_utils.assert_status_code(self, response)
indexes_utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database

View File

@@ -0,0 +1,80 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
from pgadmin.browser.server_groups.servers.databases.schemas.tables.columns. \
tests import utils as columns_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as indexes_utils
class IndexesGetTestCase(BaseTestGenerator):
""" This class will test dependent & dependencies for existing index"""
# Get list of test cases
url = "/browser/index/"
scenarios = utils.generate_scenarios("index_get_dependencies_dependents",
indexes_utils.test_cases)
def setUp(self):
""" Creating index required in actual tests"""
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.column_name = "test_column_delete_%s" % (str(uuid.uuid4())[1:8])
self.column_id = columns_utils.create_column(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.column_name)
self.index_name = "test_index_delete_%s" % (str(uuid.uuid4())[1:8])
self.index_id = indexes_utils.create_index(self.server, self.db_name,
self.schema_name,
self.table_name,
self.index_name,
self.column_name)
def runTest(self):
""" Calling dependent & dependency API calls for existing index"""
if self.is_positive_test:
if self.is_dependent:
response = indexes_utils. \
api_get_index_dependents(self, self.index_id)
else:
response = indexes_utils. \
api_get_index_dependency(self, self.index_id)
indexes_utils.assert_status_code(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@@ -0,0 +1,77 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
from pgadmin.browser.server_groups.servers.databases.schemas.tables.columns. \
tests import utils as columns_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as indexes_utils
class IndexesGetTestCase(BaseTestGenerator):
""" This class will test modified sql generated on modifying existing
index """
# Get list of test cases
url = "/browser/index/msql/"
scenarios = utils.generate_scenarios("index_get_index_msql",
indexes_utils.test_cases)
def setUp(self):
""" Creating index """
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.column_name = "test_column_delete_%s" % (str(uuid.uuid4())[1:8])
self.column_id = columns_utils.create_column(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.column_name)
self.index_name = "test_index_delete_%s" % (str(uuid.uuid4())[1:8])
self.index_id = indexes_utils.create_index(self.server, self.db_name,
self.schema_name,
self.table_name,
self.index_name,
self.column_name)
def runTest(self):
# """This function will check modified sql"""
if self.is_positive_test:
self.data = self.test_data
self.data['oid'] = self.index_id
response = indexes_utils.api_get_index_msql(self)
indexes_utils.assert_status_code(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@@ -0,0 +1,110 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tables.columns. \
tests import utils as columns_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as indexes_utils
class IndexesGetTestCase(BaseTestGenerator):
""" This class will test node api for existing index"""
# Get list of test cases
url = "/browser/index/nodes/"
scenarios = utils.generate_scenarios("index_get_nodes",
indexes_utils.test_cases)
def setUp(self):
""" Creating index required in further steps"""
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.column_name = "test_column_delete_%s" % (str(uuid.uuid4())[1:8])
self.column_id = columns_utils.create_column(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.column_name)
self.index_name = "test_index_delete_%s" % (str(uuid.uuid4())[1:8])
self.index_id = indexes_utils.create_index(self.server, self.db_name,
self.schema_name,
self.table_name,
self.index_name,
self.column_name)
if self.is_list:
self.index_name_1 = "test_index_delete_%s" % (
str(uuid.uuid4())[1:8])
self.index_ids = [self.index_id,
indexes_utils.create_index(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.index_name_1,
self.column_name)
]
def runTest(self):
# """This function will call method providing node details"""
if self.is_positive_test:
if self.is_list:
response = indexes_utils.api_get_index_node(self, "")
else:
response = indexes_utils. \
api_get_index_node(self, self.index_id)
indexes_utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
if self.is_list:
response = indexes_utils.api_get_index_node(self, "")
else:
response = indexes_utils. \
api_get_index_node(self, self.index_id)
else:
self.index_id = 2341
response = indexes_utils. \
api_get_index_node(self, self.index_id)
indexes_utils.assert_status_code(self, response)
indexes_utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@@ -0,0 +1,88 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tables.columns. \
tests import utils as columns_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as indexes_utils
class IndexesGetTestCase(BaseTestGenerator):
""" This class will test statistics API call for index"""
# Get list of test cases
url = "/browser/index/stats/"
scenarios = utils.generate_scenarios("index_get_statistics",
indexes_utils.test_cases)
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.column_name = "test_column_delete_%s" % (str(uuid.uuid4())[1:8])
self.column_id = columns_utils.create_column(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.column_name)
self.index_name = "test_index_delete_%s" % (str(uuid.uuid4())[1:8])
self.index_id = indexes_utils.create_index(self.server, self.db_name,
self.schema_name,
self.table_name,
self.index_name,
self.column_name)
def runTest(self):
""" Function calls API which provide statistics information
about existing index"""
if self.is_positive_test:
if self.is_list:
response = indexes_utils.api_get_index(self, "")
else:
response = indexes_utils.api_get_index(self, self.index_id)
indexes_utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
response = indexes_utils.\
api_get_index_statistics(self, self.index_id)
indexes_utils.assert_status_code(self, response)
indexes_utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@@ -9,6 +9,7 @@
import json
import uuid
from unittest.mock import patch
from pgadmin.browser.server_groups.servers.databases.schemas.tables.columns. \
tests import utils as columns_utils
@@ -25,10 +26,9 @@ from . import utils as indexes_utils
class IndexesUpdateTestCase(BaseTestGenerator):
"""This class will update the existing index of column."""
scenarios = [
('Put index Node URL', dict(url='/browser/index/obj/'))
]
url = "/browser/index/obj/"
scenarios = utils.generate_scenarios("index_put",
indexes_utils.test_cases)
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
@@ -69,16 +69,20 @@ class IndexesUpdateTestCase(BaseTestGenerator):
self.index_name)
if not index_response:
raise Exception("Could not find the index to update.")
data = {"oid": self.index_id,
"description": "This is test comment for index"}
response = self.tester.put(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
self.index_id),
data=json.dumps(data),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
self.data = self.test_data
self.data['oid'] = self.index_id
if self.is_positive_test:
response = indexes_utils.api_put_index(self)
indexes_utils.assert_status_code(self, response)
else:
if self.mocking_required:
with patch(self.mock_data["function_name"],
side_effect=[eval(self.mock_data["return_value"])]):
response = indexes_utils.api_put_index(self)
indexes_utils.assert_status_code(self, response)
indexes_utils.assert_error_message(self, response)
def tearDown(self):
# Disconnect the database

View File

@@ -0,0 +1,85 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import uuid
from pgadmin.browser.server_groups.servers.databases.schemas.tables.columns. \
tests import utils as columns_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tables.tests \
import utils as tables_utils
from pgadmin.browser.server_groups.servers.databases.schemas.tests import \
utils as schema_utils
from pgadmin.browser.server_groups.servers.databases.tests import utils as \
database_utils
from pgadmin.utils.route import BaseTestGenerator
from regression import parent_node_dict
from regression.python_test_utils import test_utils as utils
from . import utils as indexes_utils
class IndexesGetTestCase(BaseTestGenerator):
""" This class tests sql generated for existing index. """
# Get list of test cases
url = "/browser/index/sql/"
scenarios = utils.generate_scenarios("index_get_sql",
indexes_utils.test_cases)
def setUp(self):
self.db_name = parent_node_dict["database"][-1]["db_name"]
schema_info = parent_node_dict["schema"][-1]
self.server_id = schema_info["server_id"]
self.db_id = schema_info["db_id"]
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,
self.server_id, self.db_id)
if not db_con['data']["connected"]:
raise Exception("Could not connect to database to add a table.")
self.schema_id = schema_info["schema_id"]
self.schema_name = schema_info["schema_name"]
schema_response = schema_utils.verify_schemas(self.server,
self.db_name,
self.schema_name)
if not schema_response:
raise Exception("Could not find the schema to add a table.")
self.table_name = "table_column_%s" % (str(uuid.uuid4())[1:8])
self.table_id = tables_utils.create_table(self.server, self.db_name,
self.schema_name,
self.table_name)
self.column_name = "test_column_delete_%s" % (str(uuid.uuid4())[1:8])
self.column_id = columns_utils.create_column(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.column_name)
self.index_name = "test_index_delete_%s" % (str(uuid.uuid4())[1:8])
self.index_id = indexes_utils.create_index(self.server, self.db_name,
self.schema_name,
self.table_name,
self.index_name,
self.column_name)
if self.is_list:
self.index_name_1 = "test_index_delete_%s" % (
str(uuid.uuid4())[1:8])
self.index_ids = [self.index_id,
indexes_utils.create_index(self.server,
self.db_name,
self.schema_name,
self.table_name,
self.index_name_1,
self.column_name)
]
def runTest(self):
if self.is_positive_test:
response = indexes_utils.api_get_index_sql(self)
indexes_utils.assert_status_code(self, response)
def tearDown(self):
# Disconnect the database
database_utils.disconnect_database(self, self.server_id, self.db_id)

View File

@@ -8,12 +8,156 @@
##########################################################################
from __future__ import print_function
import os
import json
import sys
import traceback
from regression.python_test_utils import test_utils as utils
# Load test data from json file.
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
with open(CURRENT_PATH + "/index_test_data.json") as data_file:
test_cases = json.load(data_file)
def api_create_index(self):
return self.tester.post(
"{0}{1}/{2}/{3}/{4}/{5}/".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id),
data=json.dumps(self.data),
content_type='html/json')
def api_get_index(self, index_id):
return self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
index_id))
def api_put_index(self):
return self.tester.put(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
self.index_id),
data=json.dumps(self.data),
follow_redirects=True)
def api_get_index_statistics(self, index_id):
return self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
str(index_id)))
def api_get_index_node(self, index_id):
return self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
str(index_id)))
def api_delete_index(self):
return self.tester.delete(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
self.index_id),
follow_redirects=True
)
def api_delete_indexes(self, index_id_lists):
data = {'ids': index_id_lists}
return self.tester.delete(
"{0}{1}/{2}/{3}/{4}/{5}/".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id),
data=json.dumps(data),
content_type='html/json',
follow_redirects=True)
def api_get_index_sql(self):
return self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
self.index_id))
def api_create_index_get_collations(self):
return self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id, ))
def api_create_index_get_access_methods(self):
return self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id, ))
def api_create_index_get_op_class(self):
return self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id, ))
def api_get_index_dependents(self, index_id):
self.url = self.url + 'dependent/'
return self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
str(index_id)))
def api_get_index_dependency(self, index_id):
self.url = self.url + 'dependency/'
return self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
str(index_id)))
def api_get_index_msql(self):
return self.tester.get(
"{0}{1}/{2}/{3}/{4}/{5}/{6}".format(self.url, utils.SERVER_GROUP,
self.server_id, self.db_id,
self.schema_id, self.table_id,
self.index_id),
data=json.dumps(self.data),
content_type='html/json',
follow_redirects=True
)
def assert_status_code(self, response):
act_res = response.status_code
exp_res = self.expected_data["status_code"]
return self.assertEquals(act_res, exp_res)
def assert_error_message(self, response, error_msg=None):
act_res = response.json["errormsg"]
if error_msg is not None:
exp_res = error_msg
else:
exp_res = self.expected_data["error_msg"]
return self.assertEquals(act_res, exp_res)
def create_index(server, db_name, schema_name, table_name, index_name,
col_name):