Enhanced summary output for the test suite.

This commit is contained in:
Navnath Gadakh
2016-09-19 16:49:06 +01:00
committed by Dave Page
parent 881d2a60a4
commit c84fd83595
12 changed files with 425 additions and 278 deletions

View File

@@ -29,25 +29,29 @@ class DatabaseAddTestCase(BaseTestGenerator):
def runTest(self):
""" This function will add database under 1st server of tree node. """
self.db_name = ''
server_id = test_server_dict["server"][0]["server_id"]
server_utils.connect_server(self, server_id)
data = database_utils.get_db_data()
self.db_name = data['name']
response = self.tester.post(self.url + str(utils.SERVER_GROUP) +
"/" + str(server_id) + "/",
data=json.dumps(data),
content_type='html/json')
self.assertEquals(response.status_code, 200)
response_data = json.loads(response.data.decode('utf-8'))
db_id = response_data['node']['_id']
db_dict = {"db_id": db_id, "db_name": self.db_name}
utils.write_node_info(int(server_id), "did", db_dict)
server_response = server_utils.connect_server(self, server_id)
if server_response["info"] == "Server connected.":
db_owner = server_response['data']['user']['name']
self.data = database_utils.get_db_data(db_owner)
self.db_name = self.data['name']
response = self.tester.post(self.url + str(utils.SERVER_GROUP) +
"/" + str(server_id) + "/",
data=json.dumps(self.data),
content_type='html/json')
self.assertEquals(response.status_code, 200)
response_data = json.loads(response.data.decode('utf-8'))
db_id = response_data['node']['_id']
db_dict = {"db_id": db_id, "db_name": self.db_name}
utils.write_node_info(int(server_id), "did", db_dict)
else:
raise Exception("Error while connecting server to add the"
" database.")
def tearDown(self):
"""
This function delete the database from server added in SQLite and
clears the node_info_dict
This function delete the database from server added in SQLite.
"""
connection = utils.get_db_connection(self.server['db'],
self.server['username'],
@@ -55,4 +59,3 @@ class DatabaseAddTestCase(BaseTestGenerator):
self.server['host'],
self.server['port'])
utils.drop_database(connection, self.db_name)
utils.clear_node_info_dict()

View File

@@ -6,6 +6,8 @@
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import uuid
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from regression import test_server_dict
@@ -19,9 +21,9 @@ class DatabaseDeleteTestCase(BaseTestGenerator):
('Check Databases Node URL', dict(url='/browser/database/obj/'))
]
@classmethod
def setUpClass(cls):
cls.db_id = utils.create_database(cls.server, "test_db_delete")
def setUp(self):
self.db_name = "db_delete_%s" % str(uuid.uuid4())[1:4],
self.db_id = utils.create_database(self.server, self.db_name)
def runTest(self):
""" This function will delete the database."""
@@ -38,6 +40,11 @@ class DatabaseDeleteTestCase(BaseTestGenerator):
raise Exception("Could not connect to server to delete the "
"database.")
@classmethod
def tearDownClass(cls):
pass
def tearDown(self):
"""This function drop the added database"""
connection = utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'])
utils.drop_database(connection, self.db_name)

View File

