Include passed test results in the JSON output from the regression tests.

This commit is contained in:
Navnath Gadakh 2017-04-12 13:11:53 +01:00 committed by Dave Page
parent 7dd9efd811
commit 62716c4193
8 changed files with 123 additions and 45 deletions

View File

@ -22,6 +22,11 @@ if sys.version_info[0] >= 3:
class TestColumnForeignKeyGetConstraintCols(BaseTestGenerator):
scenarios = [
("Test foreign key get constraint with no foreign key properties on"
" the column", dict())
]
def runTest(self):
""" When there are no foreign key properties on the column, it returns an empty result """
with test_utils.Database(self.server) as (connection, database_name):

View File

@ -22,6 +22,11 @@ if sys.version_info[0] >= 3:
class TestTablesAcl(BaseTestGenerator):
scenarios = [
("Test query returns the permissions when there are permissions set up"
" on the table", dict())
]
def runTest(self):
""" This tests that when there are permissions set up on the table, acl query returns the permissions"""
with test_utils.Database(self.server) as (connection, database_name):

View File

@ -19,7 +19,13 @@ from regression.python_test_utils import test_utils
if sys.version_info[0] >= 3:
long = int
class TestTablesNode(BaseTestGenerator):
scenarios = [
("This scenario tests that all applicable sql template versions can "
"fetch table names", dict())
]
def runTest(self):
""" This tests that all applicable sql template versions can fetch table names """
with test_utils.Database(self.server) as (connection, database_name):

View File

@ -22,6 +22,11 @@ if sys.version_info[0] >= 3:
class TestTablesProperties(BaseTestGenerator):
scenarios = [
("This scenario tests that all applicable sql template versions can "
"fetch some ddl", dict())
]
def runTest(self):
""" This tests that all applicable sql template versions can fetch some ddl """
with test_utils.Database(self.server) as (connection, database_name):

View File

