Write a JSON file when regression tests run, listing results.

This commit is contained in:
Navnath Gadakh 2017-04-06 09:33:29 +01:00 committed by Dave Page
parent b8566a0127
commit dd23f71478
5 changed files with 138 additions and 44 deletions

View File

@ -18,6 +18,9 @@ class ConnectsToServerFeatureTest(BaseFeatureTest):
"""
Tests that a database connection can be created from the UI
"""
scenarios = [
("Test database connection", dict())
]
def before(self):
connection = test_utils.get_db_connection(self.server['db'],
@ -30,6 +33,8 @@ class ConnectsToServerFeatureTest(BaseFeatureTest):
test_utils.create_table(self.server, "acceptance_test_db", "test_table")
def runTest(self):
"""This function tests that a database connection can be created from
the UI"""
self.assertEqual(app_config.APP_NAME, self.page.driver.title)
self.page.wait_for_spinner_to_disappear()

View File

@ -12,6 +12,12 @@ from regression.python_test_utils import test_utils
class TableDdlFeatureTest(BaseFeatureTest):
""" This class test acceptance test scenarios """
scenarios = [
("Test table DDL generation", dict())
]
def before(self):
connection = test_utils.get_db_connection(self.server['db'],
self.server['username'],

View File

@ -3,4 +3,5 @@ regression.log
test_greenplum_config.json
test_advanced_config.json
test_config.json
test_result.json
screenshots/

View File

@ -15,6 +15,7 @@ import uuid
import psycopg2
import sqlite3
from functools import partial
from testtools.testcase import clone_test_with_new_id
import config
import regression
@ -48,9 +49,10 @@ def login_tester_account(tester):
tester.post('/login', data=dict(email=email, password=password),
follow_redirects=True)
else:
from regression.runtests import app_starter
print("Unable to login test client, email and password not found.",
file=sys.stderr)
_drop_objects(tester)
_cleanup(tester, app_starter)
sys.exit(1)
@ -154,25 +156,6 @@ def create_table(server, db_name, table_name):
traceback.print_exc(file=sys.stderr)
def create_table(server, db_name, table_name):
try:
connection = get_db_connection(db_name,
server['username'],
server['db_password'],
server['host'],
server['port'])
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
pg_cursor.execute('''CREATE TABLE "%s" (some_column VARCHAR, value NUMERIC)''' % table_name)
pg_cursor.execute('''INSERT INTO "%s" VALUES ('Some-Name', 6)''' % table_name)
connection.set_isolation_level(old_isolation_level)
connection.commit()
except Exception:
traceback.print_exc(file=sys.stderr)
def drop_database(connection, database_name):
"""This function used to drop the database"""
if database_name not in ["postgres", "template1", "template0"]:
@ -419,6 +402,56 @@ def get_cleanup_handler(tester, app_starter):
return partial(_cleanup, tester, app_starter)
def apply_scenario(scenario, test):
"""Apply scenario to test.
:param scenario: A tuple (name, parameters) to apply to the test. The test
is cloned, its id adjusted to have (name) after it, and the parameters
dict is used to update the new test.
:param test: The test to apply the scenario to. This test is unaltered.
:return: A new test cloned from test, with the scenario applied.
"""
name, parameters = scenario
parameters["scenario_name"] = name
scenario_suffix = '(' + name + ')'
newtest = clone_test_with_new_id(test,
test.id() + scenario_suffix)
# Replace test description with test scenario name
test_desc = name
if test_desc is not None:
newtest_desc = test_desc
newtest.shortDescription = (lambda: newtest_desc)
for key, value in parameters.items():
setattr(newtest, key, value)
return newtest
def get_scenario_name(cases):
"""
This function filters the test cases from list of test cases and returns
the test cases list
:param cases: test cases
:type cases: dict
:return: test cases in dict
:rtype: dict
"""
test_cases_dict = {}
test_cases_dict_json = {}
for class_name, test_case_list in cases.items():
result = {class_name: []}
for case_name_dict in test_case_list:
key, value = list(case_name_dict.items())[0]
if key not in {c_name for scenario in result[class_name] for
c_name in
scenario.keys()}:
result[class_name].append(case_name_dict)
test_cases_dict_json.update(result)
test_cases_dict.update({class_name: list({case for test_case in
test_case_list
for case in test_case})})
return test_cases_dict, test_cases_dict_json
class Database:
"""
Temporarily create and connect to a database, tear it down at exit

View File

@ -18,6 +18,7 @@ import os
import signal
import sys
import traceback
import json
from selenium import webdriver
@ -29,7 +30,7 @@ else:
logger = logging.getLogger(__name__)
file_name = os.path.basename(__file__)
from testscenarios.scenarios import generate_scenarios
from testscenarios import scenarios
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
@ -51,8 +52,8 @@ if os.path.isfile(config.TEST_SQLITE_PATH):
config.TESTING_MODE = True
# Disable upgrade checks - no need during testing, and it'll cause an error if there's
# no network connection when it runs.
# Disable upgrade checks - no need during testing, and it'll cause an error
# if there's no network connection when it runs.
config.UPGRADE_CHECK_ENABLED = False
pgadmin_credentials = test_setup.config_data
@ -72,7 +73,7 @@ if pgadmin_credentials:
'login_password']
# Execute the setup file
exec (open("setup.py").read())
exec(open("setup.py").read())
# Get the config database schema version. We store this in pgadmin.model
# as it turns out that putting it in the config files isn't a great idea
@ -97,6 +98,10 @@ driver = None
app_starter = None
handle_cleanup = None
# Override apply_scenario method as we need custom test description/name
scenarios.apply_scenario = test_utils.apply_scenario
def get_suite(module_list, test_server, test_app_client):
"""
This function add the tests to test suite and return modified test suite
@ -125,7 +130,7 @@ def get_suite(module_list, test_server, test_app_client):
obj.setTestClient(test_app_client)
obj.setTestServer(test_server)
obj.setDriver(driver)
scenario = generate_scenarios(obj)
scenario = scenarios.generate_scenarios(obj)
pgadmin_suite.addTests(scenario)
return pgadmin_suite
@ -138,7 +143,7 @@ def get_test_modules(arguments):
:param arguments: this is command line arguments for module name to
which test suite will run
:type arguments: str
:type arguments: dict
:return module list: test module list
:rtype: list
"""
@ -197,6 +202,7 @@ def add_arguments():
def sig_handler(signo, frame):
global handle_cleanup
if handle_cleanup:
handle_cleanup()
@ -205,26 +211,44 @@ def get_tests_result(test_suite):
"""This function returns the total ran and total failed test cases count"""
try:
total_ran = test_suite.testsRun
failed_cases_result = []
skipped_cases_result = []
failed_cases_result = {}
skipped_cases_result = {}
if total_ran:
if test_suite.failures:
for failed_case in test_suite.failures:
class_name = str(
failed_case[0]).split('.')[-1].split()[0].strip(')')
failed_cases_result.append(class_name)
if hasattr(failed_case[0], "scenario_name"):
class_name = str(
failed_case[0]).split('.')[-1].split()[0].strip(
')')
if class_name in failed_cases_result:
failed_cases_result[class_name].append(
{failed_case[0].scenario_name: failed_case[1]})
else:
failed_cases_result[class_name] = \
[{failed_case[0].scenario_name: failed_case[
1]}]
if test_suite.errors:
for error_case in test_suite.errors:
class_name = str(
error_case[0]).split('.')[-1].split()[0].strip(')')
if class_name not in failed_cases_result:
failed_cases_result.append(class_name)
if hasattr(error_case[0], "scenario_name"):
class_name = str(
error_case[0]).split('.')[-1].split()[0].strip(')')
if class_name in failed_cases_result:
failed_cases_result[class_name].append(
{error_case[0].scenario_name: error_case[1]})
else:
failed_cases_result[class_name] = \
[{error_case[0].scenario_name: error_case[1]}]
if test_suite.skipped:
for skip_test in test_suite.skipped:
# if hasattr(skip_test[0], "scenario_name"):
class_name = str(
skip_test[0]).split('.')[-1].split()[0].strip(')')
if class_name not in failed_cases_result:
skipped_cases_result.append(class_name)
if class_name in skipped_cases_result:
skipped_cases_result[class_name].append(
{skip_test[0].scenario_name: skip_test[1]})
else:
skipped_cases_result[class_name] = \
[{skip_test[0].scenario_name: skip_test[1]}]
return total_ran, failed_cases_result, skipped_cases_result
except Exception:
traceback.print_exc(file=sys.stderr)
@ -257,8 +281,8 @@ class StreamToLogger(object):
if __name__ == '__main__':
# Failure detected?
failure = False
test_result = dict()
# Set signal handler for cleanup
signal_list = dir(signal)
required_signal_list = ['SIGTERM', 'SIGABRT', 'SIGQUIT', 'SIGINT']
@ -321,11 +345,19 @@ if __name__ == '__main__':
print(
"==================================================================="
"===\n", file=sys.stderr)
test_result_json = {}
for server_res in test_result:
failed_cases = "\n\t\t".join(test_result[server_res][1])
skipped_cases = "\n\t\t".join(test_result[server_res][2])
total_failed = len(test_result[server_res][1])
total_skipped = len(test_result[server_res][2])
failed_cases = test_result[server_res][1]
skipped_cases = test_result[server_res][2]
skipped_cases, skipped_cases_json = test_utils.get_scenario_name(
skipped_cases)
failed_cases, failed_cases_json = test_utils.get_scenario_name(
failed_cases)
total_failed = sum({key: len(value) for key, value in
failed_cases.items()}.values())
total_skipped = sum({key: len(value) for key, value in
skipped_cases.items()}.values())
total_passed_cases = int(
test_result[server_res][0]) - total_failed - total_skipped
@ -335,11 +367,28 @@ if __name__ == '__main__':
(server_res, total_passed_cases,
(total_passed_cases != 1 and "s" or ""),
total_failed, (total_failed != 1 and "s" or ""),
(total_failed != 0 and ":\n\t\t" or ""), failed_cases,
(total_failed != 0 and ":\n\t\t" or ""),
"\n\t\t".join("{} ({})".format(k, ",\n\t\t\t\t\t".join(
map(str, v))) for k, v in failed_cases.items()),
total_skipped, (total_skipped != 1 and "s" or ""),
(total_skipped != 0 and ":\n\t\t" or ""), skipped_cases),
(total_skipped != 0 and ":\n\t\t" or ""),
"\n\t\t".join("{} ({})".format(k, ",\n\t\t\t\t\t".join(
map(str, v))) for k, v in skipped_cases.items())),
file=sys.stderr)
temp_dict_for_server = {
server_res: {"tests_passed": total_passed_cases,
"tests_failed": [total_failed, failed_cases_json],
"tests_skipped": [total_skipped, skipped_cases_json]
}
}
test_result_json.update(temp_dict_for_server)
# Dump test result into json file
json_file_path = CURRENT_PATH + "/tests_result.json"
with open(json_file_path, 'w') as outfile:
json.dump(test_result_json, outfile, indent=2)
print(
"==================================================================="
"===\n", file=sys.stderr)