@@ -19,30 +19,30 @@ class DatabasesGetTestCase(BaseTestGenerator):
"""
scenarios = [
# Fetching default URL for database node.
('Check Dat abases Node URL', dict(url='/browser/database/obj/'))
('Check Databases Node URL', dict(url='/browser/database/obj/'))
]
def runTest(self):
""" This function will fetch added database. """
server_data = test_server_dict["database"][0]
server_id = server_data["server_id"]
db_id = server_data['db_id']
self.server_id = server_data["server_id"]
self.db_id = server_data['db_id']
db_con = database_utils.verify_database(self,
utils.SERVER_GROUP,
server_id,
db_id)
if db_con["info"] == "Database connected.":
try:
response = self.tester.get(
self.url + str(utils.SERVER_GROUP) + '/' + str(
server_id) + '/' +
str(db_id), follow_redirects=True)
self.assertEquals(response.status_code, 200)
except Exception as exception:
raise Exception("Error while getting database. %s" % exception)
finally:
# Disconnect database to delete it
database_utils.disconnect_database(self, server_id, db_id)
else:
raise Exception("Could not connect to database.")
self.server_id,
self.db_id)
try:
if db_con["info"] == "Database connected.":
response = self.tester.get(
self.url + str(utils.SERVER_GROUP) + '/' + str(
self.server_id) + '/' +
str(self.db_id), follow_redirects=True)
self.assertEquals(response.status_code, 200)
else:
raise Exception("Could not connect to database.")
except Exception as exception:
raise Exception("Error while getting database. %s" % exception)
finally:
# Disconnect database to delete it
database_utils.disconnect_database(self, self.server_id,
self.db_id)

View File

@@ -8,11 +8,11 @@
# ##################################################################
import json
import uuid
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from regression import test_server_dict
from regression.test_setup import advanced_config_data
from . import utils as database_utils
@@ -23,10 +23,9 @@ class DatabasesUpdateTestCase(BaseTestGenerator):
('Check Databases Node', dict(url='/browser/database/obj/'))
]
@classmethod
def setUpClass(cls):
cls.db_name = "test_db_put"
cls.db_id = utils.create_database(cls.server, cls.db_name)
def setUp(self):
self.db_name = "test_db_put_%s" % str(uuid.uuid4())[1:8],
self.db_id = utils.create_database(self.server, self.db_name)
def runTest(self):
""" This function will update the comments field of database."""
@@ -39,10 +38,11 @@ class DatabasesUpdateTestCase(BaseTestGenerator):
if db_con["info"] == "Database connected.":
try:
data = {
"comments": advanced_config_data["db_update_data"]["comment"],
"comments": "This is db update comment",
"id": db_id
}
response = self.tester.put(self.url + str(utils.SERVER_GROUP) + '/' + str(
response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' + str(
server_id) + '/' +
str(db_id), data=json.dumps(data), follow_redirects=True)
self.assertEquals(response.status_code, 200)
@@ -55,16 +55,13 @@ class DatabasesUpdateTestCase(BaseTestGenerator):
else:
raise Exception("Error while updating database details.")
@classmethod
def tearDownClass(cls):
def tearDown(self):
"""
This function delete the database from server added in SQLite and
clears the node_info_dict
This function delete the database from server added in SQLite.
"""
connection = utils.get_db_connection(cls.server['db'],
cls.server['username'],
cls.server['db_password'],
cls.server['host'],
cls.server['port'])
utils.drop_database(connection, cls.db_name)
utils.clear_node_info_dict()
connection = utils.get_db_connection(self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port'])
utils.drop_database(connection, self.db_name)

View File

@@ -19,28 +19,103 @@ DATABASE_URL = '/browser/database/obj/'
DATABASE_CONNECT_URL = 'browser/database/connect/'
def get_db_data():
"""This function returns the database details from json file"""
data = None
if advanced_config_data['add_database_data'] is not None:
adv_config_data = advanced_config_data['add_database_data']
data = {
"datacl": adv_config_data['privileges_acl'],
"datconnlimit": adv_config_data['conn_limit'],
"datowner": adv_config_data['owner'],
"deffuncacl": adv_config_data['fun_acl'],
"defseqacl": adv_config_data['seq_acl'],
"deftblacl": adv_config_data['tbl_acl'],
"deftypeacl": adv_config_data['type_acl'],
"encoding": adv_config_data['encoding'],
"name": str(uuid.uuid4())[1:8],
"privileges": adv_config_data['privileges'],
"securities": adv_config_data['securities'],
"variables": adv_config_data['variables']
}
def get_db_data(db_owner):
"""
This function returns the database details in dict format
"""
data = {
"datconnlimit": -1,
"datowner": db_owner,
"deffuncacl": [
{
"grantee": db_owner,
"grantor": db_owner,
"privileges": [
{
"privilege_type": "X",
"privilege": True,
"with_grant": False
}
]
}
],
"defseqacl": [
{
"grantee": db_owner,
"grantor": db_owner,
"privileges": [
{
"privilege_type": "r",
"privilege": True,
"with_grant": False
},
{
"privilege_type": "w",
"privilege": True,
"with_grant": False
},
{
"privilege_type": "U",
"privilege": True,
"with_grant": False
}
]
}
],
"deftblacl": [
{
"grantee": db_owner,
"grantor": db_owner,
"privileges": [
{
"privilege_type": "a",
"privilege": True,
"with_grant": True
},
{
"privilege_type": "r",
"privilege": True,
"with_grant": False
}
]
}
],
"deftypeacl": [
{
"grantee": db_owner,
"grantor": db_owner,
"privileges": [
{
"privilege_type": "U",
"privilege": True,
"with_grant": False
}
]
}
],
"encoding": "UTF8",
"name": "db_add_%s" % str(uuid.uuid4())[1:4],
"privileges": [],
"securities": [],
"variables": []
}
return data
def create_database(connection, db_name):
"""This function used to create database"""
try:
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
pg_cursor.execute("CREATE DATABASE %s" % db_name)
connection.set_isolation_level(old_isolation_level)
connection.commit()
return pg_cursor
except Exception as exception:
raise Exception("Error while creating database. %s" % exception)
def verify_database(self, server_group, server_id, db_id):
"""
This function verifies that database is exists and whether it connect

