pgadmin4/web/regression/runtests.py

858 lines
31 KiB
Python

##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2020, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
""" This file collect all modules/files present in tests directory and add
them to TestSuite. """
from __future__ import print_function
import argparse
import atexit
import logging
import os
import signal
import sys
import traceback
import json
import random
import coverage
import threading
import time
import unittest
if sys.version_info < (3, 4):
raise Exception('The test suite must be run under Python 3.4 or later.')
import builtins
# Ensure the global server mode is set.
builtins.SERVER_MODE = None
logger = logging.getLogger(__name__)
file_name = os.path.basename(__file__)
from testscenarios import scenarios
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
# Set sys path to current directory so that we can import pgadmin package
root = os.path.dirname(CURRENT_PATH)
if sys.path[0] != root:
sys.path.insert(0, root)
os.chdir(root)
from pgadmin import create_app
import config
COVERAGE_CONFIG_FILE = os.path.join(CURRENT_PATH, ".coveragerc")
if config.SERVER_MODE is True:
config.SECURITY_RECOVERABLE = True
config.SECURITY_CHANGEABLE = True
config.SECURITY_POST_CHANGE_VIEW = 'browser.change_password'
# disable master password for test cases
config.MASTER_PASSWORD_REQUIRED = False
from regression import test_setup
from regression.feature_utils.app_starter import AppStarter
# Delete SQLite db file if exists
if os.path.isfile(config.TEST_SQLITE_PATH):
os.remove(config.TEST_SQLITE_PATH)
os.environ["PGADMIN_TESTING_MODE"] = "1"
# Disable upgrade checks - no need during testing, and it'll cause an error
# if there's no network connection when it runs.
config.UPGRADE_CHECK_ENABLED = False
pgadmin_credentials = test_setup.config_data
# Set environment variables for email and password
os.environ['PGADMIN_SETUP_EMAIL'] = ''
os.environ['PGADMIN_SETUP_PASSWORD'] = ''
if pgadmin_credentials:
if 'pgAdmin4_login_credentials' in pgadmin_credentials:
if all(item in pgadmin_credentials['pgAdmin4_login_credentials']
for item in ['login_username', 'login_password']):
pgadmin_credentials = pgadmin_credentials[
'pgAdmin4_login_credentials']
os.environ['PGADMIN_SETUP_EMAIL'] = str(pgadmin_credentials[
'login_username'])
os.environ['PGADMIN_SETUP_PASSWORD'] = str(pgadmin_credentials[
'login_password'])
# Execute the setup file
exec(open("setup.py").read())
# Get the config database schema version. We store this in pgadmin.model
# as it turns out that putting it in the config files isn't a great idea
from pgadmin.model import SCHEMA_VERSION
# Delay the import test_utils as it needs updated config.SQLITE_PATH
from regression.python_test_utils import test_utils
from regression.python_test_utils.csrf_test_client import TestClient
config.SETTINGS_SCHEMA_VERSION = SCHEMA_VERSION
# Override some other defaults
from logging import WARNING
config.CONSOLE_LOG_LEVEL = WARNING
# Create the app
app = create_app()
app.PGADMIN_INT_KEY = ''
app.config.update({'SESSION_COOKIE_DOMAIN': None})
driver = None
app_starter = None
handle_cleanup = None
app.PGADMIN_RUNTIME = True
if config.SERVER_MODE is True:
app.PGADMIN_RUNTIME = False
app.config['WTF_CSRF_ENABLED'] = True
# Authentication sources
app.PGADMIN_DEFAULT_AUTH_SOURCE = 'internal'
app.PGADMIN_EXTERNAL_AUTH_SOURCE = 'ldap'
app.test_client_class = TestClient
test_client = app.test_client()
test_client.setApp(app)
setattr(unittest.result.TestResult, "passed", [])
unittest.runner.TextTestResult.addSuccess = test_utils.add_success
# Override apply_scenario method as we need custom test description/name
scenarios.apply_scenario = test_utils.apply_scenario
def get_suite(module_list, test_server, test_app_client, server_information,
test_db_name, driver_passed):
"""
This function add the tests to test suite and return modified test suite
variable.
:param server_information:
:param module_list: test module list
:type module_list: list
:param test_server: server details
:type test_server: dict
:param test_app_client: test client
:type test_app_client: pgadmin app object
:return pgadmin_suite: test suite with test cases
:rtype: TestSuite
"""
modules = []
pgadmin_suite = unittest.TestSuite()
# Get the each test module and add into list
for key, klass in module_list:
# Separate each test class from list of classes and store in modules
for item in klass:
gen = item
modules.append(gen)
# Set the test client to each module & generate the scenarios
for module in modules:
obj = module()
obj.setApp(app)
obj.setTestClient(test_app_client)
obj.setTestServer(test_server)
obj.setDriver(driver_passed)
obj.setServerInformation(server_information)
obj.setTestDatabaseName(test_db_name)
scenario = scenarios.generate_scenarios(obj)
pgadmin_suite.addTests(scenario)
return pgadmin_suite
def get_test_modules(arguments):
"""
This function loads the all modules in the tests directory into testing
environment.
:param arguments: this is command line arguments for module name to
which test suite will run
:type arguments: dict
:return module list: test module list
:rtype: list
"""
from pgadmin.utils.route import TestsGeneratorRegistry
exclude_pkgs = []
global driver, app_starter, handle_cleanup
if not config.SERVER_MODE:
# following test cases applicable only for server mode
exclude_pkgs.extend([
"browser.tests.test_change_password",
"browser.tests.test_gravatar_image_display",
"browser.tests.test_login",
"browser.tests.test_logout",
"browser.tests.test_reset_password",
"browser.tests.test_ldap_login",
"browser.tests.test_ldap_with_mocking",
])
if arguments['exclude'] is not None:
exclude_pkgs += arguments['exclude'].split(',')
if 'feature_tests' not in exclude_pkgs and \
(arguments['pkg'] is None or arguments['pkg'] == "all" or
arguments['pkg'] == "feature_tests"):
if arguments['pkg'] == "feature_tests":
exclude_pkgs.extend(['resql'])
if not test_utils.is_parallel_ui_tests(args):
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.desired_capabilities import \
DesiredCapabilities
default_browser = 'chrome'
# Check default browser provided through command line. If provided
# then use that browser as default browser else check for the
# setting provided in test_config.json file.
if (
'default_browser' in arguments and
arguments['default_browser'] is not None
):
default_browser = arguments['default_browser'].lower()
elif (
test_setup.config_data and
"default_browser" in test_setup.config_data
):
default_browser = test_setup.config_data[
'default_browser'].lower()
if default_browser == 'firefox':
cap = DesiredCapabilities.FIREFOX
cap['requireWindowFocus'] = True
cap['enablePersistentHover'] = False
profile = webdriver.FirefoxProfile()
profile.set_preference("dom.disable_beforeunload", True)
driver = webdriver.Firefox(capabilities=cap,
firefox_profile=profile)
driver.implicitly_wait(1)
else:
options = Options()
if test_setup.config_data:
if 'headless_chrome' in test_setup.config_data:
if test_setup.config_data['headless_chrome']:
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-setuid-sandbox")
options.add_argument("--window-size=1280,1024")
options.add_argument("--disable-infobars")
options.add_experimental_option('w3c', False)
driver = webdriver.Chrome(chrome_options=options)
# maximize browser window
driver.maximize_window()
app_starter = AppStarter(driver, config)
app_starter.start_app()
handle_cleanup = test_utils.get_cleanup_handler(test_client, app_starter)
# Register cleanup function to cleanup on exit
atexit.register(handle_cleanup)
# Load the test modules which are in given package(i.e. in arguments.pkg)
if arguments['pkg'] is None or arguments['pkg'] == "all":
TestsGeneratorRegistry.load_generators('pgadmin', exclude_pkgs)
elif arguments['pkg'] is not None and arguments['pkg'] == "resql":
for_modules = []
if arguments['modules'] is not None:
for_modules = arguments['modules'].split(',')
# Load the reverse engineering sql test module
TestsGeneratorRegistry.load_generators('pgadmin', exclude_pkgs,
for_modules, is_resql_only=True)
else:
for_modules = []
if arguments['modules'] is not None:
for_modules = arguments['modules'].split(',')
TestsGeneratorRegistry.load_generators('pgadmin.%s' %
arguments['pkg'],
exclude_pkgs,
for_modules)
# Sort module list so that test suite executes the test cases sequentially
module_list = TestsGeneratorRegistry.registry.items()
module_list = sorted(module_list, key=lambda module_tuple: module_tuple[0])
return module_list
def add_arguments():
"""
This function parse the command line arguments(project's package name
e.g. browser) & add into parser
:return args: command line argument for pgadmin's package name
:rtype: argparse namespace
"""
parser = argparse.ArgumentParser(description='Test suite for pgAdmin4')
parser.add_argument(
'--pkg',
help='Executes the test cases of particular package and subpackages'
)
parser.add_argument(
'--exclude',
help='Skips execution of the test cases of particular package and '
'sub-packages'
)
parser.add_argument('--coverage', nargs='?', const=True, type=bool,
default=False, help='Enable code coverage feature')
parser.add_argument(
'--default_browser',
help='Executes the feature test in specific browser'
)
parser.add_argument(
'--modules',
help='Executes the feature test for specific modules in pkg'
)
parser.add_argument('--parallel', nargs='?', const=True,
type=bool, default=False,
help='Enable parallel Feature Tests')
arg = parser.parse_args()
return arg
def sig_handler(signo, frame):
global handle_cleanup
if handle_cleanup:
handle_cleanup()
def update_test_result(test_cases, test_result_dict):
"""
This function update the test result in appropriate test behaviours i.e
passed/failed/skipped.
:param test_cases: test cases
:type test_cases: dict
:param test_result_dict: test result to be stored
:type test_result_dict: dict
:return: None
"""
for test_case in test_cases:
test_class_name = test_case[0].__class__.__name__
test_scenario_name = getattr(
test_case[0], 'scenario_name', str(test_case[0])
)
if test_class_name in test_result_dict:
test_result_dict[test_class_name].append(
{test_scenario_name: test_case[1]})
else:
test_result_dict[test_class_name] = \
[{test_scenario_name: test_case[1]}]
def get_tests_result(test_suite):
"""This function returns the total ran and total failed test cases count"""
try:
total_ran = test_suite.testsRun
passed_cases_result = {}
failed_cases_result = {}
skipped_cases_result = {}
if total_ran:
passed = test_suite.passed
failures = test_suite.failures
errors = test_suite.errors
skipped = test_suite.skipped
if passed:
update_test_result(passed, passed_cases_result)
if failures:
update_test_result(failures, failed_cases_result)
if errors:
update_test_result(errors, failed_cases_result)
if skipped:
update_test_result(skipped, skipped_cases_result)
return total_ran, failed_cases_result, skipped_cases_result, \
passed_cases_result
except Exception:
traceback.print_exc(file=sys.stderr)
class StreamToLogger(object):
def __init__(self, logger, log_level=logging.INFO):
self.terminal = sys.stderr
self.logger = logger
self.log_level = log_level
self.linebuf = ''
def write(self, buf):
"""
This function writes the log in the logger file as well as on console
:param buf: log message
:type buf: str
:return: None
"""
self.terminal.write(buf)
for line in buf.rstrip().splitlines():
self.logger.log(self.log_level, line.rstrip())
def flush(self):
pass
def execute_test(test_module_list_passed, server_passed, driver_passed):
"""
Function executes actually test
:param test_module_list_passed:
:param server_passed:
:param driver_passed:
:return:
"""
try:
print("\n=============Running the test cases for '%s' ============="
% server_passed['name'], file=sys.stderr)
# Create test server
server_information = \
test_utils.create_parent_server_node(server_passed)
# Create test database with random number to avoid conflict in
# parallel execution on different platforms. This database will be
# used across all feature tests.
test_db_name = "acceptance_test_db" + \
str(random.randint(10000, 65535))
connection = test_utils.get_db_connection(
server_passed['db'],
server_passed['username'],
server_passed['db_password'],
server_passed['host'],
server_passed['port'],
server_passed['sslmode']
)
# Add the server version in server information
server_information['server_version'] = connection.server_version
server_information['type'] = server_passed['type']
# Drop the database if already exists.
test_utils.drop_database(connection, test_db_name)
# Create database
test_utils.create_database(server_passed, test_db_name)
# Configure preferences for the test cases
test_utils.configure_preferences(
default_binary_path=server_passed['default_binary_paths'])
# Get unit test suit
suite = get_suite(test_module_list_passed,
server_passed,
test_client,
server_information, test_db_name, driver_passed)
# Run unit test suit created
tests = unittest.TextTestRunner(stream=sys.stderr,
descriptions=True,
verbosity=2).run(suite)
# processing results
ran_tests, failed_cases, skipped_cases, passed_cases = \
get_tests_result(tests)
# This is required when some tests are running parallel
# & some sequential in case of parallel ui tests
if threading.current_thread().getName() == "sequential_tests":
try:
if test_result[server_passed['name']][0] is not None:
ran_tests = test_result[server_passed['name']][0] + \
ran_tests
failed_cases.update(test_result[server_passed['name']][1])
skipped_cases.update(test_result[server_passed['name']][2])
passed_cases.update(test_result[server_passed['name']][3])
test_result[server_passed['name']] = [ran_tests, failed_cases,
skipped_cases,
passed_cases]
except KeyError:
pass
# Add final results server wise in test_result dict
test_result[server_passed['name']] = [ran_tests, failed_cases,
skipped_cases, passed_cases]
# Set empty list for 'passed' parameter for each testRun.
# So that it will not append same test case name
# unittest.result.TestResult.passed = []
# Drop the testing database created initially
if connection:
test_utils.drop_database(connection, test_db_name)
connection.close()
# Delete test server
test_utils.delete_test_server(test_client)
except Exception as exc:
traceback.print_exc(file=sys.stderr)
print(str(exc))
print("Exception in {0}".format(threading.current_thread().ident))
finally:
# Delete web-driver instance
thread_name = "parallel_tests" + server_passed['name']
if threading.currentThread().getName() == thread_name:
driver_passed.quit()
time.sleep(20)
# Print info about completed tests
print(
"\n=============Completed the test cases for '%s'============="
% server_passed['name'], file=sys.stderr)
def run_parallel_tests(url_client, servers_details, parallel_tests_lists,
name_of_browser, version_of_browser, max_thread_count):
"""
Function used to run tests in parallel
:param url_client:
:param servers_details:
:param parallel_tests_lists:
:param name_of_browser:
:param version_of_browser:
:param max_thread_count:
"""
driver_object = None
try:
# Thread list
threads_list = []
# Create thread for each server
for ser in servers_details:
# Logic to add new threads
while True:
# If active thread count <= max_thread_count, add new thread
if threading.activeCount() <= max_thread_count:
# Get remote web-driver instance at server level
driver_object = \
test_utils.get_remote_webdriver(hub_url,
name_of_browser,
version_of_browser,
ser['name'])
# Launch client url in browser
test_utils.launch_url_in_browser(driver_object, url_client)
# Add name for thread
thread_name = "parallel_tests" + ser['name']
# Start thread
t = threading.Thread(target=execute_test, name=thread_name,
args=(parallel_tests_lists, ser,
driver_object))
threads_list.append(t)
t.start()
time.sleep(3)
break
# else sleep for 10 seconds
else:
time.sleep(10)
# Start threads in parallel
for t in threads_list:
t.join()
except Exception as exc:
# Print exception stack trace
traceback.print_exc(file=sys.stderr)
print(str(exc))
# Clean driver object created
if driver_object is not None:
driver_object.quit()
def run_sequential_tests(url_client, servers_details, sequential_tests_lists,
name_of_browser, version_of_browser):
"""
Function is used to execute tests that needs to be run in sequential
manner.
:param url_client:
:param servers_details:
:param sequential_tests_lists:
:param name_of_browser:
:param version_of_browser:
:return:
"""
driver_object = None
try:
# Get remote web-driver instance
driver_object = test_utils.get_remote_webdriver(hub_url,
name_of_browser,
version_of_browser,
"Sequential_Tests")
# Launch client url in browser
test_utils.launch_url_in_browser(driver_object, url_client)
# Add name for thread
thread_name = "sequential_tests"
# Start thread
for ser in servers_details:
t = threading.Thread(target=execute_test,
name=thread_name,
args=(sequential_tests_lists, ser,
driver_object))
t.start()
t.join()
except Exception as exc:
# Print exception stack trace
traceback.print_exc(file=sys.stderr)
print(str(exc))
finally:
# Clean driver object created
driver_object.quit()
def print_test_results():
print(
"\n==============================================================="
"=======",
file=sys.stderr
)
print("Test Result Summary", file=sys.stderr)
print(
"==================================================================="
"===\n", file=sys.stderr
)
test_result_json = {}
for server_res in test_result:
failed_cases = test_result[server_res][1]
skipped_cases = test_result[server_res][2]
passed_cases = test_result[server_res][3]
skipped_cases, skipped_cases_json = test_utils.get_scenario_name(
skipped_cases)
failed_cases, failed_cases_json = test_utils.get_scenario_name(
failed_cases)
total_failed = len(dict((key, value) for key, value in
failed_cases.items()).values())
total_skipped = len(dict((key, value) for key, value in
skipped_cases.items()).values())
total_passed_cases = int(
test_result[server_res][0]) - total_failed - total_skipped
if len(failed_cases) > 0:
global failure
failure = True
print(
"%s:\n\n\t%s test%s passed\n\t%s test%s failed%s%s"
"\n\t%s test%s skipped%s%s\n" %
(server_res, total_passed_cases,
(total_passed_cases != 1 and "s" or ""),
total_failed, (total_failed != 1 and "s" or ""),
(total_failed != 0 and ":\n\t\t" or ""),
"\n\t\t".join("{0} ({1})".format(key, ",\n\t\t\t\t\t".join(
map(str, value))) for key, value in failed_cases.items()),
total_skipped, (total_skipped != 1 and "s" or ""),
(total_skipped != 0 and ":\n\t\t" or ""),
"\n\t\t".join("{0} ({1})".format(key, ",\n\t\t\t\t\t".join(
map(str, value))) for key, value in skipped_cases.items())),
file=sys.stderr)
temp_dict_for_server = {
server_res: {
"tests_passed": [total_passed_cases, passed_cases],
"tests_failed": [total_failed, failed_cases_json],
"tests_skipped": [total_skipped, skipped_cases_json]
}
}
test_result_json.update(temp_dict_for_server)
# Dump test result into json file
json_file_path = CURRENT_PATH + "/test_result.json"
with open(json_file_path, 'w') as outfile:
json.dump(test_result_json, outfile, indent=2)
print(
"==================================================================="
"===\n",
file=sys.stderr
)
if __name__ == '__main__':
# Failure detected?
failure = False
test_result = dict()
cov = None
# Set signal handler for cleanup
signal_list = dir(signal)
required_signal_list = ['SIGTERM', 'SIGABRT', 'SIGQUIT', 'SIGINT']
# Get the OS wise supported signals
supported_signal_list = [sig for sig in required_signal_list if
sig in signal_list]
for sig in supported_signal_list:
signal.signal(getattr(signal, sig), sig_handler)
# Set basic logging configuration for log file
fh = logging.FileHandler(CURRENT_PATH + '/' +
'regression.log', 'w', 'utf-8')
fh.setLevel(logging.DEBUG)
fh.setFormatter(logging.Formatter('[%(thread)d] ' +
config.FILE_LOG_FORMAT))
logger = logging.getLogger()
logger.addHandler(fh)
# Create logger to write log in the logger file as well as on console
stderr_logger = logging.getLogger('STDERR')
sys.stderr = StreamToLogger(stderr_logger, logging.ERROR)
args = vars(add_arguments())
# Get test module list
try:
test_module_list = get_test_modules(args)
except Exception as e:
print(str(e))
sys.exit(1)
# Login the test client
test_utils.login_tester_account(test_client)
servers_info = test_utils.get_config_data()
node_name = "all"
if args['pkg'] is not None:
node_name = args['pkg'].split('.')[-1]
# Start coverage
if test_utils.is_coverage_enabled(args):
cov = coverage.Coverage(config_file=COVERAGE_CONFIG_FILE)
cov.start()
# Check if feature tests included & parallel tests switch passed
if test_utils.is_feature_test_included(args) and \
test_utils.is_parallel_ui_tests(args):
# Get selenium config dict
selenoid_config = test_setup.config_data['selenoid_config']
# Set DEFAULT_SERVER value
default_server = selenoid_config['pgAdmin_default_server']
os.environ["PGADMIN_CONFIG_DEFAULT_SERVER"] = str(default_server)
config.DEFAULT_SERVER = str(default_server)
# Get hub url
hub_url = selenoid_config['selenoid_url']
# Get selenium grid status & list of available browser out passed
selenium_grid_status, list_of_browsers \
= test_utils.get_selenium_grid_status_and_browser_list(hub_url)
# Execute tests if selenium-grid is up
if selenium_grid_status and len(list_of_browsers) > 0:
app_starter_local = None
# run across browsers
for browser_info in list_of_browsers:
try:
# browser info
browser_name, browser_version = \
test_utils.get_browser_details(browser_info, hub_url)
# tests lists can be executed in parallel & sequentially
parallel_tests, sequential_tests = \
test_utils.get_parallel_sequential_module_list(
test_module_list)
# Print test summary
test_utils.print_test_summary(test_module_list,
parallel_tests,
sequential_tests,
browser_name,
browser_version)
# Create app form source code
app_starter_local = AppStarter(None, config)
client_url = app_starter_local.start_app()
# Running Parallel tests
if len(parallel_tests) > 0:
parallel_sessions = int(selenoid_config[
'max_parallel_sessions'])
run_parallel_tests(client_url, servers_info,
parallel_tests, browser_name,
browser_version, parallel_sessions)
# Wait till all threads started in parallel are finished
while True:
try:
if threading.activeCount() <= 1:
break
else:
time.sleep(10)
except Exception as e:
traceback.print_exc(file=sys.stderr)
print(str(e))
# Sequential Tests
if len(sequential_tests) > 0:
run_sequential_tests(client_url, servers_info,
sequential_tests, browser_name,
browser_version)
# Clean up environment
if app_starter_local:
app_starter_local.stop_app()
except SystemExit:
if app_starter_local:
app_starter_local.stop_app()
if handle_cleanup:
handle_cleanup()
# Pause before printing result in order not to mix output
time.sleep(5)
# Print note for completion of execution in a browser.
print(
"\n============= Test execution with {0} is "
"completed.=============".format(browser_name),
file=sys.stderr)
print_test_results()
del os.environ["PGADMIN_CONFIG_DEFAULT_SERVER"]
else:
try:
for server in servers_info:
thread = threading.Thread(target=execute_test, args=(
test_module_list, server, driver))
thread.start()
thread.join()
except SystemExit:
if handle_cleanup:
handle_cleanup()
print_test_results()
# Stop code coverage
if test_utils.is_coverage_enabled(args):
cov.stop()
cov.save()
# Print coverage only if coverage args given in command line
if test_utils.is_coverage_enabled(args):
test_utils.print_and_store_coverage_report(cov)
print("Please check output in file: %s/regression.log\n" % CURRENT_PATH)
# Unset environment variable
del os.environ["PGADMIN_TESTING_MODE"]
if failure:
sys.exit(1)
else:
sys.exit(0)