Various improvements to the regression tests, notably avoiding use of signals unavailable on Windows.

This commit is contained in:
Navnath Gadakh 2017-01-08 18:48:04 +05:30 committed by Dave Page
parent bdf9761472
commit 1325b30e0b
15 changed files with 46 additions and 44 deletions

View File

@ -40,8 +40,8 @@ class PackageAddTestCase(BaseTestGenerator):
server_con = server_utils.connect_server(self, self.server_id)
if server_con:
if "server_type" in server_con["data"]:
if server_con["data"]["server_type"] == "pg":
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Packages not supported by PostgreSQL."
self.skipTest(message)

View File

@ -42,8 +42,8 @@ class PackageDeleteTestCase(BaseTestGenerator):
server_con = server_utils.connect_server(self, self.server_id)
if server_con:
if "server_type" in server_con["data"]:
if server_con["data"]["server_type"] == "pg":
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Packages not supported by PostgreSQL."
self.skipTest(message)

View File

@ -42,8 +42,8 @@ class PackageGetTestCase(BaseTestGenerator):
server_con = server_utils.connect_server(self, self.server_id)
if server_con:
if "server_type" in server_con["data"]:
if server_con["data"]["server_type"] == "pg":
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Packages not supported by PostgreSQL."
self.skipTest(message)

View File

@ -44,8 +44,8 @@ class PackagePutTestCase(BaseTestGenerator):
server_con = server_utils.connect_server(self, self.server_id)
if server_con:
if "server_type" in server_con["data"]:
if server_con["data"]["server_type"] == "pg":
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Packages not supported by PostgreSQL."
self.skipTest(message)

View File

@ -36,8 +36,8 @@ class SynonymAddTestCase(BaseTestGenerator):
self.db_id = schema_info["db_id"]
server_con = server_utils.connect_server(self, self.server_id)
if server_con:
if "server_type" in server_con["data"]:
if server_con["data"]["server_type"] == "pg":
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Synonym not supported by PG."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,

View File

@ -36,8 +36,8 @@ class SynonymDeleteTestCase(BaseTestGenerator):
self.db_id = schema_info["db_id"]
server_con = server_utils.connect_server(self, self.server_id)
if server_con:
if "server_type" in server_con["data"]:
if server_con["data"]["server_type"] == "pg":
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Synonym not supported by PG."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,

View File

@ -36,8 +36,8 @@ class SynonymGetTestCase(BaseTestGenerator):
self.db_id = schema_info["db_id"]
server_con = server_utils.connect_server(self, self.server_id)
if server_con:
if "server_type" in server_con["data"]:
if server_con["data"]["server_type"] == "pg":
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Synonym not supported by PG."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,

View File

@ -40,8 +40,8 @@ class SynonymPutTestCase(BaseTestGenerator):
self.db_id = schema_info["db_id"]
server_con = server_utils.connect_server(self, self.server_id)
if server_con:
if "server_type" in server_con["data"]:
if server_con["data"]["server_type"] == "pg":
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Synonym not supported by PG."
self.skipTest(message)
db_con = database_utils.connect_database(self, utils.SERVER_GROUP,

View File

@ -25,24 +25,23 @@ class DatabasesGetTestCase(BaseTestGenerator):
def runTest(self):
""" This function will fetch added database. """
server_data = parent_node_dict["database"][-1]
self.server_id = server_data["server_id"]
self.db_id = server_data['db_id']
server_id = server_data["server_id"]
db_id = server_data['db_id']
db_con = database_utils.connect_database(self,
utils.SERVER_GROUP,
self.server_id,
self.db_id)
utils.SERVER_GROUP,
server_id,
db_id)
try:
if db_con["info"] == "Database connected.":
response = self.tester.get(
self.url + str(utils.SERVER_GROUP) + '/' + str(
self.server_id) + '/' +
str(self.db_id), follow_redirects=True)
self.assertEquals(response.status_code, 200)
response = self.tester.get(
self.url + str(utils.SERVER_GROUP) + '/' + str(
server_id) + '/' +
str(db_id), follow_redirects=True)
self.assertEquals(response.status_code, 200)
else:
raise Exception("Could not connect to database.")
except Exception as exception:
raise Exception("Error while getting database. %s" % exception)
finally:
# Disconnect database to delete it
database_utils.disconnect_database(self, self.server_id,
self.db_id)
database_utils.disconnect_database(self, server_id, db_id)