View File

@@ -11,7 +11,6 @@ import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from . import utils as server_utils
class ServersAddTestCase(BaseTestGenerator):
@@ -22,8 +21,7 @@ class ServersAddTestCase(BaseTestGenerator):
('Default Server Node url', dict(url='/browser/server/obj/'))
]
@classmethod
def setUpClass(cls):
def setUp(self):
pass
def runTest(self):
@@ -33,14 +31,9 @@ class ServersAddTestCase(BaseTestGenerator):
content_type='html/json')
self.assertEquals(response.status_code, 200)
response_data = json.loads(response.data.decode('utf-8'))
server_id = response_data['node']['_id']
utils.write_node_info(int(server_id), "sid", self.server)
self.server_id = response_data['node']['_id']
utils.write_node_info(int(self.server_id), "sid", self.server)
@classmethod
def tearDownClass(cls):
"""
This function delete the server from SQLite & clears the node_info_dict
"""
server_id = server_utils.get_server_id()
utils.delete_server(server_id)
utils.clear_node_info_dict()
def tearDown(self):
"""This function delete the server from SQLite """
utils.delete_server(self.server_id)

View File

@@ -6,12 +6,8 @@
# This software is released under the PostgreSQL Licence
#
# ##################################################################
import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from . import utils as server_utils
class ServerDeleteTestCase(BaseTestGenerator):
@@ -22,29 +18,19 @@ class ServerDeleteTestCase(BaseTestGenerator):
('Default Server Node url', dict(url='/browser/server/obj/'))
]
@classmethod
def setUpClass(cls):
def setUp(self):
"""This function add the server to test the DELETE API"""
server_utils.add_server(cls.server)
self.server_id = utils.create_server(self.server)
def runTest(self):
"""This function deletes the added server"""
all_id = utils.get_node_info_dict()
servers_info = all_id["sid"]
url = self.url + str(utils.SERVER_GROUP) + "/"
if len(servers_info) == 0:
if not self.server_id:
raise Exception("No server to delete!!!")
# Call API to delete the servers
server_id = list(servers_info[0].keys())[0]
response = self.tester.delete(url + str(server_id))
response = self.tester.delete(url + str(self.server_id))
self.assertEquals(response.status_code, 200)
response_data = json.loads(response.data.decode('utf-8'))
self.assertEquals(response_data['success'], 1)
@classmethod
def tearDownClass(cls):
"""This function calls the clear_node_info_dict() function to clears
the node_info_dict"""
utils.clear_node_info_dict()
def tearDown(self):
"""This function delete the server from SQLite """
utils.delete_server(self.server_id)

View File

