Extend Parallel runner to work in hybrid mode when the cache is not relevant (#15471)

* Extend Parallel runner to work in hybrid mode when the cache is not relevant

* Small refactoring

* Fix mistakes
Irina Efode 2023-02-03 00:19:13 +04:00 committed by GitHub
parent 566fae2b01
commit 8c428d2e1c


@@ -65,13 +65,14 @@ class TestStructure:
 class TaskManager:
     process_timeout = -1
 
-    def __init__(self, command_list:list, working_dir: os.path):
+    def __init__(self, command_list:list, working_dir: os.path, prev_run_cmd_length = 0):
         self._command_list = command_list
         self._process_list = list()
         self._workers = list()
         self._timers = list()
-        self._idx = 0
         self._log_filename = os.path.join(working_dir, f"log_{LOG_NAME_REPLACE_STR}.log")
+        self._prev_run_cmd_length = prev_run_cmd_length
+        self._idx = 0
 
     def __create_thread(self, func):
         thread = threading.Thread(target=func)
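The constructor change above stores an index offset that is later mixed into the log file numbering. A minimal sketch of that idea, assuming a made-up value for the `LOG_NAME_REPLACE_STR` placeholder (the real constant is defined elsewhere in the script):

```python
import os

# Assumed placeholder token, only for this sketch.
LOG_NAME_REPLACE_STR = "##NAME##"

def make_log_name(working_dir, idx, prev_run_cmd_length=0):
    # The second batch passes the number of commands already executed, so the
    # first batch's logs (log_0.log .. log_N-1.log) are never overwritten.
    template = os.path.join(working_dir, f"log_{LOG_NAME_REPLACE_STR}.log")
    return template.replace(LOG_NAME_REPLACE_STR, str(idx + prev_run_cmd_length))

print(make_log_name("/tmp", 0))       # /tmp/log_0.log  (first batch)
print(make_log_name("/tmp", 0, 10))   # /tmp/log_10.log (second batch after 10 jobs)
```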
@@ -83,7 +84,7 @@ class TaskManager:
         if len(self._command_list) <= self._idx:
             logger.warning(f"Skip worker initialization. Command list length <= worker index")
             return
-        log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx))
+        log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx + self._prev_run_cmd_length))
         with open(log_file_name, "w") as log_file:
             worker = self.__create_thread(
                 self._process_list.append(Popen(self._command_list[self._idx], shell=True, stdout=log_file, stderr=log_file)))
@@ -114,7 +115,7 @@ class TaskManager:
         if self._idx >= len(self._command_list):
             return False
         pid = self.__find_free_process()
-        log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx))
+        log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx + self._prev_run_cmd_length))
         with open(log_file_name, "w") as log_file:
             self._workers[pid] = self.__create_thread(self.__update_process(pid, log_file))
             self._workers[pid].join()
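Both `init_worker` and `update_worker` apply the same offset, so the two batches of a hybrid run write disjoint log names. A quick numbering check with hypothetical batch sizes (10 runtime commands followed by 4 cache commands):

```python
# Hypothetical batch sizes, only to verify the numbering does not collide.
runtime_cmds, cache_cmds = 10, 4
first_batch = [f"log_{i}.log" for i in range(runtime_cmds)]
second_batch = [f"log_{i + runtime_cmds}.log" for i in range(cache_cmds)]
print(first_batch[-1], second_batch[0])   # log_9.log log_10.log -> no overlap
```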
@@ -136,6 +137,7 @@ class TaskManager:
                     break
             except TimeoutExpired:
                 continue
+        return self._idx
 
 class TestParallelRunner:
     def __init__(self, exec_file_path: os.path, test_command_line: list, worker_num: int, working_dir: os.path, cache_path: os.path):
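`compelete_all_processes` now returns its final command index, which the runner feeds into the next `TaskManager` as `prev_run_cmd_length`. A toy illustration of that chaining (simplified stand-in class, not the runner's real implementation):

```python
# Toy stand-in for the batch/offset chaining; names and behaviour are simplified.
class Batch:
    def __init__(self, commands, prev_run_cmd_length=0):
        self._commands = commands
        self._prev = prev_run_cmd_length
        self._idx = 0

    def run_all(self):
        for cmd in self._commands:
            print(f"log_{self._idx + self._prev}.log <- {cmd}")
            self._idx += 1
        return self._idx   # mirrors compelete_all_processes() returning self._idx

runtime_batch = Batch(["job_a", "job_b"])
done = runtime_batch.run_all()                    # writes log_0.log, log_1.log
cache_batch = Batch(["job_c"], prev_run_cmd_length=done)
cache_batch.run_all()                             # continues at log_2.log
```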
@@ -227,20 +229,24 @@ class TestParallelRunner:
         return test_list_cache
 
-    def __generate_proved_test_list(self, test_list: list, test_list_runtime:list):
-        proved_test_list = list()
-        if len(test_list) == len(test_list_runtime):
-            proved_test_list = test_list
-        else:
-            for test in test_list:
-                if test._name in test_list_runtime:
-                    proved_test_list.append(test)
+    def __generate_test_lists(self, test_list_cache: list, test_list_runtime:list):
+        cached_test_list = list()
+        runtime_test_test = list()
+        cached_test_list_names = list()
+        it = 0
+        for test in test_list_cache:
+            if test._name in test_list_runtime:
+                cached_test_list.append(test)
+                cached_test_list_names.append(test._name)
+        for test in test_list_runtime:
+            if not test in cached_test_list_names:
+                runtime_test_test.append(test)
 
-        if len(proved_test_list) < len(test_list_runtime):
-            logger.warning(f'Cache file is not relevant to the run. The test list will be taken from runtime')
-            return []
-        else:
-            return proved_test_list
+        if len(runtime_test_test) > 0:
+            logger.warning(f'Cache file is not relevant to the run. The runner will work in hybrid mode.')
+            logger.info(f'Test count from cache: {len(cached_test_list)}')
+            logger.info(f'Test count from runtime: {len(runtime_test_test)}')
+        return cached_test_list, runtime_test_test
 
     def __prepare_smart_filters(self, proved_test_list:list):
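The new `__generate_test_lists` splits the work into tests that still have cache entries and tests only known at runtime. A condensed sketch of the same split, assuming cache entries expose a `_name` attribute and the runtime list holds plain test names (the `CacheEntry` class below is invented for the example):

```python
class CacheEntry:
    def __init__(self, name, duration):
        self._name = name          # test name, as stored in the cache file
        self.duration = duration   # cached runtime, used later for balancing

def split_tests(cache_entries, runtime_names):
    cached, cached_names = [], set()
    for entry in cache_entries:
        if entry._name in runtime_names:   # test still exists in the binary
            cached.append(entry)
            cached_names.add(entry._name)
    # Tests discovered at runtime but missing from the cache run without
    # cached timing data; this is the "hybrid mode" case.
    runtime_only = [name for name in runtime_names if name not in cached_names]
    return cached, runtime_only

cached, runtime_only = split_tests(
    [CacheEntry("smoke_A", 12), CacheEntry("smoke_Old", 3)],
    ["smoke_A", "smoke_New"])
print([e._name for e in cached], runtime_only)   # ['smoke_A'] ['smoke_New']
```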
@@ -307,20 +313,30 @@ class TestParallelRunner:
         test_list_runtime = self.__get_test_list_by_runtime()
         test_list_cache = self.__get_test_list_by_cache()
 
-        proved_test_list = self.__generate_proved_test_list(test_list_cache, test_list_runtime)
-        final_test_list = list()
-        if len(proved_test_list) > 0:
+        cached_test_list, runtime_test_list = self.__generate_test_lists(test_list_cache, test_list_runtime)
+        if len(cached_test_list) > 0:
             self._is_save_cache = False
             logger.info(f"Test list is taken from cache.")
-            self._total_test_cnt = len(proved_test_list)
-            final_test_list = self.__prepare_smart_filters(proved_test_list)
-        else:
-            logger.info(f"Test list is taken from runtime.")
-            self._total_test_cnt = len(test_list_runtime)
-            final_test_list = test_list_runtime
-        final_test_list.reverse()
+            self._total_test_cnt += len(cached_test_list)
+            cached_test_list = self.__prepare_smart_filters(cached_test_list)
+        if len(runtime_test_list) > 0:
+            self._is_save_cache = True
+            self._total_test_cnt += len(runtime_test_list)
+            runtime_test_list.reverse()
         logger.info(f"Total test counter is {self._total_test_cnt}")
-        return final_test_list
+        return cached_test_list, runtime_test_list
+
+    def __execute_tests(self, filters: list(), prev_worker_cnt = 0):
+        commands = [f'{self._command} --gtest_filter={filter}' for filter in filters]
+        task_manager = TaskManager(commands, self._working_dir, prev_worker_cnt)
+        for _ in progressbar(range(self._worker_num), "Worker initialization: ", 40):
+            task_manager.init_worker()
+        for _ in progressbar(range(len(commands) - self._worker_num), "Worker execution: ", 40):
+            if not task_manager.update_worker():
+                break
+        return task_manager.compelete_all_processes()
 
     def run(self):
         if TaskManager.process_timeout == -1:
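`__execute_tests` turns every filter string into a separate gtest invocation and hands the whole batch to one `TaskManager`. For illustration, this is roughly how the `--gtest_filter` command lines come out (the binary name and filter values below are made up):

```python
command = "./ov_cpu_func_tests"                  # hypothetical test binary
filters = ["smoke_A.*:smoke_B.*", "smoke_C.*"]   # hypothetical smart filters
commands = [f"{command} --gtest_filter={f}" for f in filters]
# ['./ov_cpu_func_tests --gtest_filter=smoke_A.*:smoke_B.*',
#  './ov_cpu_func_tests --gtest_filter=smoke_C.*']
```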
@@ -328,17 +344,23 @@ class TestParallelRunner:
         logger.info(f"Run test parallel is started. Worker num is {self._worker_num}")
         t_start = datetime.datetime.now()
-        commands = [f'{self._command} --gtest_filter={filter}' for filter in self.__get_filters()]
-        task_manager = TaskManager(commands, self._working_dir)
-        for _ in progressbar(range(self._worker_num), "Worker initialization: ", 40):
-            task_manager.init_worker()
-        for _ in progressbar(range(len(commands) - self._worker_num), "Worker execution: ", 40):
-            if not task_manager.update_worker():
-                break
-        task_manager.compelete_all_processes()
+        filters_cache, filters_runtime = self.__get_filters()
+        worker_cnt = 0
+        if len(filters_runtime):
+            logger.info(f"Execute jobs taken from runtime")
+            worker_cnt = self.__execute_tests(filters_runtime, worker_cnt)
+        if len(filters_cache):
+            logger.info(f"Execute jobs taken from cache")
+            self.__execute_tests(filters_cache, worker_cnt)
 
         t_end = datetime.datetime.now()
-        logger.info(f"Run test parallel is finished successfully. Total time is {(t_end - t_start).total_seconds()}s")
+        total_seconds = (t_end - t_start).total_seconds()
+        sec = round(total_seconds % 60, 2)
+        min = int(total_seconds / 60) % 60
+        h = int(total_seconds / 3600) % 60
+        logger.info(f"Run test parallel is finished successfully. Total time is {h}h:{min}m:{sec}s")
 
     def postprocess_logs(self):
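The final log line now decomposes the elapsed time into hours, minutes, and seconds (3600 seconds per hour). A quick check of the arithmetic with a sample duration:

```python
total_seconds = 7384.25                   # sample elapsed time
sec = round(total_seconds % 60, 2)        # 4.25
minutes = int(total_seconds // 60) % 60   # 3
hours = int(total_seconds // 3600)        # 2
print(f"{hours}h:{minutes}m:{sec}s")      # 2h:3m:4.25s
```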