Add a framework for testing reversed engineered SQL and CRUD API endpoints. Fixes #4202

This commit is contained in:
Akshay Joshi 2019-06-18 14:28:25 +01:00 committed by Dave Page
parent 71919a4ce9
commit 9e65c971a3
14 changed files with 359 additions and 15 deletions

View File

@ -43,6 +43,9 @@ check-pep8:
check-python:
cd web && python regression/runtests.py --exclude feature_tests
check-resql:
cd web && python regression/runtests.py --pkg resql --exclude feature_tests
check-feature: install-node bundle
cd web && python regression/runtests.py --pkg feature_tests

View File

@ -10,6 +10,7 @@ New features
************
| `Feature #3174 <https://redmine.postgresql.org/issues/3174>`_ - Visually distinguish simple tables from tables that are inherited and from which other tables are inherited.
| `Feature #4202 <https://redmine.postgresql.org/issues/4202>`_ - Add a framework for testing reversed engineered SQL and CRUD API endpoints.
Bug fixes
*********

View File

@ -0,0 +1,9 @@
-- Cast: money -> bigint
-- DROP CAST (money AS bigint);
CREATE CAST (money AS bigint)
WITHOUT FUNCTION
AS IMPLICIT;
COMMENT ON CAST (money AS bigint) IS 'Cast from money to bigint';

View File

@ -0,0 +1,7 @@
-- Cast: money -> bigint
-- DROP CAST (money AS bigint);
CREATE CAST (money AS bigint)
WITHOUT FUNCTION
AS IMPLICIT;

View File

@ -0,0 +1,68 @@
{
"scenarios": [
{
"type": "create",
"name": "Create IMPLICIT Cast",
"endpoint": "NODE-cast.obj",
"sql_endpoint": "NODE-cast.sql_id",
"data": {
"castcontext": "IMPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"expected_sql_file": "create_implicit_cast.sql"
},
{
"type": "alter",
"name": "Alter IMPLICIT Cast",
"endpoint": "NODE-cast.obj_id",
"sql_endpoint": "NODE-cast.sql_id",
"data": {
"description": "Cast from money to bigint"
},
"expected_sql_file": "alter_implicit_cast.sql"
},
{
"type": "delete",
"name": "Drop IMPLICIT Cast",
"endpoint": "NODE-cast.delete_id",
"data": {
"name": "money->bigint"
}
},
{
"type": "create",
"name":"Create EXPLICIT Cast",
"endpoint": "NODE-cast.obj",
"sql_endpoint": "NODE-cast.sql_id",
"data": {
"castcontext": "EXPLICIT",
"encoding": "UTF8",
"name": "money->bigint",
"srctyp": "money",
"trgtyp": "bigint"
},
"expected_sql": "-- Cast: money -> bigint\n\n-- DROP CAST (money AS bigint);\n\nCREATE CAST (money AS bigint)\n\tWITHOUT FUNCTION;"
},
{
"type": "alter",
"name": "Alter EXPLICIT Cast",
"endpoint": "NODE-cast.obj_id",
"sql_endpoint": "NODE-cast.sql_id",
"data": {
"description": "Cast from money to bigint"
},
"expected_sql": "-- Cast: money -> bigint\n\n-- DROP CAST (money AS bigint);\n\nCREATE CAST (money AS bigint)\n\tWITHOUT FUNCTION;\n\nCOMMENT ON CAST (money AS bigint) IS 'Cast from money to bigint';"
},
{
"type": "delete",
"name": "Drop EXPLICIT Cast",
"endpoint": "NODE-cast.delete_id",
"data": {
"name": "money->bigint"
}
}
]
}

View File

@ -11,6 +11,5 @@ from pgadmin.utils.route import BaseTestGenerator
class CollationTestGenerator(BaseTestGenerator):
def generate_tests(self):
def runTest(self):
return

View File

@ -11,6 +11,5 @@ from pgadmin.utils.route import BaseTestGenerator
class SynonymTestGenerator(BaseTestGenerator):
def generate_tests(self):
def runTest(self):
return

View File

@ -11,6 +11,5 @@ from pgadmin.utils.route import BaseTestGenerator
class ViewsTestGenerator(BaseTestGenerator):
def generate_tests(self):
def runTest(self):
return

View File

@ -53,13 +53,16 @@ class TestsGeneratorRegistry(ABCMeta):
ABCMeta.__init__(cls, name, bases, d)
@classmethod
def load_generators(cls, pkg_root, exclude_pkgs, for_modules=[]):
def load_generators(cls, pkg_root, exclude_pkgs, for_modules=[],
is_resql_only=False):
cls.registry = dict()
all_modules = []
all_modules += find_modules(pkg_root, False, True)
# Append reverse engineered test case module
all_modules.append('regression.re_sql.tests.test_resql')
# If specific modules are to be tested, exclude others
if len(for_modules) > 0:
@ -68,17 +71,30 @@ class TestsGeneratorRegistry(ABCMeta):
for fmod in for_modules
if module_name.endswith(fmod)]
# Check for SERVER mode
for module_name in all_modules:
# Set the module list and exclude packages in the BaseTestGenerator
# for Reverse Engineer SQL test cases.
BaseTestGenerator.setReSQLModuleList(all_modules)
BaseTestGenerator.setExcludePkgs(exclude_pkgs)
# Check if only reverse engineered sql test cases to run
# if yes then import only that module
if is_resql_only:
try:
if "tests." in str(module_name) and not any(
str(module_name).startswith(
'pgadmin.' + str(exclude_pkg)
) for exclude_pkg in exclude_pkgs
):
import_module(module_name)
import_module('regression.re_sql.tests.test_resql')
except ImportError:
traceback.print_exc(file=sys.stderr)
else:
# Check for SERVER mode
for module_name in all_modules:
try:
if "tests." in str(module_name) and not any(
str(module_name).startswith(
'pgadmin.' + str(exclude_pkg)
) for exclude_pkg in exclude_pkgs
):
import_module(module_name)
except ImportError:
traceback.print_exc(file=sys.stderr)
@six.add_metaclass(TestsGeneratorRegistry)
@ -123,3 +139,11 @@ class BaseTestGenerator(unittest.TestCase):
@classmethod
def setTestDatabaseName(cls, database_name):
cls.test_db = database_name
@classmethod
def setReSQLModuleList(cls, module_list):
cls.re_sql_module_list = module_list
@classmethod
def setExcludePkgs(cls, exclude_pkgs):
cls.exclude_pkgs = exclude_pkgs