@@ -10,7 +10,6 @@
import json
from pgadmin.utils.route import BaseTestGenerator
from regression import test_utils as utils
from . import utils as server_utils
class ServerUpdateTestCase(BaseTestGenerator):
@@ -21,37 +20,21 @@ class ServerUpdateTestCase(BaseTestGenerator):
('Default Server Node url', dict(url='/browser/server/obj/'))
]
@classmethod
def setUpClass(cls):
def setUp(self):
"""This function add the server to test the PUT API"""
server_utils.add_server(cls.server)
self.server_id = utils.create_server(self.server)
def runTest(self):
"""This function update the server details"""
all_id = utils.get_node_info_dict()
servers_info = all_id["sid"]
if len(servers_info) == 0:
if not self.server_id:
raise Exception("No server to update.")
server_id = list(servers_info[0].keys())[0]
data = {
"comment":
server_utils.config_data['server_update_data'][0][
'comment'],
"id": server_id
}
data = {"comment": self.server['comment'], "id": self.server_id}
put_response = self.tester.put(
self.url + str(utils.SERVER_GROUP) + '/' +
str(server_id), data=json.dumps(data),
str(self.server_id), data=json.dumps(data),
content_type='html/json')
self.assertEquals(put_response.status_code, 200)
@classmethod
def tearDownClass(cls):
"""
This function delete the server from SQLite & clears the node_info_dict
"""
server_id = server_utils.get_server_id()
utils.delete_server(server_id)
utils.clear_node_info_dict()
def tearDown(self):
"""This function delete the server from SQLite"""
utils.delete_server(self.server_id)

View File

