diff --git a/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_parallel.py b/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_parallel.py index e2c52ce939e..bae2a0e9d94 100644 --- a/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_parallel.py +++ b/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_parallel.py @@ -65,13 +65,14 @@ class TestStructure: class TaskManager: process_timeout = -1 - def __init__(self, command_list:list, working_dir: os.path): + def __init__(self, command_list:list, working_dir: os.path, prev_run_cmd_length = 0): self._command_list = command_list self._process_list = list() self._workers = list() self._timers = list() - self._idx = 0 self._log_filename = os.path.join(working_dir, f"log_{LOG_NAME_REPLACE_STR}.log") + self._prev_run_cmd_length = prev_run_cmd_length + self._idx = 0 def __create_thread(self, func): thread = threading.Thread(target=func) @@ -83,7 +84,7 @@ class TaskManager: if len(self._command_list) <= self._idx: logger.warning(f"Skip worker initialiazation. Command list lenght <= worker index") return - log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx)) + log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx + self._prev_run_cmd_length)) with open(log_file_name, "w") as log_file: worker = self.__create_thread( self._process_list.append(Popen(self._command_list[self._idx], shell=True, stdout=log_file, stderr=log_file))) @@ -114,7 +115,7 @@ class TaskManager: if self._idx >= len(self._command_list): return False pid = self.__find_free_process() - log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx)) + log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx + self._prev_run_cmd_length)) with open(log_file_name, "w") as log_file: self._workers[pid] = self.__create_thread(self.__update_process(pid, log_file)) self._workers[pid].join() @@ -136,6 +137,7 @@ class TaskManager: break except TimeoutExpired: continue + return self._idx class TestParallelRunner: def __init__(self, exec_file_path: os.path, test_command_line: list, worker_num: int, working_dir: os.path, cache_path: os.path): @@ -227,20 +229,24 @@ class TestParallelRunner: return test_list_cache - def __generate_proved_test_list(self, test_list: list, test_list_runtime:list): - proved_test_list = list() - if len(test_list) == len(test_list_runtime): - proved_test_list = test_list - else: - for test in test_list: - if test._name in test_list_runtime: - proved_test_list.append(test) + def __generate_test_lists(self, test_list_cache: list, test_list_runtime:list): + cached_test_list = list() + runtime_test_test = list() + cached_test_list_names = list() + it = 0 + for test in test_list_cache: + if test._name in test_list_runtime: + cached_test_list.append(test) + cached_test_list_names.append(test._name) + for test in test_list_runtime: + if not test in cached_test_list_names: + runtime_test_test.append(test) - if len(proved_test_list) < len(test_list_runtime): - logger.warning(f'Cache file is not relevant the run. The test list will be taken from runtime') - return [] - else: - return proved_test_list + if len(runtime_test_test) > 0: + logger.warning(f'Cache file is not relevant the run. The will works in hybrid mode.') + logger.info(f'Test count from cache: {len(cached_test_list)}') + logger.info(f'Test count from runtime: {len(runtime_test_test)}') + return cached_test_list, runtime_test_test def __prepare_smart_filters(self, proved_test_list:list): @@ -307,20 +313,30 @@ class TestParallelRunner: test_list_runtime = self.__get_test_list_by_runtime() test_list_cache = self.__get_test_list_by_cache() - proved_test_list = self.__generate_proved_test_list(test_list_cache, test_list_runtime) - final_test_list = list() - if len(proved_test_list) > 0: + + cached_test_list, runtime_test_list = self.__generate_test_lists(test_list_cache, test_list_runtime) + + if len(cached_test_list) > 0: self._is_save_cache = False - logger.info(f"Test list is taken from cache.") - self._total_test_cnt = len(proved_test_list) - final_test_list = self.__prepare_smart_filters(proved_test_list) - else: - logger.info(f"Test list is taken from runtime.") - self._total_test_cnt = len(test_list_runtime) - final_test_list = test_list_runtime - final_test_list.reverse() + self._total_test_cnt += len(cached_test_list) + cached_test_list = self.__prepare_smart_filters(cached_test_list) + if len(runtime_test_list) > 0: + self._is_save_cache = True + self._total_test_cnt += len(runtime_test_list) + runtime_test_list.reverse() logger.info(f"Total test counter is {self._total_test_cnt}") - return final_test_list + return cached_test_list, runtime_test_list + + def __execute_tests(self, filters: list(), prev_worker_cnt = 0): + commands = [f'{self._command} --gtest_filter={filter}' for filter in filters] + task_manager = TaskManager(commands, self._working_dir, prev_worker_cnt) + for _ in progressbar(range(self._worker_num), "Worker initialization: ", 40): + task_manager.init_worker() + for _ in progressbar(range(len(commands) - self._worker_num), "Worker execution: ", 40): + if not task_manager.update_worker(): + break + return task_manager.compelete_all_processes() + def run(self): if TaskManager.process_timeout == -1: @@ -328,17 +344,23 @@ class TestParallelRunner: logger.info(f"Run test parallel is started. Worker num is {self._worker_num}") t_start = datetime.datetime.now() - commands = [f'{self._command} --gtest_filter={filter}' for filter in self.__get_filters()] - task_manager = TaskManager(commands, self._working_dir) - for _ in progressbar(range(self._worker_num), "Worker initialization: ", 40): - task_manager.init_worker() - for _ in progressbar(range(len(commands) - self._worker_num), "Worker execution: ", 40): - if not task_manager.update_worker(): - break - task_manager.compelete_all_processes() + filters_cache, filters_runtime = self.__get_filters() + + worker_cnt = 0 + if len(filters_runtime): + logger.info(f"Execute jobs taken from runtime") + worker_cnt = self.__execute_tests(filters_runtime, worker_cnt) + if len(filters_cache): + logger.info(f"Execute jobs taken from cache") + self.__execute_tests(filters_cache, worker_cnt) + t_end = datetime.datetime.now() - logger.info(f"Run test parallel is finished successfully. Total time is {(t_end - t_start).total_seconds()}s") + total_seconds = (t_end - t_start).total_seconds() + sec = round(total_seconds % 60, 2) + min = int(total_seconds / 60) % 60 + h = int(total_seconds / 360) % 60 + logger.info(f"Run test parallel is finished successfully. Total time is {h}h:{min}m:{sec}s") def postprocess_logs(self):