Python test famework should log only relevant exception and exit with a non-zero code on error. #6157

This commit is contained in:
Aditya Toshniwal
2023-04-20 13:27:12 +05:30
parent d6ecc531e0
commit 7e2add8db1
2 changed files with 71 additions and 76 deletions

View File

@@ -145,62 +145,58 @@ def clear_node_info_dict():
def create_database(server, db_name, encoding=None): def create_database(server, db_name, encoding=None):
"""This function used to create database and returns the database id""" """This function used to create database and returns the database id"""
db_id = '' db_id = ''
try: connection = get_db_connection(
connection = get_db_connection( server['db'],
server['db'], server['username'],
server['username'], server['db_password'],
server['db_password'], server['host'],
server['host'], server['port'],
server['port'], server['sslmode']
server['sslmode'] )
) old_isolation_level = connection.isolation_level
old_isolation_level = connection.isolation_level set_isolation_level(connection, 0)
set_isolation_level(connection, 0) connection.autocommit = True
connection.autocommit = True pg_cursor = connection.cursor()
pg_cursor = connection.cursor() if encoding is None:
if encoding is None: pg_cursor.execute(
pg_cursor.execute( '''CREATE DATABASE "%s" TEMPLATE template0''' % db_name)
'''CREATE DATABASE "%s" TEMPLATE template0''' % db_name) else:
else: pg_cursor.execute(
pg_cursor.execute( '''CREATE DATABASE "%s" TEMPLATE template0
'''CREATE DATABASE "%s" TEMPLATE template0 ENCODING='%s' LC_COLLATE='%s' LC_CTYPE='%s' ''' %
ENCODING='%s' LC_COLLATE='%s' LC_CTYPE='%s' ''' % (db_name, encoding[0], encoding[1], encoding[1]))
(db_name, encoding[0], encoding[1], encoding[1])) connection.autocommit = False
connection.autocommit = False set_isolation_level(connection, old_isolation_level)
set_isolation_level(connection, old_isolation_level) connection.commit()
connection.commit()
# Get 'oid' from newly created database # Get 'oid' from newly created database
pg_cursor.execute("SELECT db.oid from pg_catalog.pg_database db WHERE" pg_cursor.execute("SELECT db.oid from pg_catalog.pg_database db WHERE"
" db.datname='%s'" % db_name) " db.datname='%s'" % db_name)
oid = pg_cursor.fetchone() oid = pg_cursor.fetchone()
if oid: if oid:
db_id = oid[0] db_id = oid[0]
connection.close() connection.close()
# In PostgreSQL 15 the default public schema that every database has # In PostgreSQL 15 the default public schema that every database has
# will have a different set of permissions. In fact, before PostgreSQL # will have a different set of permissions. In fact, before PostgreSQL
# 15, every user could manipulate the public schema of a database he is # 15, every user could manipulate the public schema of a database he is
# not owner. Since the upcoming new version, only the database owner # not owner. Since the upcoming new version, only the database owner
# will be granted full access to the public schema, while other users # will be granted full access to the public schema, while other users
# will need to get an explicit GRANT # will need to get an explicit GRANT
connection = get_db_connection( connection = get_db_connection(
db_name, db_name,
server['username'], server['username'],
server['db_password'], server['db_password'],
server['host'], server['host'],
server['port'], server['port'],
server['sslmode'] server['sslmode']
) )
pg_cursor = connection.cursor() pg_cursor = connection.cursor()
pg_cursor.execute('''GRANT ALL ON SCHEMA public TO PUBLIC''') pg_cursor.execute('''GRANT ALL ON SCHEMA public TO PUBLIC''')
connection.commit() connection.commit()
connection.close() connection.close()
return db_id return db_id
except Exception:
traceback.print_exc(file=sys.stderr)
return db_id
def create_table(server, db_name, table_name, extra_columns=[]): def create_table(server, db_name, table_name, extra_columns=[]):
@@ -1173,30 +1169,27 @@ def get_server_type(server):
:param server: :param server:
:return: :return:
""" """
try: connection = get_db_connection(
connection = get_db_connection( server['db'],
server['db'], server['username'],
server['username'], server['db_password'],
server['db_password'], server['host'],
server['host'], server['port'],
server['port'], server['sslmode']
server['sslmode'] )
)
pg_cursor = connection.cursor() pg_cursor = connection.cursor()
# Get 'version' string # Get 'version' string
pg_cursor.execute("SELECT version()") pg_cursor.execute("SELECT version()")
version_string = pg_cursor.fetchone() version_string = pg_cursor.fetchone()
connection.close() connection.close()
if isinstance(version_string, tuple): if isinstance(version_string, tuple):
version_string = version_string[0] version_string = version_string[0]
if "EnterpriseDB" in version_string: if "EnterpriseDB" in version_string:
return 'ppas' return 'ppas'
return 'pg' return 'pg'
except Exception:
traceback.print_exc(file=sys.stderr)
def check_binary_path_or_skip_test(cls, utility_name): def check_binary_path_or_skip_test(cls, utility_name):

View File

@@ -477,6 +477,7 @@ def execute_test(test_module_list_passed, server_passed, driver_passed,
:param parallel_ui_test: parallel ui tests :param parallel_ui_test: parallel ui tests
:return: :return:
""" """
server_information = None
try: try:
print("\n=============Running the test cases for '%s' =============" print("\n=============Running the test cases for '%s' ============="
% server_passed['name'], file=sys.stderr) % server_passed['name'], file=sys.stderr)
@@ -568,12 +569,13 @@ def execute_test(test_module_list_passed, server_passed, driver_passed,
threading.current_thread().ident, threading.current_thread().ident,
threading.current_thread().name)) threading.current_thread().name))
# Mark failure as true # Mark failure as true
if str(exc).find('other sessions using the database.') != -1: if 'other sessions using the database.' not in str(exc):
global failure global failure
failure = True failure = True
finally: finally:
# Delete test server # Delete test server
test_utils.delete_server(test_client, server_information) if server_information:
test_utils.delete_server(test_client, server_information)
# Delete web-driver instance # Delete web-driver instance
thread_name = "parallel_tests" + server_passed['name'] thread_name = "parallel_tests" + server_passed['name']
if threading.current_thread().name == thread_name: if threading.current_thread().name == thread_name: