From bf5690fa7dcbed5cb2bb5eca91aa4108c54b752f Mon Sep 17 00:00:00 2001
From: Mikhail Ryzhov
Date: Tue, 5 Sep 2023 16:23:17 +0200
Subject: [PATCH] [GA] Parallel tests (#18773)

* test job
* added script to the test package
* test call fix
* switched test to large runner
* Added option to split tests by suites
* extended logs
* enabled test cache
* fixed workload
* optimized splitting mode
* excluded disabled suites
* temporarily removed parallel logs
* added failed logs
* fixed empty name in suites
* test on 4 cores
* make step optional
* fixed param
* test
* grouping suites
* set suite arg
* increase test timeout
* test commit
* test pip deps
* include requirements.txt in the test package
* fixed deps step order
* fixed test counter
* fixed smart filter for suites
* clean up
* disabled repeat failed tests
* review comments
* use runtime execution time for skipped tests
* removed disabled suites
* reduced command lines
* enabled tests results
* fixed typo
* removed unused argument pp
* Log improvements
* merge cached and runtime filters
* fixed order
* fixed init list error
* fixed cache writing
* enable windows pipeline
* changed runner for windows
* optimized balancing using heap
* Fixed test counter
* fixed windows pipeline
* extended logging
* changed pipelines
* added logs on Windows
* fixed pipelines
* debug
* removed os specific code
* fixed "#"
* fixed test results
* fixed win pipeline
* cleanup debug
* rebase fixes
* windows pip requirements
* aligned run_conformance.py
* Apply suggestions from code review

Co-authored-by: Andrey Kashchikhin

* reverted windows changes
* reverted build runner
* fixed review comments
* minor review fixes
* make helper func static
* renamed test runner
* fixed merge issue
* removed unused log
* reduced command line
* fixed issue with conformance run
* fixed typo
* set test as default split unit
* fixed tasks queue with time -1
* fixed test result calculation
* reverted wrong fix
* reverted changes
* set time limitation
* reverted unused change
* fix win command lines
* reuse env variables in pipeline
* fixed install files permissions
* fixed pipeline syntax
* reset validation schema
* fixed env names
* reverted initial setting of env
* increased test runner
* fixed paths
* reuse env path
* reset validation schema
* Revert "reuse env path"

This reverts commit 97422ac5957dbe04ef7b65e5ef74e6bc6a541acd.

* Revert "increased test runner"

This reverts commit 010aa31641fca5562caafb4962ecd4ddadd8f582.
* revert command line reduction
* made if condition clearer

---------

Co-authored-by: Andrey Kashchikhin
---
 .github/workflows/linux.yml                   |  29 +-
 .../functional_test_utils/CMakeLists.txt      |   4 +
 .../layer_tests_summary/run_conformance.py    |  26 +-
 .../layer_tests_summary/run_parallel.py       | 350 ++++++++++--------
 .../layer_tests_summary/utils/constants.py    |   3 +
 5 files changed, 245 insertions(+), 167 deletions(-)

diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index 7fe5e42e216..3dc75adf3c9 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -749,10 +749,12 @@ jobs:
     defaults:
       run:
         shell: bash
-    runs-on: ubuntu-22.04
+    runs-on: ubuntu-latest-4-cores
     env:
       INSTALL_DIR: ${{ github.workspace }}/install
       INSTALL_TEST_DIR: ${{ github.workspace }}/install/tests
+      PARALLEL_TEST_SCRIPT: ${{ github.workspace }}/install/tests/functional_test_utils/run_parallel.py
+      PARALLEL_TEST_CACHE: ${{ github.workspace }}/install/tests/test_cache.lst
 
     steps:
       - name: Create Directories
@@ -784,15 +786,34 @@
           tar -xzf openvino_tests.tar.gz -C ${{ env.INSTALL_DIR }} && rm openvino_tests.tar.gz || exit 1
           popd
 
-      - name: Intel CPU plugin func tests
+      - name: Install python dependencies
+        run: |
+          python3 -m pip install --upgrade pip
+          python3 -m pip install -r ${{ env.INSTALL_TEST_DIR }}/functional_test_utils/requirements.txt
+
+      - name: Cache Tests Execution Time
+        id: tests-functional-cpu-cache
+        uses: actions/cache@v3
+        with:
+          path: ${{ env.PARALLEL_TEST_CACHE }}
+          key: ${{ runner.os }}-tests-functional-cpu-cache
+
+      - name: Intel CPU plugin func tests (parallel)
         run: |
          source ${{ env.INSTALL_DIR }}/setupvars.sh
-          ${{ env.INSTALL_TEST_DIR }}/ov_cpu_func_tests --gtest_print_time=1 --gtest_filter=*smoke* --gtest_output=xml:"${{ env.INSTALL_TEST_DIR }}/TEST-CPUFuncTests.xml"
+          python3 ${{ env.PARALLEL_TEST_SCRIPT }} -e ${{ env.INSTALL_TEST_DIR }}/ov_cpu_func_tests -c ${{ env.PARALLEL_TEST_CACHE }} -w ${{ env.INSTALL_TEST_DIR }} -s suite -rf 0 -- --gtest_print_time=1 --gtest_filter=*smoke*
+        timeout-minutes: 25
 
       - name: Upload Test Results
         uses: actions/upload-artifact@v3
         if: ${{ always() }}
         with:
           name: test-results-functional-cpu
-          path: ${{ env.INSTALL_TEST_DIR }}/TEST*.xml
+          path: |
+            ${{ env.INSTALL_TEST_DIR }}/TEST*.xml
+            ${{ env.INSTALL_TEST_DIR }}/logs/failed/*.log
+            ${{ env.INSTALL_TEST_DIR }}/logs/crashed/*.log
+            ${{ env.INSTALL_TEST_DIR }}/logs/hanged/*.log
+            ${{ env.INSTALL_TEST_DIR }}/logs/interapted/*.log
+            ${{ env.INSTALL_TEST_DIR }}/logs/disabled_tests.log
           if-no-files-found: 'error'
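For reference, the parallel CI step above boils down to a single script invocation. A minimal local sketch of the same call, assuming the test package is unpacked under ./install/tests (that path is illustrative; the workflow uses the INSTALL_TEST_DIR env variable); everything after the bare `--` is forwarded verbatim to the gtest binary:

    import subprocess

    TEST_DIR = "./install/tests"  # hypothetical local path; CI uses $INSTALL_TEST_DIR

    # Mirrors the CI step: split work by gtest suite (-s suite), do not re-run
    # failed tests (-rf 0), and pass the gtest flags after "--" unchanged.
    subprocess.run(
        ["python3", f"{TEST_DIR}/functional_test_utils/run_parallel.py",
         "-e", f"{TEST_DIR}/ov_cpu_func_tests",
         "-c", f"{TEST_DIR}/test_cache.lst",
         "-w", TEST_DIR,
         "-s", "suite",
         "-rf", "0",
         "--", "--gtest_print_time=1", "--gtest_filter=*smoke*"],
        check=True,
    )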
diff --git a/src/tests/test_utils/functional_test_utils/CMakeLists.txt b/src/tests/test_utils/functional_test_utils/CMakeLists.txt
index a164bbdd180..15b7bc0167f 100644
--- a/src/tests/test_utils/functional_test_utils/CMakeLists.txt
+++ b/src/tests/test_utils/functional_test_utils/CMakeLists.txt
@@ -29,6 +29,10 @@ addIeTarget(
             $
 )
 
+install(PROGRAMS layer_tests_summary/run_parallel.py DESTINATION tests/functional_test_utils COMPONENT tests EXCLUDE_FROM_ALL)
+install(FILES layer_tests_summary/requirements.txt DESTINATION tests/functional_test_utils COMPONENT tests EXCLUDE_FROM_ALL)
+install(DIRECTORY layer_tests_summary/utils DESTINATION tests/functional_test_utils COMPONENT tests EXCLUDE_FROM_ALL)
+
 ie_faster_build(${TARGET_NAME}
     PCH PRIVATE "src/precomp.hpp"
 )

diff --git a/src/tests/test_utils/functional_test_utils/layer_tests_summary/run_conformance.py b/src/tests/test_utils/functional_test_utils/layer_tests_summary/run_conformance.py
index 96d641494a4..8c64a0eeb9d 100644
--- a/src/tests/test_utils/functional_test_utils/layer_tests_summary/run_conformance.py
+++ b/src/tests/test_utils/functional_test_utils/layer_tests_summary/run_conformance.py
@@ -141,7 +141,7 @@ class Conformance:
             logger.info(f"The file {download_path} is an archive. It will be unzipped to {path_to_save}")
             return file_utils.unzip_archieve(download_path, path_to_save)
         return download_path
-    
+
     def __dump_subgraph(self):
         subgraph_dumper_path = os.path.join(self._ov_path, f'{SUBGRAPH_DUMPER_BIN_NAME}{constants.OS_BIN_FILE_EXT}')
@@ -196,7 +196,7 @@ class Conformance:
         if len(diff) > 0:
             logger.error(f"Unexpected failures: {diff}")
             exit(-1)
-        
+
         intersection = self._expected_failures.intersection(this_run_failures)
         if this_run_failures != self._expected_failures and self._expected_failures_update:
             logger.info(f"Expected failures file {self._expected_failures} will be updated!!!")
@@ -225,19 +225,21 @@ class Conformance:
             os.mkdir(report_dir)
         if not os.path.isdir(logs_dir):
             os.mkdir(logs_dir)
-        
-        command_line_args = [f"--device={self._device}", 
+
+        command_line_args = [f"--device={self._device}",
                              f'--input_folders="{self._model_path}"' if self._type == constants.OP_CONFORMANCE else '',
                              f"--report_unique_name", f'--output_folder="{parallel_report_dir}"',
                              f'--gtest_filter=\"{self._gtest_filter}\"', f'--config_path="{self._ov_config_path}"',
                              f'--shape_mode={self._special_mode}']
-        conformance = TestParallelRunner(f"{conformance_path}",
-                                         command_line_args,
-                                         self._workers,
-                                         logs_dir,
-                                         self._cache_path,
-                                         self._is_parallel_over_devices,
-                                         self._expected_failures if not self._expected_failures_update else set())
+        conformance = TestParallelRunner(exec_file_path=f"{conformance_path}",
+                                         test_command_line=command_line_args,
+                                         worker_num=self._workers,
+                                         working_dir=logs_dir,
+                                         cache_path=self._cache_path,
+                                         split_unit=constants.TEST_UNIT_NAME,
+                                         repeat_failed=1,
+                                         is_parallel_devices=self._is_parallel_over_devices,
+                                         excluded_tests=self._expected_failures if not self._expected_failures_update else set())
         conformance.run()
         conformance.postprocess_logs()
@@ -319,4 +321,4 @@ if __name__ == "__main__":
                               args.parallel_devices, args.expected_failures, args.expected_failures_update)
     conformance.run(args.dump_graph)
-    
+

diff --git a/src/tests/test_utils/functional_test_utils/layer_tests_summary/run_parallel.py b/src/tests/test_utils/functional_test_utils/layer_tests_summary/run_parallel.py
index e04b972eb0d..b0832867ea2 100644
--- a/src/tests/test_utils/functional_test_utils/layer_tests_summary/run_parallel.py
+++ b/src/tests/test_utils/functional_test_utils/layer_tests_summary/run_parallel.py
@@ -21,6 +21,7 @@ import threading
 import csv
 import datetime
 import shlex
+import heapq
 
 if sys.version_info.major >= 3:
     import _thread as thread
 except:
 FILENAME_LENGTH = 255
 LOG_NAME_REPLACE_STR = "##NAME##"
 DEFAULT_PROCESS_TIMEOUT = 3600
+DEFAULT_SUITE_TIMEOUT = 3600
 DEFAULT_TEST_TIMEOUT = 900
 MAX_LENGHT = 4096 if not constants.IS_WIN else 8191
@@ -45,17 +47,22 @@
 def parse_arguments():
     parser = ArgumentParser()
     exec_file_path_help = "Path to the test executable file"
     cache_path_help = "Path to the cache file with test_name list sorted by execution time. .lst file!"
-    worker_num_help = "Worker number. Default value is `cpu_count-1` "
+    worker_num_help = "Worker number. Default value is `cpu_count` "
     working_dir_num_help = "Working dir"
     process_timeout_help = "Process timeout in s"
     parallel_help = "Parallel over HW devices. For example run tests over GPU.0, GPU.1 and etc"
+    split_unit_help = "Split by test or suite"
+    repeat_help = "Number of times to repeat failed and interrupted tests"
     parser.add_argument("-e", "--exec_file", help=exec_file_path_help, type=str, required=True)
     parser.add_argument("-c", "--cache_path", help=cache_path_help, type=str, required=False, default="")
-    parser.add_argument("-j", "--workers", help=worker_num_help, type=int, required=False, default=(os.cpu_count() - 1) if os.cpu_count() > 2 else 1)
+    parser.add_argument("-j", "--workers", help=worker_num_help, type=int, required=False, default=os.cpu_count())
     parser.add_argument("-p", "--parallel_devices", help=parallel_help, type=int, required=False, default=0)
     parser.add_argument("-w", "--working_dir", help=working_dir_num_help, type=str, required=False, default=".")
     parser.add_argument("-t", "--process_timeout", help=process_timeout_help, type=int, required=False, default=DEFAULT_PROCESS_TIMEOUT)
+    parser.add_argument("-s", "--split_unit", help=split_unit_help, type=str, required=False, default=constants.TEST_UNIT_NAME)
+    parser.add_argument("-rf", "--repeat_failed", help=repeat_help, type=int, required=False, default=1)
+
     return parser.parse_args()
 
 def get_test_command_line_args():
@@ -82,7 +89,7 @@ def get_device_by_args(args: list):
         break
     return device
 
-# Class to read test cache 
+# Class to read test cache
 class TestStructure:
     _name = ""
     _time = 0
@@ -147,7 +154,7 @@ class TaskManager:
                 # logger.warning(f"Impossible to kill process {pid} with error: {err}")
                 pass
-    
+
     def __find_free_process(self):
         while True:
             for pid in range(len(self._process_list)):
@@ -184,8 +191,8 @@ class TaskManager:
                 self._timers[pid] = datetime.datetime.now()
             self._idx += 1
             return True
-    
-    def compelete_all_processes(self):
+
+    def compelete_all_processes(self):
         while len(self._process_list) > 0:
             for pid in range(len(self._process_list)):
                 try:
@@ -206,7 +213,7 @@ class TestParallelRunner:
     def __init__(self, exec_file_path: os.path, test_command_line: list,
                  worker_num: int, working_dir: os.path, cache_path: os.path,
-                 is_parallel_devices=False, excluded_tests=set()):
+                 split_unit: str, repeat_failed: int, is_parallel_devices=False, excluded_tests=set()):
         self._exec_file_path = exec_file_path
         self._working_dir = working_dir
         self._conformance_ir_filelists = list()
@@ -221,17 +228,19 @@ class TestParallelRunner:
             if not os.path.exists(head):
                 os.mkdir(head)
             self._is_save_cache = True
+        if split_unit in constants.UNIT_NAMES:
+            self._split_unit = split_unit
+        else:
+            logger.error(f"Incorrect split_unit argument: {split_unit}. Please use the following values: {','.join(constants.UNIT_NAMES)}")
+            sys.exit(-1)
+        self._repeat_failed = repeat_failed
         self._disabled_tests = list()
         self._total_test_cnt = 0
         self._device = get_device_by_args(self._command.split())
         self._available_devices = [self._device] if not self._device is None else []
         if has_python_api and is_parallel_devices:
             self._available_devices = get_available_devices(self._device)
-        self._excluded_tests_re = set()
-        self._orig_excluded_tests = excluded_tests
-        for test in excluded_tests:
-            self._excluded_tests_re.add(f'"{self.__replace_restricted_symbols(test)}":')
-
+        self._excluded_tests = excluded_tests
 
     def __init_basic_command_line_for_exec_file(self, test_command_line: list):
         command = f'{self._exec_file_path}'
@@ -241,6 +250,8 @@
                 is_input_folder = True
                 command += f" --input_folders="
                 argument = argument[argument.find("=")+1:]
+            elif "--gtest_filter" in argument:
+                self._gtest_filter = argument[argument.find("=")+1:]
             if is_input_folder and argument[0] != "-":
                 buf = ""
                 for _ in argument.split(','):
                     buf = file_utils.prepare_filelist(input_path, ["*.xml"])
                     self._conformance_ir_filelists.append(buf)
                     buf += ","
-                argument = buf 
+                argument = buf
             else:
                 is_input_folder = False
                 command += f" "
             command += f"{argument}"
         return command
 
+    @staticmethod
+    def __get_suite_filter(test_filter: str, suite_filter: str):
+        filters = test_filter.split(':')
+        suite_filter_mixed = ''
+        for filter in filters:
+            patterns = filter.strip('\"').split('*')
+            suite_filter = f'{suite_filter}*'
+            suite_filter_part = suite_filter
+            for pattern in patterns:
+                if pattern and suite_filter.find(pattern) == -1:
+                    suite_filter_part += f'{pattern}*'
+            if suite_filter_part == suite_filter:
+                suite_filter_mixed = f'"{suite_filter_part}"'
+                break
+            if not suite_filter_mixed:
+                suite_filter_mixed = f'"{suite_filter_part}"'
+            else:
+                suite_filter_mixed += f':"{suite_filter_part}"'
+        return suite_filter_mixed
+
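To see what the merged suite filters are meant to select: gtest's `*` wildcards behave like shell globs, so a pattern produced from a user filter and a suite name can be sanity-checked with fnmatch. A small illustration (the `matches` helper and the sample names are ours, not part of the patch, and negative `-` patterns are omitted):

    from fnmatch import fnmatchcase

    def matches(test_name: str, gtest_filter: str) -> bool:
        # A gtest filter is a ':'-separated list of positive glob patterns.
        return any(fnmatchcase(test_name, pattern.strip('"'))
                   for pattern in gtest_filter.split(':') if pattern)

    # A suite-level job filter built from user filter "*smoke*" and suite "CPUUnit"
    # should keep only that suite's smoke tests:
    assert matches("CPUUnit.smoke_Add", '"CPUUnit*smoke*"')
    assert not matches("GPUUnit.smoke_Add", '"CPUUnit*smoke*"')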
     @staticmethod
     def __replace_restricted_symbols(input_string:str):
         restricted_symbols = "!@$%^&-+`~:;\",<>?"
@@ -264,7 +295,9 @@
             input_string = input_string.replace(symbol, '*')
         return input_string
 
-    def __get_test_list_by_runtime(self):
+    def __get_test_list_by_runtime(self, test_unit = constants.TEST_UNIT_NAME):
+        self._total_test_cnt = 0
+        self._disabled_tests.clear()
         test_list_file_name = os.path.join(self._working_dir, "test_list.lst")
         if os.path.isfile(test_list_file_name):
             try:
             logger.error(f"The test list file does not exist! Please check the process output!")
             exit(-1)
 
-        test_list = list()
+        tests_dict = dict()
         with open(test_list_file_name) as test_list_file:
             test_suite = ""
             for test_name in test_list_file.read().split('\n'):
                 if "Running main() from" in test_name:
                     continue
                 if not ' ' in test_name:
-                    test_suite = test_name
+                    test_suite = test_name.replace(".", "")
                     continue
-                pos = test_name.find('#')
+                pos = test_name.find(' # ')
                 if pos > 0 or test_suite != "":
-                    real_test_name = test_suite + (test_name[2:pos-2] if pos > 0 else test_name[2:])
+                    real_test_name = test_suite + "." + (test_name[2:pos-1] if pos > 0 else test_name[2:])
                     if constants.DISABLED_PREFIX in real_test_name:
                         self._disabled_tests.append(real_test_name)
-                    else:
-                        test_list.append(f'"{self.__replace_restricted_symbols(real_test_name)}":')
+                    elif test_unit == constants.TEST_UNIT_NAME:
+                        tests_dict[real_test_name] = 1
+                        self._total_test_cnt += 1
+                    elif test_unit == constants.SUITE_UNIT_NAME:
+                        tests_dict[test_suite] = tests_dict.get(test_suite, 0) + 1
+                        self._total_test_cnt += 1
             test_list_file.close()
         os.remove(test_list_file_name)
-        logger.info(f"Len test_list_runtime (without disabled tests): {len(test_list)}")
-        if len(test_list) == 0:
+        logger.info(f"Len test_list_runtime (without disabled tests): {len(tests_dict)}")
+        if len(tests_dict) == 0:
             logger.warning(f"Looks like there are no tests to run! Please check the filters!")
             exit(0)
-        return test_list
+        return tests_dict
 
     def __get_test_list_by_cache(self):
-        test_list_cache = list()
+        tests_dict_cache = dict()
         if os.path.isfile(self._cache_path):
             logger.info(f"Get test list from cache file: {self._cache_path}")
             with open(self._cache_path, "r") as cache_file:
                 for line in cache_file.readlines():
                     pos = line.find(":")
-                    time = line[:pos]
-                    test_name = line[pos+1:]
-                    if not constants.DISABLED_PREFIX in test_name:
-                        test_list_cache.append(TestStructure(test_name.replace("\n", ""), time))
-        logger.info(f"Len test_list_cache: {len(test_list_cache)}")
-        return test_list_cache
+                    time = int(line[:pos])
+                    test_name = line[pos+1:].replace("\n", "")
+                    test_suite = test_name[:test_name.find(".")]
 
-    def __generate_test_lists(self, test_list_cache: list, test_list_runtime:list):
-        cached_test_list = list()
-        runtime_test_test = list()
-        cached_test_list_names = list()
+                    if self._split_unit == constants.TEST_UNIT_NAME:
+                        if constants.DISABLED_PREFIX not in test_name:
+                            if (time != -1):
+                                tests_dict_cache[test_name] = tests_dict_cache.get(test_name, 0) + time
+                    elif self._split_unit == constants.SUITE_UNIT_NAME:
+                        if constants.DISABLED_PREFIX not in test_suite:
+                            if (time == -1):
+                                tests_dict_cache[test_suite] = tests_dict_cache.get(test_suite, -1)
+                            else:
+                                tests_dict_cache[test_suite] = tests_dict_cache.get(test_suite, 0) + time
 
-        for test in test_list_cache:
-            if test._name in test_list_runtime and not test in self._excluded_tests_re:
-                cached_test_list.append(test)
-                cached_test_list_names.append(test._name)
-        for test in test_list_runtime:
-            if not test in cached_test_list_names and not test in self._excluded_tests_re:
-                runtime_test_test.append(test)
+        logger.info(f"Len tests_dict_cache: {len(tests_dict_cache)}")
+        return tests_dict_cache
 
-        if len(runtime_test_test) > 0:
+    def __generate_test_lists(self, test_dict_cache: dict, test_dict_runtime: dict):
+        cached_test_dict = dict()
+        runtime_test_dict = dict()
+
+        for test in test_dict_cache:
+            if test in test_dict_runtime and test not in self._excluded_tests:
+                cached_test_dict[test] = test_dict_cache[test]
+
+        for test in test_dict_runtime:
+            if test not in cached_test_dict and test not in self._excluded_tests:
+                runtime_test_dict[test] = test_dict_runtime[test]
+
+        if len(runtime_test_dict) > 0:
             logger.warning(f'Cache file is not relevant to the run. The run will work in hybrid mode.')
-        logger.info(f'Test count from cache: {len(cached_test_list)}')
-        logger.info(f'Test count from runtime: {len(runtime_test_test)}')
-        return cached_test_list, runtime_test_test
+        logger.info(f'{self._split_unit.title()} count from cache: {len(cached_test_dict)}')
+        logger.info(f'{self._split_unit.title()} count from runtime: {len(runtime_test_dict)}')
+        return cached_test_dict, runtime_test_dict
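The cache consumed here is a plain .lst file with one `<time>:<name>` record per line (written back by postprocess_logs further down), where a time of -1 marks an entry that previously crashed or hanged. A minimal reader under those assumptions (the file name is illustrative):

    # Sketch: parse a test_cache.lst produced by a previous run.
    # Each line is "<seconds>:<test or suite name>"; -1 means crashed/hanged.
    def load_cache(path: str = "test_cache.lst") -> dict:
        times = {}
        with open(path) as cache_file:
            for line in cache_file:
                time_str, _, name = line.strip().partition(":")
                if name:
                    times[name] = int(time_str)
        return times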
 
-    def __prepare_smart_filters(self, proved_test_list:list):
-        res_test_filters = list()
+    def __prepare_smart_filters(self, proved_test_dict: dict):
         def_length = len(self._command) + len(" --gtest_filter=")
-        idx = len(proved_test_list)
-        for i in range(len(proved_test_list)):
-            if proved_test_list[i]._time == -1:
-                idx = i
-                break
-
-        # Run crashed tests in a separed thread
-        if idx < len(proved_test_list):
-            while proved_test_list[idx]._time == -1:
-                test = proved_test_list.pop(idx)
-                res_test_filters.append(test._name)
-                if idx >= len(proved_test_list):
-                    break
+        if constants.IS_WIN:
+            # subprocess adds cmd.exe to the command line on Windows if shell=True
+            def_length += len(f'{os.environ.get("COMSPEC", "cmd.exe")} /C ')
 
         longest_device = ""
         for device in self._available_devices:
             if len(device) > len(longest_device):
                 longest_device = device
 
-        # prepare gtest filters per worker according command line length limitation
-        while len(proved_test_list) > 0:
-            test_times = []
-            is_not_full = True
-            worker_test_filters = list()
+        real_worker_num = self._worker_num * len(self._available_devices)
 
-            real_worker_num = self._worker_num * len(self._available_devices)
+        tasks_crashed = []
+        tasks_full = []
+        tasks_not_full = []
+        tests_sorted = sorted(proved_test_dict.items(), key=lambda i: i[1], reverse=True)
+        for test_pattern, test_time in tests_sorted:
+            test_pattern = f'{self.__replace_restricted_symbols(test_pattern)}'
 
-            for _ in range(real_worker_num):
-                if len(proved_test_list) == 0:
-                    break
-                worker_test_filters.append(f'"{self.__replace_restricted_symbols(proved_test_list[0]._name)}":')
-                test = proved_test_list.pop(0)
-                test_times.append(test._time)
-            while is_not_full and len(proved_test_list) > 0:
-                for i in range(real_worker_num):
-                    if i >= len(proved_test_list):
+            if self._split_unit == constants.SUITE_UNIT_NAME:
+                # fix the suite filters to execute the right number of tests
+                test_pattern = f'{self.__get_suite_filter(self._gtest_filter, test_pattern)}:'
+            else:
+                # add quotes and pattern splitter
+                test_pattern = f'"{test_pattern}":'
+
+            if test_time == -1:
+                tasks_crashed.append((test_time, test_pattern))
+            else:
+                while len(tasks_not_full) > 0:
+                    t_time, t_pattern = tasks_not_full[0]
+                    length = len(t_pattern) + def_length + len(test_pattern.replace(self._device, longest_device))
+                    if length < MAX_LENGHT:
                         break
-                    if i == 0:
-                        continue
-                    while test_times[0] > test_times[i] + proved_test_list[len(proved_test_list) - 1]._time:
-                        final_pos = len(proved_test_list) - 1
-                        filter = proved_test_list[final_pos]._name
-                        if len(worker_test_filters[i]) + def_length + len(filter.replace(self._device, longest_device)) < MAX_LENGHT:
-                            worker_test_filters[i] += f'"{self.__replace_restricted_symbols(filter)}":'
-                            test_times[i] += proved_test_list[final_pos]._time
-                            proved_test_list.pop(final_pos)
-                        else:
-                            is_not_full = False
-                            break
-                    if len(proved_test_list) == 0:
-                        break
-                if is_not_full and len(proved_test_list) > 0:
-                    filter = proved_test_list[0]._name
-                    if len(worker_test_filters[0]) + def_length + len(filter.replace(self._device, longest_device)) < MAX_LENGHT:
-                        worker_test_filters[0] += f'"{self.__replace_restricted_symbols(filter)}":'
-                        test_times[0] += proved_test_list[0]._time
-                        proved_test_list.pop(0)
-                    else:
-                        is_not_full = False
-            for filter in worker_test_filters:
-                res_test_filters.append(filter)
-            is_not_full = True
-        # logging for debug
-        for i in range(len(res_test_filters)):
-            filter = res_test_filters[i]
-            cnt = filter.count('\":')
-            self._total_test_cnt += cnt
-            # logger.info(f"Number of tests in job_{i}: {cnt}")
-        return res_test_filters
-
+                    else:
+                        tasks_full.append(tasks_not_full.pop())
+
+                if len(tasks_not_full) < real_worker_num:
+                    heapq.heappush(tasks_not_full, (test_time, test_pattern))
+                else:
+                    heapq.heapreplace(tasks_not_full, (t_time + test_time, t_pattern + test_pattern))
+
+        test_filters = tasks_full + tasks_not_full + tasks_crashed
+        test_filters.sort(reverse=True)
+        # convert to list and exclude empty jobs
+        test_filters = [task[1] for task in test_filters if task[1]]
+        return test_filters
+
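The loop above is essentially longest-processing-time-first scheduling: entries are taken in order of decreasing runtime and always appended to the currently lightest job, which the min-heap yields in O(log n); jobs that hit the command-line length cap are parked in tasks_full. A stripped-down sketch of just the balancing idea, without the length cap or pattern merging (names and sample timings are invented):

    import heapq

    def balance(durations: dict, workers: int) -> list:
        """Greedy LPT: longest entries first, each onto the least-loaded worker."""
        heap = [(0, i, []) for i in range(workers)]  # (total_time, worker_id, names)
        heapq.heapify(heap)
        for name, secs in sorted(durations.items(), key=lambda kv: kv[1], reverse=True):
            total, i, names = heapq.heappop(heap)  # lightest worker so far
            names.append(name)
            heapq.heappush(heap, (total + secs, i, names))
        return [names for _, _, names in heap]

    # Example: two workers end up with near-equal totals (35s and 45s).
    print(balance({"a.t1": 30, "a.t2": 20, "b.t3": 25, "b.t4": 5}, 2))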
     def __get_filters(self):
         if not os.path.isfile(self._exec_file_path):
             logger.error(f"Test executable file {self._exec_file_path} does not exist!")
             sys.exit(-1)
-
-        test_list_runtime = self.__get_test_list_by_runtime()
-        test_list_cache = self.__get_test_list_by_cache()
-
-        cached_test_list, runtime_test_list = self.__generate_test_lists(test_list_cache, test_list_runtime)
-        if len(cached_test_list) > 0:
+        test_dict_runtime = self.__get_test_list_by_runtime(self._split_unit)
+        test_dict_cache = self.__get_test_list_by_cache()
+
+        cached_test_dict, runtime_test_dict = self.__generate_test_lists(test_dict_cache, test_dict_runtime)
+
+        cached_test_list = list()
+        if len(cached_test_dict) > 0:
             self._is_save_cache = False
-            cached_test_list = self.__prepare_smart_filters(cached_test_list)
-        if len(runtime_test_list) > 0:
+            cached_test_list = self.__prepare_smart_filters(cached_test_dict)
+        runtime_test_list = list()
+        if len(runtime_test_dict) > 0:
             self._is_save_cache = True
-            self._total_test_cnt += len(runtime_test_list)
-            runtime_test_list.reverse()
+            runtime_test_list = self.__prepare_smart_filters(runtime_test_dict)
         logger.info(f"Total test counter is {self._total_test_cnt}")
         return cached_test_list, runtime_test_list
-    
+
     def __execute_tests(self, filters: list(), prev_worker_cnt = 0):
         commands = [f'{self._command} --gtest_filter={filter}' for filter in filters]
         tmp_log_dir = os.path.join(self._working_dir, "temp")
@@ -463,7 +487,7 @@
                     test_name = line[line.find(constants.RUN) + len(constants.RUN) + 1:-1:]
                     has_status = False
                     if test_name is not None:
-                        test_names.add(f'"{self.__replace_restricted_symbols(test_name)}":')
+                        test_names.add(test_name)
                     for _, status_messages in constants.TEST_STATUS.items():
                         for status_msg in status_messages:
                             if status_msg in line:
                                 has_status = True
                                 break
                         if has_status:
                             break
-                if not has_status:
-                    interapted_tests.append(f'"{test_name}":')
+                if not has_status and test_name:
+                    interapted_tests.append(test_name)
             log_file.close()
         test_list_runtime = set(self.__get_test_list_by_runtime())
-        not_runned_tests = test_list_runtime.difference(test_names).difference(self._excluded_tests_re)
-        interapted_tests = set(interapted_tests).difference(self._excluded_tests_re)
+        not_runned_tests = test_list_runtime.difference(test_names).difference(self._excluded_tests)
+        interapted_tests = set(interapted_tests).difference(self._excluded_tests)
         return list(not_runned_tests), list(interapted_tests)
 
     def run(self):
         if len(self._available_devices) > 1:
             logger.info(f"Tests will be run over devices: {self._available_devices} instead of {self._device}")
         t_start = datetime.datetime.now()
-        
+
         filters_cache, filters_runtime = self.__get_filters()
+        # it is better to reuse workers for both cached and runtime tasks
+        test_filters = filters_cache + filters_runtime
         worker_cnt = 0
-        if len(filters_cache):
-            logger.info(f"Execute jobs taken from cache")
-            worker_cnt += self.__execute_tests(filters_cache, worker_cnt)
+        if len(test_filters):
+            logger.info(f"Execute jobs taken from cache and runtime")
+            worker_cnt += self.__execute_tests(test_filters, worker_cnt)
         # 15m for one test in one process
         if TaskManager.process_timeout == -1 or TaskManager.process_timeout == DEFAULT_PROCESS_TIMEOUT:
-            TaskManager.process_timeout = DEFAULT_TEST_TIMEOUT
-        if len(filters_runtime):
-            logger.info(f"Execute jobs taken from runtime")
-            worker_cnt += self.__execute_tests(filters_runtime, worker_cnt)
-        not_runned_test_filter, interapted_tests = self.__find_not_runned_tests()
-        if len(not_runned_test_filter) > 0:
-            logger.info(f"Execute not runned {len(not_runned_test_filter)} tests")
-            worker_cnt += self.__execute_tests(not_runned_test_filter, worker_cnt)
-        if len(interapted_tests) > 0:
-            logger.info(f"Execute interapted {len(interapted_tests)} tests")
-            worker_cnt += self.__execute_tests(interapted_tests, worker_cnt)
+            TaskManager.process_timeout = DEFAULT_SUITE_TIMEOUT if self._split_unit == constants.SUITE_UNIT_NAME else DEFAULT_TEST_TIMEOUT
+
+        not_runned_tests, interapted_tests = self.__find_not_runned_tests()
+        if (self._repeat_failed > 0):
+            if len(not_runned_tests) > 0:
+                logger.info(f"Execute {len(not_runned_tests)} not run tests")
+                not_runned_test_filters = [f'"{self.__replace_restricted_symbols(test)}"' for test in not_runned_tests]
+                worker_cnt += self.__execute_tests(not_runned_test_filters, worker_cnt)
+            if len(interapted_tests) > 0:
+                logger.info(f"Execute {len(interapted_tests)} interrupted tests")
+                interapted_tests_filters = [f'"{self.__replace_restricted_symbols(test)}"' for test in interapted_tests]
+                worker_cnt += self.__execute_tests(interapted_tests_filters, worker_cnt)
 
         t_end = datetime.datetime.now()
         total_seconds = (t_end - t_start).total_seconds()
@@ -547,7 +574,7 @@
                 with open(test_log_filename, "w") as log:
                     log.writelines(test_log)
                     log.close()
-                saved_tests.append(f'\"{test_name}\":')
+                saved_tests.append(test_name)
                 return True
 
         logs_dir = os.path.join(self._working_dir, "logs")
@@ -566,6 +593,7 @@
         with open(log_filename, "r") as log_file:
             test_name = None
             test_log = list()
+            test_suites = set()
             dir = None
             test_cnt_expected = test_cnt_real_saved_now = 0
             ref_k = None
@@ -580,6 +608,7 @@
                     test_cnt_expected = line.count(':')
                 if constants.RUN in line:
                     test_name = line[line.find(constants.RUN) + len(constants.RUN) + 1:-1:]
+                    dir = None
                     if self._device != None and self._available_devices != None:
                         for device_name in self._available_devices:
                             if device_name in test_name:
@@ -599,6 +628,8 @@
                     if (constants.PG_ERR in line) or (constants.PG_WARN in line):
                         test_log.append(line)
                 if test_name is not None:
+                    test_suite = test_name[:test_name.find(".")]
+                    test_suites.add(test_suite)
                     test_log.append(line)
                     if dir:
                         if __save_log(logs_dir, dir, test_name):
                             test_cnt_real_saved_now += 1
                             test_name = None
                             test_log = list()
-                            dir = None
             log_file.close()
             if test_name != None:
                 dir = INTERAPTED_DIR
                 if __save_log(logs_dir, dir, test_name):
                     interapted_tests.add(test_name)
-            test_cnt_real = test_cnt_real_saved_now
+
+            if (self._split_unit == constants.SUITE_UNIT_NAME):
+                test_cnt_real = len(test_suites)
+            else:
+                test_cnt_real = test_cnt_real_saved_now
+
             if test_cnt_real < test_cnt_expected:
-                logger.error(f"Number of tests in {log}: {test_cnt_real}. Expected is {test_cnt_expected} tests")
+                logger.error(f"Number of {self._split_unit}s in {log}: {test_cnt_real}. Expected {test_cnt_expected} {self._split_unit}s")
             else:
                 os.remove(log_filename)
+
         if len(list(Path(os.path.join(self._working_dir, "temp")).rglob("log_*.log"))) == 0:
             rmtree(os.path.join(self._working_dir, "temp"))
         for test_name in interapted_tests:
@@ -645,7 +681,7 @@
         if self._is_save_cache:
             test_times.sort(reverse=True)
             with open(self._cache_path, "w") as cache_file:
-                cache_file.writelines([f"{time}:\"" + test_name + "\":\n" for time, test_name in test_times])
+                cache_file.writelines([f"{time}:{test_name}\n" for time, test_name in test_times])
                 cache_file.close()
             logger.info(f"Test cache is saved to: {self._cache_path}")
         hash_table_path = os.path.join(logs_dir, "hash_table.csv")
@@ -671,7 +707,7 @@
                 _, tail = os.path.split(ir_hash)
                 ir_hash, _ = os.path.splitext(tail)
                 ir_hashes.append(ir_hash)
-        
+
         logger.info(f"Fix priorities list is saved to: {fix_priority_path}")
         # Find all irs for failed tests
         failed_ir_dir = os.path.join(self._working_dir, f'{self._device}_failed_ir')
@@ -689,7 +725,7 @@
                     xml_file = correct_ir
                     bin_file = prefix + constants.BIN_EXTENSION
                     meta_file = prefix + constants.META_EXTENSION
-                    
+
                     failed_ir_xml = xml_file.replace(head, failed_ir_dir)
                     failed_ir_bin = bin_file.replace(head, failed_ir_dir)
                     failed_ir_meta = meta_file.replace(head, failed_ir_dir)
@@ -731,16 +767,16 @@
         not_run_tests_path = os.path.join(logs_dir, "not_run_tests.log")
         with open(not_run_tests_path, "w") as not_run_tests_path_file:
             test_list_runtime = self.__get_test_list_by_runtime()
-            diff_set = set(test_list_runtime).difference(set(saved_tests)).difference(self._orig_excluded_tests)
+            diff_set = set(test_list_runtime).difference(set(saved_tests)).difference(self._excluded_tests)
             diff_list = list()
             for item in diff_set:
                 diff_list.append(f"{item}\n")
             not_run_tests_path_file.writelines(diff_list)
             not_run_tests_path_file.close()
-            logger.info(f"Not run test list is saved to: {not_run_tests_path}")
             l = len(diff_list)
             if l > 0:
                 logger.warning(f"Not run test counter is: {len(diff_list)}")
+            logger.info(f"Not run test list is saved to: {not_run_tests_path}")
 
         is_successfull_run = True
         test_cnt = 0
@@ -751,8 +787,11 @@
             is_successfull_run = False
         if len(self._disabled_tests):
             logger.info(f"disabled test counter is: {len(self._disabled_tests)}")
-        if self._total_test_cnt != test_cnt:
+
+        diff_set = set(saved_tests).difference(set(test_list_runtime))
+        if diff_set:
             logger.error(f"Total test count is {test_cnt}, which differs from the expected {self._total_test_cnt} tests")
+            [logger.error(f'Missed test: {test}') for test in diff_set]
             is_successfull_run = False
         logger.info(f"Total test count with disabled tests is {test_cnt + len(self._disabled_tests)}. All logs are saved to {logs_dir}")
         return is_successfull_run
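Log post-processing leans on gtest's own markers: a [ RUN ] line opens a test and a following status line closes it, so a [ RUN ] that never gets a status by end-of-file is what gets classified as interrupted or hanged. A condensed sketch of that state machine (the marker strings below are standard gtest output, not the exact values from utils/constants.py):

    def classify(log_lines) -> tuple:
        """Split raw gtest output into finished and interrupted test name sets."""
        finished, interrupted = set(), set()
        current = None
        for line in log_lines:
            if "[ RUN      ]" in line:
                current = line.split("]")[-1].strip()
            elif current and any(status in line
                                 for status in ("[       OK ]", "[  FAILED  ]", "[  SKIPPED ]")):
                finished.add(current)
                current = None
        if current:  # opened but never closed => interrupted/hanged
            interrupted.add(current)
        return finished, interrupted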
All logs is saved to {logs_dir}") return is_successfull_run @@ -766,11 +805,20 @@ if __name__ == "__main__": logger.info(f"[ARGUMENTS] --cache_path={args.cache_path}") logger.info(f"[ARGUMENTS] --workers={args.workers}") logger.info(f"[ARGUMENTS] --parallel_devices={args.parallel_devices}") + logger.info(f"[ARGUMENTS] --split_unit={args.split_unit}") + logger.info(f"[ARGUMENTS] --repeat_failed={args.repeat_failed}") logger.info(f"[ARGUMENTS] Executable file arguments = {exec_file_args}") TaskManager.process_timeout = args.process_timeout - conformance = TestParallelRunner(args.exec_file, exec_file_args, args.workers, args.working_dir, args.cache_path, args.parallel_devices) - conformance.run() - if not conformance.postprocess_logs(): + test_runner = TestParallelRunner(exec_file_path = args.exec_file, + test_command_line = exec_file_args, + worker_num = args.workers, + working_dir = args.working_dir, + cache_path = args.cache_path, + split_unit = args.split_unit, + repeat_failed = args.repeat_failed, + is_parallel_devices = args.parallel_devices) + test_runner.run() + if not test_runner.postprocess_logs(): logger.error("Run is not successful") sys.exit(-1) else: diff --git a/src/tests/test_utils/functional_test_utils/layer_tests_summary/utils/constants.py b/src/tests/test_utils/functional_test_utils/layer_tests_summary/utils/constants.py index 825e599d52e..906229228cc 100644 --- a/src/tests/test_utils/functional_test_utils/layer_tests_summary/utils/constants.py +++ b/src/tests/test_utils/functional_test_utils/layer_tests_summary/utils/constants.py @@ -16,6 +16,9 @@ DISABLED_PREFIX = "DISABLED_" PG_ERR = "PG ERROR" PG_WARN = "PG WARN" REF_COEF = "[ CONFORMANCE ] Influence coefficient: " +TEST_UNIT_NAME = "test" +SUITE_UNIT_NAME = "suite" +UNIT_NAMES = [TEST_UNIT_NAME, SUITE_UNIT_NAME] IS_WIN = "windows" in platform or "win32" in platform IS_MACOS = "darwin" in platform