Update regression tests to resolve issues where database/connections were getting mixed up.

This commit is contained in:
Navnath Gadakh 2016-09-22 12:58:38 +01:00 committed by Dave Page
parent f117685d77
commit 1cb5a7c7ca
9 changed files with 201 additions and 152 deletions

View File

@ -30,21 +30,21 @@ class DatabaseAddTestCase(BaseTestGenerator):
def runTest(self):
""" This function will add database under 1st server of tree node. """
self.db_name = ''
server_id = test_server_dict["server"][0]["server_id"]
server_response = server_utils.connect_server(self, server_id)
self.server_id = test_server_dict["server"][0]["server_id"]
server_response = server_utils.connect_server(self, self.server_id)
if server_response["info"] == "Server connected.":
db_owner = server_response['data']['user']['name']
self.data = database_utils.get_db_data(db_owner)
self.db_name = self.data['name']
response = self.tester.post(self.url + str(utils.SERVER_GROUP) +
"/" + str(server_id) + "/",
"/" + str(self.server_id) + "/",
data=json.dumps(self.data),
content_type='html/json')
self.assertEquals(response.status_code, 200)
response_data = json.loads(response.data.decode('utf-8'))
db_id = response_data['node']['_id']
db_dict = {"db_id": db_id, "db_name": self.db_name}
utils.write_node_info(int(server_id), "did", db_dict)
utils.write_node_info(int(self.server_id), "did", db_dict)
else:
raise Exception("Error while connecting server to add the"
" database.")
@ -59,3 +59,4 @@ class DatabaseAddTestCase(BaseTestGenerator):
self.server['host'],
self.server['port'])
utils.drop_database(connection, self.db_name)

View File

@ -10,13 +10,12 @@
import json
import uuid
from regression.test_setup import advanced_config_data
from pgadmin.browser.server_groups.servers.tests import utils as server_utils
from regression import test_utils as utils
DATABASE_URL = '/browser/database/obj/'
DATABASE_CONNECT_URL = 'browser/database/connect/'
DATABASE_CONNECT_URL = '/browser/database/connect/'
def get_db_data(db_owner):

View File

@ -36,4 +36,4 @@ class ServersAddTestCase(BaseTestGenerator):
def tearDown(self):
"""This function delete the server from SQLite """
utils.delete_server(self.server_id)
utils.delete_server(self.tester, self.server_id)

View File

@ -33,4 +33,4 @@ class ServerDeleteTestCase(BaseTestGenerator):
def tearDown(self):
"""This function delete the server from SQLite """
utils.delete_server(self.server_id)
utils.delete_server(self.tester, self.server_id)

View File

@ -37,4 +37,4 @@ class ServerUpdateTestCase(BaseTestGenerator):
def tearDown(self):
"""This function delete the server from SQLite"""
utils.delete_server(self.server_id)
utils.delete_server(self.tester, self.server_id)

View File

@ -9,14 +9,9 @@
from __future__ import print_function
import sys
import json
import sqlite3
import config
from regression import node_info_dict
from regression import test_utils as utils
from regression.test_setup import config_data
SERVER_URL = '/browser/server/obj/'
SERVER_CONNECT_URL = '/browser/server/connect/'

View File

@ -19,6 +19,9 @@ import atexit
import unittest
import logging
logger = logging.getLogger(__name__)
file_name = os.path.basename(__file__)
from testscenarios.scenarios import generate_scenarios
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
@ -33,34 +36,30 @@ if sys.path[0] != root:
from pgadmin import create_app
import config
import test_setup
import regression
# Execute setup.py if test SQLite database doesn't exist.
# Delete SQLite db file if exists
if os.path.isfile(config.TEST_SQLITE_PATH):
print("The configuration database already existed at '%s'. "
"Please remove the database and again run the test suite." %
config.TEST_SQLITE_PATH)
sys.exit(1)
else:
config.TESTING_MODE = True
pgadmin_credentials = test_setup.config_data
os.remove(config.TEST_SQLITE_PATH)
# Set environment variables for email and password
os.environ['PGADMIN_SETUP_EMAIL'] = ''
os.environ['PGADMIN_SETUP_PASSWORD'] = ''
if pgadmin_credentials:
if 'pgAdmin4_login_credentials' in pgadmin_credentials:
if all(item in pgadmin_credentials['pgAdmin4_login_credentials']
for item in ['login_username', 'login_password']):
pgadmin_credentials = pgadmin_credentials[
'pgAdmin4_login_credentials']
os.environ['PGADMIN_SETUP_EMAIL'] = pgadmin_credentials[
'login_username']
os.environ['PGADMIN_SETUP_PASSWORD'] = pgadmin_credentials[
'login_password']
config.TESTING_MODE = True
pgadmin_credentials = test_setup.config_data
# Execute the setup file
exec (open("setup.py").read())
# Set environment variables for email and password
os.environ['PGADMIN_SETUP_EMAIL'] = ''
os.environ['PGADMIN_SETUP_PASSWORD'] = ''
if pgadmin_credentials:
if 'pgAdmin4_login_credentials' in pgadmin_credentials:
if all(item in pgadmin_credentials['pgAdmin4_login_credentials']
for item in ['login_username', 'login_password']):
pgadmin_credentials = pgadmin_credentials[
'pgAdmin4_login_credentials']
os.environ['PGADMIN_SETUP_EMAIL'] = pgadmin_credentials[
'login_username']
os.environ['PGADMIN_SETUP_PASSWORD'] = pgadmin_credentials[
'login_password']
# Execute the setup file
exec (open("setup.py").read())
# Get the config database schema version. We store this in pgadmin.model
# as it turns out that putting it in the config files isn't a great idea
@ -80,9 +79,43 @@ config.CONSOLE_LOG_LEVEL = WARNING
app = create_app()
app.config['WTF_CSRF_ENABLED'] = False
test_client = app.test_client()
drop_objects = test_utils.get_cleanup_handler(test_client)
def get_suite(arguments, server, test_app_client):
def get_suite(module_list, test_server, test_app_client):
"""
This function add the tests to test suite and return modified test suite
variable.
:param module_list: test module list
:type module_list: list
:param test_server: server details
:type test_server: dict
:param test_app_client: test client
:type test_app_client: pgadmin app object
:return pgadmin_suite: test suite with test cases
:rtype: TestSuite
"""
modules = []
pgadmin_suite = unittest.TestSuite()
# Get the each test module and add into list
for key, klass in module_list:
gen = klass
modules.append(gen)
# Set the test client to each module & generate the scenarios
for module in modules:
obj = module()
obj.setApp(app)
obj.setTestClient(test_app_client)
obj.setTestServer(test_server)
scenario = generate_scenarios(obj)
pgadmin_suite.addTests(scenario)
return pgadmin_suite
def get_test_modules(arguments):
"""
This function loads the all modules in the tests directory into testing
environment.
@ -90,19 +123,12 @@ def get_suite(arguments, server, test_app_client):
:param arguments: this is command line arguments for module name to
which test suite will run
:type arguments: str
:param server: server details
:type server: dict
:param test_app_client: test client
:type test_app_client: pgadmin app object
:return pgadmin_suite: test suite with test cases
:rtype: TestSuite
:return module list: test module list
:rtype: list
"""
from pgadmin.utils.route import TestsGeneratorRegistry
modules = []
pgadmin_suite = unittest.TestSuite()
# Load the test modules which are in given package(i.e. in arguments.pkg)
if arguments['pkg'] is None or arguments['pkg'] == "all":
TestsGeneratorRegistry.load_generators('pgadmin')
@ -114,21 +140,7 @@ def get_suite(arguments, server, test_app_client):
module_list = TestsGeneratorRegistry.registry.items()
module_list = sorted(module_list, key=lambda module_tuple: module_tuple[0])
# Get the each test module and add into list
for key, klass in module_list:
gen = klass
modules.append(gen)
# Set the test client to each module & generate the scenarios
for module in modules:
obj = module()
obj.setApp(app)
obj.setTestClient(test_app_client)
obj.setTestServer(server)
scenario = generate_scenarios(obj)
pgadmin_suite.addTests(scenario)
return pgadmin_suite
return module_list
def add_arguments():
@ -149,27 +161,41 @@ def add_arguments():
def sig_handler(signo, frame):
test_utils.drop_objects()
drop_objects()
def get_tests_result(tests):
def get_tests_result(test_suite):
"""This function returns the total ran and total failed test cases count"""
total_ran = tests.testsRun
failed_cases_result = []
if total_ran:
if tests.failures:
for failed_case in tests.failures:
class_name = str(failed_case[0]).split('.')[-1].split()[0].\
strip(')')
failed_cases_result.append(class_name)
if tests.errors:
for error_case in tests.errors:
class_name = str(error_case[0]).split('.')[-1].split()[0].\
strip(')')
if class_name not in failed_cases_result:
try:
total_ran = test_suite.testsRun
failed_cases_result = []
skipped_cases_result = []
if total_ran:
if test_suite.failures:
for failed_case in test_suite.failures:
class_name = str(
failed_case[0]).split('.')[-1].split()[0].strip(')')
failed_cases_result.append(class_name)
return total_ran, failed_cases_result
if test_suite.errors:
for error_case in test_suite.errors:
class_name = str(
error_case[0]).split('.')[-1].split()[0].strip(')')
if class_name not in failed_cases_result:
failed_cases_result.append(class_name)
# TODO: Code remaining to count the skipped test cases
# if test_suite.skipped:
# for skip_test in test_suite.skipped:
# class_name = str(
# skip_test[0]).split('.')[-1].split()[0].strip(')')
# if class_name not in failed_cases_result:
# skipped_cases_result.append(class_name)
# print(class_name)
return total_ran, failed_cases_result, skipped_cases_result
except Exception as exception:
exception = "Exception: %s: line:%s %s" % (
file_name, sys.exc_traceback.tb_lineno, exception)
print(exception)
logger.exception(exception)
class StreamToLogger(object):
@ -199,7 +225,7 @@ class StreamToLogger(object):
if __name__ == '__main__':
test_result = dict()
# Register cleanup function to cleanup on exit
atexit.register(test_utils.drop_objects)
atexit.register(drop_objects)
# Set signal handler for cleanup
signal.signal(signal.SIGTERM, sig_handler)
signal.signal(signal.SIGABRT, sig_handler)
@ -217,46 +243,48 @@ if __name__ == '__main__':
# Create logger to write log in the logger file as well as on console
stderr_logger = logging.getLogger('STDERR')
sys.stderr = StreamToLogger(stderr_logger, logging.ERROR)
args = vars(add_arguments())
# Get test module list
test_module_list = get_test_modules(args)
# Login the test client
test_utils.login_tester_account(test_client)
servers_info = test_utils.get_config_data()
try:
for server in servers_info:
print("\n=============Running the test cases for '%s'============="
% server['name'], file=sys.stderr)
# Create test server
test_utils.create_test_server(server)
# Login the test client
test_utils.login_tester_account(test_client)
suite = get_suite(args, server, test_client)
suite = get_suite(test_module_list, server, test_client)
tests = unittest.TextTestRunner(stream=sys.stderr,
descriptions=True,
verbosity=2).run(suite)
ran_tests, failed_cases = get_tests_result(tests)
test_result[server['name']] = [ran_tests, failed_cases]
# Logout the test client
test_utils.logout_tester_account(test_client)
test_utils.delete_test_server()
ran_tests, failed_cases, skipped_cases = \
get_tests_result(tests)
test_result[server['name']] = [ran_tests, failed_cases,
skipped_cases]
# Delete test server
test_utils.delete_test_server(test_client)
except SystemExit:
test_utils.drop_objects()
drop_objects()
print("\nTest Result Summary", file=sys.stderr)
print("============================", file=sys.stderr)
for server_res in test_result:
failed_cases = "\n\t".join(test_result[server_res][1])
total_failed = len(test_result[server_res][1])
# TODO : code remaining to handle '-1' condition
total_passed = int(test_result[server_res][0]) - total_failed
print("%s: %s test%s passed, %s test%s failed %s%s" %
(server_res, total_passed, (total_passed != 1 and "s" or ""),
total_failed, (total_failed != 1 and "s" or ""),
(total_failed != 0 and ":\n\t" or ""), failed_cases), file=sys.stderr)
(total_failed != 0 and ":\n\t" or ""), failed_cases),
file=sys.stderr)
print("============================", file=sys.stderr)
print("\nPlease check output in file: %s/regression.log " % CURRENT_PATH)

