Added job step and job schedule disable icons to identify it quickly within the browser tree. Fixes #4636

Add Reverse Engineered and Modified SQL tests for pgAgent jobs. Fixes #4623
Fixed modified SQL issue while adding an exception in pgAgent job schedule. Fixes #5356
This commit is contained in:
Neel Patel
2020-04-21 17:00:21 +05:30
committed by Akshay Joshi
parent f289dfb762
commit 07f72252d7
41 changed files with 1616 additions and 39 deletions

View File

@@ -86,6 +86,14 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
if not self.db_con['info'] == "Database connected.":
raise Exception("Could not connect to database.")
self.test_config_db_conn = utils.get_db_connection(
self.server['db'],
self.server['username'],
self.server['db_password'],
self.server['host'],
self.server['port']
)
# Get the application path
self.apppath = os.getcwd()
# Status of the test case
@@ -104,8 +112,10 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
# while running the test cases
self.JSON_PLACEHOLDERS = {'schema_id': '<SCHEMA_ID>',
'owner': '<OWNER>',
'timestamptz': '<TIMESTAMPTZ>',
'password': '<PASSWORD>'}
'timestamptz_1': '<TIMESTAMPTZ_1>',
'password': '<PASSWORD>',
'pga_job_id': '<PGA_JOB_ID>',
'timestamptz_2': '<TIMESTAMPTZ_2>'}
resql_module_list = create_resql_module_list(
BaseTestGenerator.re_sql_module_list,
@@ -150,6 +160,7 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
database_utils.disconnect_database(
self, self.server_information['server_id'],
self.server_information['db_id'])
self.test_config_db_conn.close()
def get_db_connection(self):
"""Get the database connection."""
@@ -222,8 +233,18 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
object_id = None
for scenario in scenarios:
if 'precondition_sql' in scenario and \
not self.check_precondition(scenario['precondition_sql']):
skip_test_case = True
if 'precondition_sql' in scenario:
if 'pgagent_test' in scenario and self.check_precondition(
scenario['precondition_sql'], True):
skip_test_case = False
elif self.check_precondition(
scenario['precondition_sql'], False):
skip_test_case = False
else:
skip_test_case = False
if skip_test_case:
print(scenario['name'] +
"... skipped (pre-condition SQL not satisfied)")
continue
@@ -418,7 +439,8 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
fp = open(output_file, "r")
# Used rstrip to remove trailing \n
sql = fp.read().rstrip()
sql = self.preprocess_expected_sql(scenario, sql, resp_sql)
sql = self.preprocess_expected_sql(scenario, sql, resp_sql,
object_id)
try:
self.assertEquals(sql, resp_sql)
except Exception as e:
@@ -472,7 +494,8 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
fp = open(output_file, "r")
# Used rstrip to remove trailing \n
sql = fp.read().rstrip()
sql = self.preprocess_expected_sql(scenario, sql, resp_sql)
sql = self.preprocess_expected_sql(scenario, sql, resp_sql,
object_id)
try:
self.assertEquals(sql, resp_sql)
except Exception as e:
@@ -488,7 +511,8 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
return False
elif 'expected_sql' in scenario:
exp_sql = scenario['expected_sql']
exp_sql = self.preprocess_expected_sql(scenario, exp_sql, resp_sql)
exp_sql = self.preprocess_expected_sql(scenario, exp_sql, resp_sql,
object_id)
try:
self.assertEquals(exp_sql, resp_sql)
except Exception as e:
@@ -498,15 +522,19 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
return True
def check_precondition(self, precondition_sql):
def check_precondition(self, precondition_sql, use_test_config_db_conn):
"""
This method executes precondition_sql and returns appropriate result
:param precondition_sql: SQL query in format select count(*) from ...
:return: True/False depending on precondition_sql result
"""
precondition_flag = False
self.get_db_connection()
pg_cursor = self.connection.cursor()
if not use_test_config_db_conn:
self.get_db_connection()
pg_cursor = self.connection.cursor()
else:
pg_cursor = self.test_config_db_conn.cursor()
try:
pg_cursor.execute(precondition_sql)
precondition_result = pg_cursor.fetchone()
@@ -561,18 +589,49 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
:return:
"""
if 'convert_timestamp_columns' in scenario:
for col in scenario['convert_timestamp_columns']:
if 'data' in scenario and col in scenario['data']:
col_list = list()
key_attr = ''
is_tz_columns_list = False
tz_index = 0
if isinstance(scenario['convert_timestamp_columns'], dict):
for key, value in scenario[
'convert_timestamp_columns'].items():
col_list = scenario['convert_timestamp_columns'][key]
key_attr = key
break
else:
col_list = scenario['convert_timestamp_columns']
is_tz_columns_list = True
for col in col_list:
if ('data' in scenario and col in scenario['data']) or \
(key_attr and 'data' in scenario and 'type' in
scenario and scenario['type'] == 'create' and col in
scenario['data'][key_attr][0]) or \
(key_attr and 'data' in scenario and 'type' in
scenario and scenario['type'] == 'alter' and col in
scenario['data'][key_attr]['added'][0]):
self.get_db_connection()
pg_cursor = self.connection.cursor()
try:
query = "SELECT timestamp with time zone '" \
+ scenario['data'][col] + "'"
if is_tz_columns_list:
query = "SELECT timestamp with time zone '" \
+ scenario['data'][col] + "'"
elif scenario['type'] == 'create':
query = "SELECT timestamp with time zone '" \
+ scenario['data'][key_attr][0][col] + "'"
else:
query = "SELECT timestamp with time zone '" \
+ scenario['data'][key_attr][
'added'][0][col] + "'"
pg_cursor.execute(query)
converted_tz = pg_cursor.fetchone()
if len(converted_tz) >= 1:
tz_index = tz_index + 1
tz_str = "timestamptz_{0}".format(tz_index)
sql = sql.replace(
self.JSON_PLACEHOLDERS['timestamptz'],
self.JSON_PLACEHOLDERS[tz_str],
converted_tz[0])
except Exception as e:
traceback.print_exc()
@@ -624,7 +683,7 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
return data
def preprocess_expected_sql(self, scenario, sql, resp_sql):
def preprocess_expected_sql(self, scenario, sql, resp_sql, object_id):
"""
This function preprocesses expected sql before comparing
it with response sql.
@@ -654,6 +713,12 @@ class ReverseEngineeredSQLTestCases(BaseTestGenerator):
sql = sql.replace(self.JSON_PLACEHOLDERS['password'], password)
# Replace place holder <owner> with the current username
# used to connect to the database
if 'pga_job_id' in scenario:
sql = sql.replace(self.JSON_PLACEHOLDERS['pga_job_id'],
str(object_id))
return sql
def replace_placeholder_with_id(self, value):