View File

@ -148,6 +148,9 @@ Python Tests:
run 'python runtests.py --pkg all' or just:
'python runtests.py'
- Execute only reverse engineered sql test framework for all nodes
run 'python runtests.py --pkg resql'
- Execute test framework for entire package
Example 1) Run test framework for 'browser' package

View File

View File

View File

@ -0,0 +1,225 @@
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2019, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
import json
import os
from flask import url_for
from pgadmin.utils.route import BaseTestGenerator
from regression.python_test_utils import test_utils as utils
from pgadmin.browser.server_groups.servers.databases.tests import \
utils as database_utils
from pgadmin.utils.versioned_template_loader import \
get_version_mapping_directories
def create_resql_module_list(all_modules, exclude_pkgs):
"""
This function is used to create the module list for reverse engineering
SQL by iterating all the modules.
:param all_modules: List of all the modules
:param exclude_pkgs: List of exclude packages
:return:
"""
resql_module_list = dict()
for module in all_modules:
if "tests." in str(module) and not any(str(module).startswith(
'pgadmin.' + str(exclude_pkg)) for exclude_pkg in exclude_pkgs
):
complete_module_name = module.split(".test")
module_name_list = complete_module_name[0].split(".")
module_name = module_name_list[len(module_name_list) - 1]
resql_module_list[module_name] = os.path.join(*module_name_list)
return resql_module_list
class ReverseEngineeredSQLTestCases(BaseTestGenerator):
""" This class will test the reverse engineering SQL"""
scenarios = [
('Reverse Engineered SQL Test Cases', dict())
]
def setUp(self):
# Get the database connection
self.db_con = database_utils.connect_database(
self, utils.SERVER_GROUP, self.server_information['server_id'],
self.server_information['db_id'])
if not self.db_con['info'] == "Database connected.":
raise Exception("Could not connect to database.")
# Get the application path
self.apppath = os.getcwd()
def runTest(self):
# Create the module list on which reverse engineering sql test
# cases will be executed.
resql_module_list = create_resql_module_list(
BaseTestGenerator.re_sql_module_list,
BaseTestGenerator.exclude_pkgs)
for module in resql_module_list:
module_path = resql_module_list[module]
# Get the folder name based on server version number and
# their existence.
status, self.test_folder = self.get_test_folder(module_path)
if not status:
continue
# Iterate all the files in the test folder and check for
# the JSON files.
for filename in os.listdir(self.test_folder):
if filename.endswith(".json"):
complete_file_name = os.path.join(self.test_folder,
filename)
with open(complete_file_name) as jsonfp:
data = json.load(jsonfp)
for key, scenarios in data.items():
self.execute_test_case(scenarios)
def tearDown(self):
database_utils.disconnect_database(
self, self.server_information['server_id'],
self.server_information['db_id'])
def get_url(self, endpoint, object_id=None):
"""
This function is used to get the url.
:param endpoint:
:param object_id:
:return:
"""
object_url = None
for rule in self.app.url_map.iter_rules(endpoint):
options = {}
for arg in rule.arguments:
if arg == 'gid':
options['gid'] = int(utils.SERVER_GROUP)
elif arg == 'sid':
options['sid'] = int(self.server_information['server_id'])
elif arg == 'did':
options['did'] = int(self.server_information['db_id'])
elif arg == 'scid':
options['scid'] = int(self.server_information['schema_id'])
else:
if object_id is not None:
options[arg] = int(object_id)
with self.app.test_request_context():
object_url = url_for(rule.endpoint, **options)
return object_url
def execute_test_case(self, scenarios):
"""
This function will run the test cases for specific module.
:param module_name: Name of the module
:param scenarios: List of scenarios
:return:
"""
object_id = None
# Added line break after scenario name
print("\n")
for scenario in scenarios:
print(scenario['name'])
if 'type' in scenario and scenario['type'] == 'create':
# Get the url and create the specific node.
create_url = self.get_url(scenario['endpoint'])
response = self.tester.post(create_url,
data=json.dumps(scenario['data']),
content_type='html/json')
self.assertEquals(response.status_code, 200)
resp_data = json.loads(response.data.decode('utf8'))
object_id = resp_data['node']['_id']
# Compare the reverse engineering SQL
self.check_re_sql(scenario, object_id)
elif 'type' in scenario and scenario['type'] == 'alter':
# Get the url and create the specific node.
alter_url = self.get_url(scenario['endpoint'], object_id)
response = self.tester.put(alter_url,
data=json.dumps(scenario['data']),
follow_redirects=True)
self.assertEquals(response.status_code, 200)
resp_data = json.loads(response.data.decode('utf8'))
object_id = resp_data['node']['_id']
# Compare the reverse engineering SQL
self.check_re_sql(scenario, object_id)
elif 'type' in scenario and scenario['type'] == 'delete':
# Get the delete url and delete the object created above.
delete_url = self.get_url(scenario['endpoint'], object_id)
delete_response = self.tester.delete(delete_url,
follow_redirects=True)
self.assertEquals(delete_response.status_code, 200)
def get_test_folder(self, module_path):
"""
This function will get the appropriate test folder based on
server version and their existence.
:param module_path: Path of the module to be tested.
:return:
"""
# Join the application path and the module path
absolute_path = os.path.join(self.apppath, module_path)
# Iterate the version mapping directories.
for version_mapping in get_version_mapping_directories(
self.server['type']):
if version_mapping['number'] > \
self.server_information['server_version']:
continue
complete_path = os.path.join(absolute_path, 'tests',
version_mapping['name'])
if os.path.exists(complete_path):
return True, complete_path
return False, None
def check_re_sql(self, scenario, object_id):
"""
This function is used to get the reverse engineering SQL.
:param scenario:
:param object_id:
:return:
"""
sql_url = self.get_url(scenario['sql_endpoint'], object_id)
response = self.tester.get(sql_url)
self.assertEquals(response.status_code, 200)
resp_sql = response.data.decode('unicode_escape')
# Remove first and last double quotes
if resp_sql.startswith('"') and resp_sql.endswith('"'):
resp_sql = resp_sql[1:-1]
# Check if expected sql is given in JSON file or path of the output
# file is given
if 'expected_sql_file' in scenario:
output_file = os.path.join(self.test_folder,
scenario['expected_sql_file'])
if os.path.exists(output_file):
fp = open(output_file, "r")
# Used rstrip to remove trailing \n
sql = fp.read().rstrip()
self.assertEquals(sql, resp_sql)
else:
self.assertFalse("Expected SQL File not found")
elif 'expected_sql' in scenario:
self.assertEquals(scenario['expected_sql'], resp_sql)

View File

@ -249,6 +249,10 @@ def get_test_modules(arguments):
# Load the test modules which are in given package(i.e. in arguments.pkg)
if arguments['pkg'] is None or arguments['pkg'] == "all":
TestsGeneratorRegistry.load_generators('pgadmin', exclude_pkgs)
elif arguments['pkg'] is not None and arguments['pkg'] == "resql":
# Load the reverse engineering sql test module
TestsGeneratorRegistry.load_generators('pgadmin', exclude_pkgs,
is_resql_only=True)
else:
for_modules = []
if arguments['modules'] is not None:
@ -438,6 +442,9 @@ if __name__ == '__main__':
server['sslmode']
)
# Add the server version in server information
server_information['server_version'] = connection.server_version
# Drop the database if already exists.
test_utils.drop_database(connection, test_db_name)
# Create database