@@ -19,17 +19,7 @@ from regression import test_utils as utils
from regression.test_setup import config_data
SERVER_URL = '/browser/server/obj/'
SERVER_CONNECT_URL = 'browser/server/connect/'
def get_server_id():
"""This function returns the server id from node_info_dict"""
server_id = 0
if "sid" in node_info_dict:
if node_info_dict['sid']:
server_id = list(node_info_dict['sid'][0].keys())[0]
return server_id
SERVER_CONNECT_URL = '/browser/server/connect/'
def connect_server(self, server_id):
@@ -37,8 +27,8 @@ def connect_server(self, server_id):
This function used to connect added server
:param self: class object of server's test class
:type self: class
:param server_id: flag for db add test case
:type server_id: bool
:param server_id: server id
:type server_id: str
"""
response = self.tester.post(SERVER_CONNECT_URL + str(utils.SERVER_GROUP) +
'/' + str(server_id),
@@ -47,24 +37,3 @@ def connect_server(self, server_id):
assert response.status_code == 200
response_data = json.loads(response.data.decode('utf-8'))
return response_data
def add_server(server):
try:
conn = sqlite3.connect(config.SQLITE_PATH)
cur = conn.cursor()
server_details = (
1, utils.SERVER_GROUP, server['name'], server['host'],
server['port'], server['db'], server['username'],
server['role'], server['sslmode'],
server['comment'])
cur.execute(
'INSERT INTO server (user_id, servergroup_id, name, host, '
'port, maintenance_db, username, role, ssl_mode,'
' comment) VALUES (?,?,?,?,?,?,?,?,?,?)', server_details)
server_id = cur.lastrowid
# Add server info to node_info_dict
utils.write_node_info(int(server_id), "sid", server)
conn.commit()
except Exception as err:
raise Exception(err)

View File

@@ -28,5 +28,7 @@ node_info_dict = {
global test_server_dict
test_server_dict = {
"server": [],
"database": []
"database": [],
"tablespace": [],
"role": []
}

View File

@@ -33,11 +33,12 @@ if sys.path[0] != root:
from pgadmin import create_app
import config
import test_setup
import regression
# Execute setup.py if test SQLite database doesn't exist.
if os.path.isfile(config.TEST_SQLITE_PATH):
print("The configuration database already exists at '%s'. "
"Please remove the database and re-run the test suite." %
print("The configuration database already existed at '%s'. "
"Please remove the database and again run the test suite." %
config.TEST_SQLITE_PATH)
sys.exit(1)
else:
@@ -151,6 +152,26 @@ def sig_handler(signo, frame):
test_utils.drop_objects()
def get_tests_result(tests):
"""This function returns the total ran and total failed test cases count"""
total_ran = tests.testsRun
failed_cases_result = []
if total_ran:
if tests.failures:
for failed_case in tests.failures:
class_name = str(failed_case[0]).split('.')[-1].split()[0].\
strip(')')
failed_cases_result.append(class_name)
if tests.errors:
for error_case in tests.errors:
class_name = str(error_case[0]).split('.')[-1].split()[0].\
strip(')')
if class_name not in failed_cases_result:
failed_cases_result.append(class_name)
return total_ran, failed_cases_result
class StreamToLogger(object):
def __init__(self, logger, log_level=logging.INFO):
self.terminal = sys.stderr
@@ -176,6 +197,7 @@ class StreamToLogger(object):
if __name__ == '__main__':
test_result = dict()
# Register cleanup function to cleanup on exit
atexit.register(test_utils.drop_objects)
# Set signal handler for cleanup
@@ -203,7 +225,9 @@ if __name__ == '__main__':
for server in servers_info:
print("\n=============Running the test cases for '%s'============="
% server['name'], file=sys.stderr)
test_utils.create_test_server(server)
# Login the test client
test_utils.login_tester_account(test_client)
@@ -211,11 +235,28 @@ if __name__ == '__main__':
tests = unittest.TextTestRunner(stream=sys.stderr,
descriptions=True,
verbosity=2).run(suite)
ran_tests, failed_cases = get_tests_result(tests)
test_result[server['name']] = [ran_tests, failed_cases]
# Logout the test client
test_utils.logout_tester_account(test_client)
test_utils.delete_test_server(server)
test_utils.delete_test_server()
except SystemExit:
test_utils.drop_objects()
print("Please check output in file: %s/regression.log " % CURRENT_PATH)
print("\nTest Result Summary", file=sys.stderr)
print("============================", file=sys.stderr)
for server_res in test_result:
failed_cases = "\n\t".join(test_result[server_res][1])
total_failed = len(test_result[server_res][1])
total_passed = int(test_result[server_res][0]) - total_failed
print("%s: %s test%s passed, %s test%s failed %s%s" %
(server_res, total_passed, (total_passed != 1 and "s" or ""),
total_failed, (total_failed != 1 and "s" or ""),
(total_failed != 0 and ":\n\t" or ""), failed_cases), file=sys.stderr)
print("============================", file=sys.stderr)
print("\nPlease check output in file: %s/regression.log " % CURRENT_PATH)

View File

@@ -10,18 +10,24 @@ from __future__ import print_function
import os
import sys
import uuid
import psycopg2
import sqlite3
import logging
import config
import test_setup
import regression
SERVER_GROUP = test_setup.config_data['server_group']
logger = logging.getLogger(__name__)
file_name = os.path.basename(__file__)
def get_db_connection(db, username, password, host, port):
"""This function retruns the connection object of psycopg"""
"""This function returns the connection object of psycopg"""
connection = psycopg2.connect(database=db,
user=username,
password=password,
@@ -71,7 +77,7 @@ def get_config_data():
server_data = []
for srv in test_setup.config_data['server_credentials']:
data = {"name": srv['name'],
"comment": "",
"comment": srv['comment'],
"host": srv['host'],
"port": srv['db_port'],
"db": srv['maintenance_db'],
@@ -142,19 +148,22 @@ def create_database(server, db_name):
raise Exception("Error while creating database. %s" % exception)
def drop_database(connection, db_name):
def drop_database(connection, database_name):
"""This function used to drop the database"""
try:
pg_cursor = connection.cursor()
pg_cursor.execute("SELECT * FROM pg_database db WHERE db.datname='%s'"
% database_name)
if pg_cursor.fetchall():
# Release pid if any process using database
pg_cursor.execute("select pg_terminate_backend(pid) from"
" pg_stat_activity where datname='%s'" %
database_name)
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
pg_cursor.execute('''DROP DATABASE "%s"''' % db_name)
pg_cursor.execute('''DROP DATABASE "%s"''' % database_name)
connection.set_isolation_level(old_isolation_level)
connection.commit()
connection.close()
except Exception as exception:
raise Exception("Exception while dropping the database. %s" %
exception)
def create_server(server):
@@ -181,116 +190,198 @@ def delete_server(sid):
try:
conn = sqlite3.connect(config.SQLITE_PATH)
cur = conn.cursor()
servers = cur.execute('SELECT * FROM server WHERE id=%s' % sid)
servers_count = len(servers.fetchall())
server_objects = cur.execute('SELECT * FROM server WHERE id=%s' % sid)
servers_count = len(server_objects.fetchall())
if servers_count:
cur.execute('DELETE FROM server WHERE id=%s' % sid)
conn.commit()
else:
print("No servers found to delete.", file=sys.stderr)
conn.close()
except Exception as err:
raise Exception("Error while deleting server %s" % err)
def create_test_server(server):
"""
This function create the test server which will act as parent server,
the other node will add under this server
:param server: server details
:type server: dict
:return: None
"""
# Create the server
server_id = create_server(server)
# Create test database
test_db_name = "test_db"
db_id = create_database(server, test_db_name)
# Add server info to test_server_dict
regression.test_server_dict["server"].append({"server_id": server_id,
"server": server})
regression.test_server_dict["database"].append({"server_id": server_id,
"db_id": db_id,
"db_name": test_db_name})
def delete_test_server(server):
test_server_dict = regression.test_server_dict
if test_server_dict:
def create_tablespace(server, test_tablespace_name):
try:
connection = get_db_connection(server['db'],
server['username'],
server['db_password'],
server['host'],
server['port'])
db_name = test_server_dict["database"][0]["db_name"]
drop_database(connection, db_name)
# Delete the server
server_id = test_server_dict['server'][0]["server_id"]
conn = sqlite3.connect(config.SQLITE_PATH)
cur = conn.cursor()
servers = cur.execute('SELECT * FROM server WHERE id=%s' % server_id)
servers_count = len(servers.fetchall())
if servers_count:
cur.execute('DELETE FROM server WHERE id=%s' % server_id)
conn.commit()
conn.close()
server_dict = regression.test_server_dict["server"]
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor = connection.cursor()
pg_cursor.execute("CREATE TABLESPACE %s LOCATION '%s'" %
(test_tablespace_name, server['tablespace_path']))
connection.set_isolation_level(old_isolation_level)
connection.commit()
# Pop the server from dict if it's deleted
server_dict = [server_dict.pop(server_dict.index(item))
for item in server_dict
if str(server_id) == str(item["server_id"])]
# Get 'oid' from newly created tablespace
pg_cursor.execute(
"SELECT ts.oid from pg_tablespace ts WHERE ts.spcname='%s'" %
test_tablespace_name)
oid = pg_cursor.fetchone()
tspc_id = ''
if oid:
tspc_id = oid[0]
connection.close()
return tspc_id
except Exception as exception:
raise Exception("Error while creating tablespace. %s" % exception)
# Pop the db from dict if it's deleted
db_dict = regression.test_server_dict["database"]
db_dict = [db_dict.pop(db_dict.index(item)) for item in db_dict
if server_id == item["server_id"]]
def delete_tablespace(connection, test_tablespace_name):
try:
pg_cursor = connection.cursor()
pg_cursor.execute("SELECT * FROM pg_tablespace ts WHERE"
" ts.spcname='%s'" % test_tablespace_name)
tablespace_count = len(pg_cursor.fetchall())
if tablespace_count:
old_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
pg_cursor.execute("DROP TABLESPACE %s" % test_tablespace_name)
connection.set_isolation_level(old_isolation_level)
connection.commit()
connection.close()
except Exception as exception:
exception = "%s: line:%s %s" % (
file_name, sys.exc_traceback.tb_lineno, exception)
print(exception, file=sys.stderr)
raise Exception(exception)
def create_test_server(server_info):
"""
This function create the test server which will act as parent server,
the other node will add under this server
:param server_info: server details
:type server_info: dict
:return: None
"""
# Create the server
srv_id = create_server(server_info)
# Create test database
test_db_name = "test_db_%s" % str(uuid.uuid4())[1:8]
db_id = create_database(server_info, test_db_name)
# TODO: Need to decide about test tablespace creation
# Create tablespace
# test_tablespace_name = "test_tablespace"
# tablespace_id = create_tablespace(server, test_tablespace_name)
# Add server info to test_server_dict
regression.test_server_dict["server"].append({"server_id": srv_id,
"server": server_info})
regression.test_server_dict["database"].append({"server_id": srv_id,
"db_id": db_id,
"db_name": test_db_name})
def delete_test_server():
test_server_dict = regression.test_server_dict
test_servers = test_server_dict["server"]
test_databases = test_server_dict["database"]
test_table_spaces = test_server_dict["tablespace"]
try:
for test_server in test_servers:
srv_id = test_server["server_id"]
servers_dict = test_server["server"]
for database in test_databases:
connection = get_db_connection(servers_dict['db'],
servers_dict['username'],
servers_dict['db_password'],
servers_dict['host'],
servers_dict['port'])
database_name = database["db_name"]
# Drop database
drop_database(connection, database_name)
# Delete server
delete_server(srv_id)
except Exception as exception:
exception = "Exception: %s: line:%s %s" % (
file_name, sys.exc_traceback.tb_lineno, exception)
print(exception)
logger.exception(exception)
# Clear test_server_dict
for item in regression.test_server_dict:
del regression.test_server_dict[item][:]
def remove_db_file():
if os.path.isfile(config.SQLITE_PATH):
os.remove(config.SQLITE_PATH)
def drop_objects():
"""This function use to cleanup the created the objects(servers, databases,
schemas etc) during the test suite run"""
try:
conn = sqlite3.connect(config.SQLITE_PATH)
cur = conn.cursor()
servers = cur.execute('SELECT name, host, port, maintenance_db,'
' username, id FROM server')
if servers:
all_servers = servers.fetchall()
for server_info in all_servers:
name = server_info[0]
host = server_info[1]
db_port = server_info[2]
# Cleanup in node_info_dict
servers_info = regression.node_info_dict['sid']
if servers_info:
for server in servers_info:
server_id = server.keys()[0]
server = server.values()[0]
if regression.node_info_dict['did']:
db_conn = get_db_connection(server['db'],
server['username'],
server['db_password'],
server['host'],
server['port'])
db_dict = regression.node_info_dict['did'][0]
if int(server_id) in db_dict:
db_name = db_dict[int(server_id)]["db_name"]
drop_database(db_conn, db_name)
delete_server(server_id)
server_id = server_info[5]
config_servers = test_setup.config_data['server_credentials']
db_password = ''
# Get the db password from config file for appropriate server
for srv in config_servers:
if (srv['name'], srv['host'], srv['db_port']) == \
(name, host, db_port):
db_password = srv['db_password']
if db_password:
# Drop database
connection = get_db_connection(server_info[3],
server_info[4],
db_password,
server_info[1],
server_info[2])
# Cleanup in test_server_dict
servers = regression.test_server_dict["server"]
if servers:
for server in servers:
server_id = server["server_id"]
server = server["server"]
if regression.test_server_dict["database"]:
db_info = regression.test_server_dict["database"]
db_dict = [item for item in db_info
if server_id == item["server_id"]]
if db_dict:
for db in db_dict:
db_name = db["db_name"]
db_conn = get_db_connection(server['db'],
server['username'],
server['db_password'],
server['host'],
server['port'])
drop_database(db_conn, db_name)
delete_server(server_id)
pg_cursor = connection.cursor()
pg_cursor.execute("SELECT db.datname FROM pg_database db")
databases = pg_cursor.fetchall()
if databases:
for db in databases:
# Do not drop the default databases
if db[0] not in ["postgres", "template1",
"template0"]:
drop_database(connection, db[0])
connection.close()
# Remove the test SQLite database
if os.path.isfile(config.SQLITE_PATH):
os.remove(config.SQLITE_PATH)
# Delete tablespace
connection = get_db_connection(server_info[3],
server_info[4],
db_password,
server_info[1],
server_info[2])
pg_cursor = connection.cursor()
pg_cursor.execute("SELECT * FROM pg_tablespace")
table_spaces = pg_cursor.fetchall()
if table_spaces:
for tablespace in table_spaces:
# Do not delete default table spaces
if tablespace[0] not in ["pg_default",
"pg_global"]:
tablespace_name = tablespace[0]
# Delete tablespace
delete_tablespace(connection, tablespace_name)
# Delete server
delete_server(server_id)
conn.close()
# Remove SQLite db file
remove_db_file()
except Exception as exception:
remove_db_file()
exception = "Exception: %s: line:%s %s" % (
file_name, sys.exc_traceback.tb_lineno, exception)
print(exception)
logger.exception(exception)