@ -8,6 +8,10 @@ class TestCheckRecovery(BaseTestGenerator):
versions_to_test = ["default", "9.0_plus"]
scenarios = [
("Test for check recovery", dict())
]
def runTest(self):
cursor = test_utils.get_db_connection(self.server['db'],

View File

@ -18,6 +18,10 @@ from pgadmin.utils.route import BaseTestGenerator
class TestVersionedTemplateLoader(BaseTestGenerator):
scenarios = [
("Test versioned template loader", dict())
]
def setUp(self):
self.loader = VersionedTemplateLoader(FakeApp())

View File

@ -139,6 +139,16 @@ def create_database(server, db_name):
def create_table(server, db_name, table_name):
"""
This function create the table in given database name
:param server: server details
:type server: dict
:param db_name: database name
:type db_name: str
:param table_name: table name
:type table_name: str
:return: None
"""
try:
connection = get_db_connection(db_name,
server['username'],
@ -149,7 +159,8 @@ def create_table(server, db_name, table_name):
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
pg_cursor.execute(
'''CREATE TABLE "%s" (some_column VARCHAR, value NUMERIC)''' % table_name)
'''CREATE TABLE "%s" (some_column VARCHAR, value NUMERIC)''' %
table_name)
pg_cursor.execute(
'''INSERT INTO "%s" VALUES ('Some-Name', 6)''' % table_name)
connection.set_isolation_level(old_isolation_level)
@ -311,12 +322,14 @@ def delete_server_with_api(tester, sid):
def add_db_to_parent_node_dict(srv_id, db_id, test_db_name):
""" This function stores the database details into parent dict """
regression.parent_node_dict["database"].append({"server_id": srv_id,
"db_id": db_id,
"db_name": test_db_name})
def add_schema_to_parent_node_dict(srv_id, db_id, schema_id, schema_name):
""" This function stores the schema details into parent dict """
regression.parent_node_dict["schema"].append({"server_id": srv_id,
"db_id": db_id,
"schema_id": schema_id,
@ -350,6 +363,7 @@ def create_parent_server_node(server_info):
def delete_test_server(tester):
""" This function use to delete test server """
try:
parent_node_dict = regression.parent_node_dict
test_servers = parent_node_dict["server"]
@ -394,6 +408,7 @@ def delete_test_server(tester):
def get_db_password(config_servers, name, host, db_port):
""" This function return the password of particular server """
db_password = ''
for srv in config_servers:
if (srv['name'], srv['host'], srv['db_port']) == (name, host, db_port):
@ -402,6 +417,12 @@ def get_db_password(config_servers, name, host, db_port):
def get_db_server(sid):
"""
This function returns the SQLite database connection
:param sid: server id
:type sid: int
:return: db connection
"""
connection = ''
conn = sqlite3.connect(config.TEST_SQLITE_PATH)
cur = conn.cursor()
@ -505,6 +526,24 @@ def apply_scenario(scenario, test):
return newtest
# This method is overridden to catch passed test cases
def add_success(self, test):
"""
This function add the passed test cases in list i.e. TextTestResult.passed
:param self:TextTestResult class
:type self: TextTestResult object
:param test: test case
:type test: test case object
:return: None
"""
if self.showAll:
self.passed.append((test, "Passed"))
self.stream.writeln("ok")
elif self.dots:
self.stream.write('.')
self.stream.flush()
def get_scenario_name(cases):
"""
This function filters the test cases from list of test cases and returns

View File

@ -23,9 +23,9 @@ import json
from selenium import webdriver
if sys.version_info < (2, 7):
import unittest2 as unittest
import unittest2 as unit_test
else:
import unittest
import unittest as unit_test
logger = logging.getLogger(__name__)
file_name = os.path.basename(__file__)
@ -98,6 +98,10 @@ driver = None
app_starter = None
handle_cleanup = None
setattr(unit_test.result.TestResult, "passed", [])
unit_test.runner.TextTestResult.addSuccess = test_utils.add_success
# Override apply_scenario method as we need custom test description/name
scenarios.apply_scenario = test_utils.apply_scenario
@ -116,7 +120,7 @@ def get_suite(module_list, test_server, test_app_client):
:rtype: TestSuite
"""
modules = []
pgadmin_suite = unittest.TestSuite()
pgadmin_suite = unit_test.TestSuite()
# Get the each test module and add into list
for key, klass in module_list:
@ -207,49 +211,50 @@ def sig_handler(signo, frame):
handle_cleanup()
def update_test_result(test_cases, test_result_dict):
"""
This function update the test result in appropriate test behaviours i.e
passed/failed/skipped.
:param test_cases: test cases
:type test_cases: dict
:param test_result_dict: test result to be stored
:type test_result_dict: dict
:return: None
"""
for test_case in test_cases:
test_class_name = test_case[0].__class__.__name__
if test_class_name in test_result_dict:
test_result_dict[test_class_name].append(
{test_case[0].scenario_name: test_case[1]})
else:
test_result_dict[test_class_name] = \
[{test_case[0].scenario_name: test_case[
1]}]
def get_tests_result(test_suite):
"""This function returns the total ran and total failed test cases count"""
try:
total_ran = test_suite.testsRun
passed_cases_result = {}
failed_cases_result = {}
skipped_cases_result = {}
if total_ran:
if test_suite.failures:
for failed_case in test_suite.failures:
if hasattr(failed_case[0], "scenario_name"):
class_name = str(
failed_case[0]).split('.')[-1].split()[0].strip(
')')
if class_name in failed_cases_result:
failed_cases_result[class_name].append(
{failed_case[0].scenario_name: failed_case[1]})
else:
failed_cases_result[class_name] = \
[{failed_case[0].scenario_name: failed_case[
1]}]
if test_suite.errors:
for error_case in test_suite.errors:
if hasattr(error_case[0], "scenario_name"):
class_name = str(
error_case[0]).split('.')[-1].split()[0].strip(')')
if class_name in failed_cases_result:
failed_cases_result[class_name].append(
{error_case[0].scenario_name: error_case[1]})
else:
failed_cases_result[class_name] = \
[{error_case[0].scenario_name: error_case[1]}]
if test_suite.skipped:
for skip_test in test_suite.skipped:
# if hasattr(skip_test[0], "scenario_name"):
class_name = str(
skip_test[0]).split('.')[-1].split()[0].strip(')')
if class_name in skipped_cases_result:
skipped_cases_result[class_name].append(
{skip_test[0].scenario_name: skip_test[1]})
else:
skipped_cases_result[class_name] = \
[{skip_test[0].scenario_name: skip_test[1]}]
return total_ran, failed_cases_result, skipped_cases_result
passed = test_suite.passed
failures = test_suite.failures
errors = test_suite.errors
skipped = test_suite.skipped
if passed:
update_test_result(passed, passed_cases_result)
if failures:
update_test_result(failures, failed_cases_result)
if errors:
update_test_result(errors, failed_cases_result)
if skipped:
update_test_result(skipped, skipped_cases_result)
return total_ran, failed_cases_result, skipped_cases_result, \
passed_cases_result
except Exception:
traceback.print_exc(file=sys.stderr)
@ -321,14 +326,18 @@ if __name__ == '__main__':
test_utils.create_parent_server_node(server)
suite = get_suite(test_module_list, server, test_client)
tests = unittest.TextTestRunner(stream=sys.stderr,
tests = unit_test.TextTestRunner(stream=sys.stderr,
descriptions=True,
verbosity=2).run(suite)
ran_tests, failed_cases, skipped_cases = \
ran_tests, failed_cases, skipped_cases, passed_cases = \
get_tests_result(tests)
test_result[server['name']] = [ran_tests, failed_cases,
skipped_cases]
skipped_cases, passed_cases]
# Set empty list for 'passed' parameter for each testRun.
# So that it will not append same test case name
unit_test.result.TestResult.passed = []
if len(failed_cases) > 0:
failure = True
@ -350,6 +359,7 @@ if __name__ == '__main__':
for server_res in test_result:
failed_cases = test_result[server_res][1]
skipped_cases = test_result[server_res][2]
passed_cases = test_result[server_res][3]
skipped_cases, skipped_cases_json = test_utils.get_scenario_name(
skipped_cases)
failed_cases, failed_cases_json = test_utils.get_scenario_name(
@ -378,7 +388,7 @@ if __name__ == '__main__':
file=sys.stderr)
temp_dict_for_server = {
server_res: {"tests_passed": total_passed_cases,
server_res: {"tests_passed": [total_passed_cases, passed_cases],
"tests_failed": [total_failed, failed_cases_json],
"tests_skipped": [total_skipped, skipped_cases_json]
}