View File

@ -13,17 +13,9 @@ import os
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
try:
with open(CURRENT_PATH + '/test_config.json') as data_file:
config_data = json.load(data_file)
except:
except Exception as exception:
with open(CURRENT_PATH + '/test_config.json.in') as data_file:
config_data = json.load(data_file)
try:
with open(CURRENT_PATH + '/test_advanced_config.json') as data_file:
advanced_config_data = json.load(data_file)
except:
with open(CURRENT_PATH + '/test_advanced_config.json.in') as data_file:
advanced_config_data = json.load(data_file)

View File

@ -14,15 +14,12 @@ import uuid
import psycopg2
import sqlite3
import logging
import config
import test_setup
import regression
from functools import partial
SERVER_GROUP = test_setup.config_data['server_group']
logger = logging.getLogger(__name__)
file_name = os.path.basename(__file__)
@ -36,10 +33,6 @@ def get_db_connection(db, username, password, host, port):
return connection
def get_node_info_dict():
return regression.node_info_dict
def login_tester_account(tester):
"""
This function login the test client using env variables email and password
@ -47,8 +40,8 @@ def login_tester_account(tester):
:type tester: flask test client object
:return: None
"""
if os.environ['PGADMIN_SETUP_EMAIL'] and os.environ[
'PGADMIN_SETUP_PASSWORD']:
if os.environ['PGADMIN_SETUP_EMAIL'] and\
os.environ['PGADMIN_SETUP_PASSWORD']:
email = os.environ['PGADMIN_SETUP_EMAIL']
password = os.environ['PGADMIN_SETUP_PASSWORD']
tester.post('/login', data=dict(email=email, password=password),
@ -56,7 +49,7 @@ def login_tester_account(tester):
else:
print("Unable to login test client, email and password not found.",
file=sys.stderr)
drop_objects()
_drop_objects(tester)
sys.exit(1)
@ -151,19 +144,25 @@ def create_database(server, db_name):
def drop_database(connection, database_name):
"""This function used to drop the database"""
pg_cursor = connection.cursor()
pg_cursor.execute("SELECT * FROM pg_database db WHERE db.datname='%s'"
% database_name)
if pg_cursor.fetchall():
# Release pid if any process using database
pg_cursor.execute("select pg_terminate_backend(pid) from"
" pg_stat_activity where datname='%s'" %
database_name)
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor.execute('''DROP DATABASE "%s"''' % database_name)
connection.set_isolation_level(old_isolation_level)
connection.commit()
try:
pg_cursor = connection.cursor()
pg_cursor.execute("SELECT * FROM pg_database db WHERE db.datname='%s'"
% database_name)
if pg_cursor.fetchall():
# Release pid if any process using database
pg_cursor.execute("select pg_terminate_backend(pid) from"
" pg_stat_activity where datname='%s'" %
database_name)
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor.execute('''DROP DATABASE "%s"''' % database_name)
connection.set_isolation_level(old_isolation_level)
connection.commit()
connection.close()
except Exception as exception:
exception = "%s: line:%s %s" % (
file_name, sys.exc_traceback.tb_lineno, exception)
print(exception, file=sys.stderr)
def create_server(server):
@ -185,19 +184,37 @@ def create_server(server):
raise Exception("Error while creating server. %s" % exception)
def delete_server(sid):
def delete_server(tester, sid):
"""This function used to delete server from SQLite"""
try:
conn = sqlite3.connect(config.SQLITE_PATH)
cur = conn.cursor()
url = '/browser/server/obj/' + str(SERVER_GROUP) + "/"
# Call API to delete the server
response = tester.delete(url + str(sid))
except Exception as exception:
exception = "%s: line:%s %s" % (
file_name, sys.exc_traceback.tb_lineno, exception)
print(exception, file=sys.stderr)
def delete_server_from_sqlite(sid):
"""This function used to delete server from SQLite"""
try:
con = sqlite3.connect(config.SQLITE_PATH)
cur = con.cursor()
server_objects = cur.execute('SELECT * FROM server WHERE id=%s' % sid)
servers_count = len(server_objects.fetchall())
ss = server_objects.fetchall()
# for i in ss:
# print(">>>>>>>>>>>", i)
servers_count = len(ss)
# print(">>>>>>>", sid)
if servers_count:
cur.execute('DELETE FROM server WHERE id=%s' % sid)
conn.commit()
conn.close()
except Exception as err:
raise Exception("Error while deleting server %s" % err)
con.commit()
con.close()
except Exception as exception:
exception = "%s: line:%s %s" % (
file_name, sys.exc_traceback.tb_lineno, exception)
print(exception, file=sys.stderr)
def create_tablespace(server, test_tablespace_name):
@ -246,7 +263,6 @@ def delete_tablespace(connection, test_tablespace_name):
exception = "%s: line:%s %s" % (
file_name, sys.exc_traceback.tb_lineno, exception)
print(exception, file=sys.stderr)
raise Exception(exception)
def create_test_server(server_info):
@ -277,7 +293,7 @@ def create_test_server(server_info):
"db_name": test_db_name})
def delete_test_server():
def delete_test_server(tester):
test_server_dict = regression.test_server_dict
test_servers = test_server_dict["server"]
test_databases = test_server_dict["database"]
@ -295,14 +311,12 @@ def delete_test_server():
database_name = database["db_name"]
# Drop database
drop_database(connection, database_name)
# Delete server
delete_server(srv_id)
delete_server(tester, srv_id)
except Exception as exception:
exception = "Exception: %s: line:%s %s" % (
file_name, sys.exc_traceback.tb_lineno, exception)
print(exception)
logger.exception(exception)
print(exception, file=sys.stderr)
# Clear test_server_dict
for item in regression.test_server_dict:
@ -310,11 +324,12 @@ def delete_test_server():
def remove_db_file():
"""This function use to remove SQLite DB file"""
if os.path.isfile(config.SQLITE_PATH):
os.remove(config.SQLITE_PATH)
def drop_objects():
def _drop_objects(tester):
"""This function use to cleanup the created the objects(servers, databases,
schemas etc) during the test suite run"""
try:
@ -329,7 +344,6 @@ def drop_objects():
host = server_info[1]
db_port = server_info[2]
server_id = server_info[5]
config_servers = test_setup.config_data['server_credentials']
db_password = ''
# Get the db password from config file for appropriate server
@ -350,11 +364,15 @@ def drop_objects():
databases = pg_cursor.fetchall()
if databases:
for db in databases:
connection = get_db_connection(server_info[3],
server_info[4],
db_password,
server_info[1],
server_info[2])
# Do not drop the default databases
if db[0] not in ["postgres", "template1",
"template0"]:
drop_database(connection, db[0])
connection.close()
# Delete tablespace
connection = get_db_connection(server_info[3],
@ -374,14 +392,30 @@ def drop_objects():
# Delete tablespace
delete_tablespace(connection, tablespace_name)
for server_info in all_servers:
server_id = server_info[5]
# Delete server
delete_server(server_id)
try:
delete_server(tester, server_id)
except Exception as exception:
exception = "Exception while deleting server: %s:" \
" line:%s %s" %\
(file_name, sys.exc_traceback.tb_lineno,
exception)
print(exception, file=sys.stderr)
continue
conn.close()
# Remove SQLite db file
remove_db_file()
except Exception as exception:
remove_db_file()
exception = "Exception: %s: line:%s %s" % (
file_name, sys.exc_traceback.tb_lineno, exception)
print(exception)
logger.exception(exception)
print(exception, file=sys.stderr)
finally:
# Logout the test client
logout_tester_account(tester)
# Remove SQLite db file
remove_db_file()
def get_cleanup_handler(tester):
"""This function use to bind variable to drop_objects function"""
return partial(_drop_objects, tester)