View File

@ -24,7 +24,7 @@ class DatabasesUpdateTestCase(BaseTestGenerator):
]
def setUp(self):
self.db_name = "test_db_put_%s" % str(uuid.uuid4())[1:8],
self.db_name = "test_db_put_%s" % str(uuid.uuid4())[1:8]
self.db_id = utils.create_database(self.server, self.db_name)
self.server_id = parent_node_dict["server"][-1]["server_id"]
db_dict = {"server_id": self.server_id, "db_id": self.db_id,
@ -34,9 +34,9 @@ class DatabasesUpdateTestCase(BaseTestGenerator):
def runTest(self):
""" This function will update the comments field of database."""
db_con = database_utils.connect_database(self,
utils.SERVER_GROUP,
self.server_id,
self.db_id)
utils.SERVER_GROUP,
self.server_id,
self.db_id)
if db_con["info"] == "Database connected.":
try:
data = {

View File

@ -28,8 +28,8 @@ class ResourceGroupsAddTestCase(BaseTestGenerator):
if not server_con["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
if "server_type" in server_con["data"]:
if server_con["data"]["server_type"] == "pg":
if "type" in server_con["data"]:
if server_con["data"]["type"] == "pg":
message = "Resource groups are not supported by PG."
self.skipTest(message)

View File

@ -27,8 +27,8 @@ class ResourceGroupsDeleteTestCase(BaseTestGenerator):
if not server_response["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
if "server_type" in server_response["data"]:
if server_response["data"]["server_type"] == "pg":
if "type" in server_response["data"]:
if server_response["data"]["type"] == "pg":
message = "Resource groupa are not supported by PG."
self.skipTest(message)
self.resource_group = "test_resource_group_delete%s" % \

View File

@ -28,8 +28,8 @@ class ResourceGroupsPutTestCase(BaseTestGenerator):
if not server_response["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
if "server_type" in server_response["data"]:
if server_response["data"]["server_type"] == "pg":
if "type" in server_response["data"]:
if server_response["data"]["type"] == "pg":
message = "Resource groups are not supported by PG."
self.skipTest(message)
self.resource_group_name = "test_resource_group_put%s" % \

View File

@ -27,8 +27,8 @@ class ResourceGroupsGetTestCase(BaseTestGenerator):
if not server_response["info"] == "Server connected.":
raise Exception("Could not connect to server to add resource "
"groups.")
if "server_type" in server_response["data"]:
if server_response["data"]["server_type"] == "pg":
if "type" in server_response["data"]:
if server_response["data"]["type"] == "pg":
message = "Resource groups are not supported by PG."
self.skipTest(message)
self.resource_group = "test_resource_group_get%s" % \

View File

@ -226,10 +226,13 @@ if __name__ == '__main__':
# Register cleanup function to cleanup on exit
atexit.register(drop_objects)
# Set signal handler for cleanup
signal.signal(signal.SIGTERM, sig_handler)
signal.signal(signal.SIGABRT, sig_handler)
signal.signal(signal.SIGINT, sig_handler)
signal.signal(signal.SIGQUIT, sig_handler)
signal_list = dir(signal)
required_signal_list = ['SIGTERM', 'SIGABRT', 'SIGQUIT', 'SIGINT']
# Get the OS wise supported signals
supported_signal_list = [sig for sig in required_signal_list if
sig in signal_list]
for sig in supported_signal_list:
signal.signal(getattr(signal, sig), sig_handler)
# Set basic logging configuration for log file
logging.basicConfig(level=logging.DEBUG,