From 4e8590bf9b9b406206cc5ac4cb5a1466602471ff Mon Sep 17 00:00:00 2001 From: Irina Efode Date: Fri, 3 Mar 2023 12:18:53 +0400 Subject: [PATCH] Add correct handling of conformance processes (#16031) * Update run_parallel.py * Add correct handling of conformance processes * remove extra * Update run_parallel.py --- .../layer_tests_summary/run_parallel.py | 56 +++++++++++++++---- 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_parallel.py b/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_parallel.py index 83ca732e287..fe8bd1d9f05 100644 --- a/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_parallel.py +++ b/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_parallel.py @@ -16,6 +16,7 @@ import threading import platform import csv import datetime +import shlex if sys.version_info.major >= 3: import _thread as thread @@ -25,6 +26,7 @@ else: FILENAME_LENGTH = 255 LOG_NAME_REPLACE_STR = "##NAME##" DEFAULT_PROCESS_TIMEOUT = 3600 +DEFAULT_TEST_TIMEOUT = 900 MAX_LENGHT = 4096 if platform.system() != "Windows" else 8191 logger = get_logger('test_parallel_runner') @@ -86,8 +88,9 @@ class TaskManager: return log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx + self._prev_run_cmd_length)) with open(log_file_name, "w") as log_file: + args = shlex.split(self._command_list[self._idx]) worker = self.__create_thread( - self._process_list.append(Popen(self._command_list[self._idx], shell=True, stdout=log_file, stderr=log_file))) + self._process_list.append(Popen(args, stdout=log_file, stderr=log_file))) self._workers.append(worker) worker.join() self._timers.append(datetime.datetime.now()) @@ -109,7 +112,8 @@ class TaskManager: continue def __update_process(self, pid:int, log_file): - self._process_list[pid] = Popen(self._command_list[self._idx], shell=True, stdout=log_file, stderr=log_file) + args = shlex.split(self._command_list[self._idx]) + self._process_list[pid] = Popen(args, stdout=log_file, stderr=log_file) def update_worker(self): if self._idx >= len(self._command_list): @@ -130,6 +134,7 @@ class TaskManager: if float((datetime.datetime.now() - self._timers[pid]).total_seconds()) > self.process_timeout: logger.warning(f"Process {pid} exceed time limetation per process. The process will be killed") self._process_list[pid].kill() + self._process_list[pid].wait(timeout=1) self._process_list[pid].wait(timeout=0) # logger.info(f"Process {pid} takes {float((datetime.datetime.now() - self._timers[pid]).total_seconds())}") self._process_list.pop(pid) @@ -165,7 +170,7 @@ class TestParallelRunner: for argument in test_command_line: if "--input_folders" in argument: is_input_folder = True - command += f" --input_folders" + command += f" --input_folders=" argument = argument[argument.find("=")+1:] if is_input_folder and argument[0] != "-": buf = "" @@ -178,7 +183,8 @@ class TestParallelRunner: argument = buf else: is_input_folder = False - command += f" {argument}" + command += f" " + command += f"{argument}" return command @staticmethod @@ -292,6 +298,8 @@ class TestParallelRunner: else: is_not_full = False break + if len(proved_test_list) == 0: + break if is_not_full and len(proved_test_list) > 0: worker_test_filters[0] += f'"{self.__replace_restricted_symbols(proved_test_list[0]._name)}":' test_times[0] += proved_test_list[0]._time @@ -350,6 +358,9 @@ class TestParallelRunner: if len(filters_runtime): logger.info(f"Execute jobs taken from runtime") worker_cnt = self.__execute_tests(filters_runtime, worker_cnt) + # 15m for one test in one process + if TaskManager.process_timeout == -1 or TaskManager.process_timeout == DEFAULT_PROCESS_TIMEOUT: + TaskManager.process_timeout = DEFAULT_TEST_TIMEOUT if len(filters_cache): logger.info(f"Execute jobs taken from cache") self.__execute_tests(filters_cache, worker_cnt) @@ -366,6 +377,7 @@ class TestParallelRunner: def postprocess_logs(self): test_results = dict() logger.info(f"Log analize is started") + saved_tests = list() def __save_log(logs_dir, dir, test_name): test_log_filename = os.path.join(logs_dir, dir, f"{test_name}.txt".replace('/', '_')) hash_str = str(sha256(test_name.encode('utf-8')).hexdigest()) @@ -381,6 +393,7 @@ class TestParallelRunner: with open(test_log_filename, "w") as log: log.writelines(test_log) log.close() + saved_tests.append(f'\"{test_name}\":') return True logs_dir = os.path.join(self._working_dir, "logs") @@ -400,8 +413,8 @@ class TestParallelRunner: test_name = None test_log = list() dir = None + test_cnt_expected = test_cnt_real_saved_now = 0 ref_k = None - test_cnt_expected = test_cnt_real_saved_now = test_cnt_real_saved_before = 0 try: lines = log_file.readlines() except: @@ -427,7 +440,7 @@ class TestParallelRunner: test_log.append(line) if dir: if __save_log(logs_dir, dir, test_name): - # update test_cache with tests. If tests is crashed use -2 as unknown time + # update test_cache with tests. If tests is crashed use -1 as unknown time time = -1 if "ms)" in line: time = line[line.rfind("(") + 1:line.rfind("ms)") - 1] @@ -443,10 +456,19 @@ class TestParallelRunner: test_name = None test_log = list() dir = None - else: - test_cnt_real_saved_before += 1 log_file.close() - test_cnt_real = test_cnt_real_saved_before + test_cnt_real_saved_now + if test_name != None: + dir = 'interapted' + if __save_log(logs_dir, dir, test_name): + # update test_cache with tests. If tests is crashed use -1 as unknown time + time = -1 + test_times.append((int(time), test_name)) + if dir in test_results.keys(): + test_results[dir] += 1 + else: + test_results[dir] = 1 + test_cnt_real_saved_now += 1 + test_cnt_real = test_cnt_real_saved_now if test_cnt_real < test_cnt_expected: logger.error(f"Number of tests in {log}: {test_cnt_real}. Expected is {test_cnt_expected} tests") else: @@ -476,7 +498,7 @@ class TestParallelRunner: logger.info(f"Fix priorities list is saved to: {fix_priority_path}") - disabled_tests_path = os.path.join(logs_dir, "disabled_tests.lst") + disabled_tests_path = os.path.join(logs_dir, "disabled_tests.log") with open(disabled_tests_path, "w") as disabled_tests_file: for i in range(len(self._disabled_tests)): self._disabled_tests[i] += "\n" @@ -484,6 +506,20 @@ class TestParallelRunner: disabled_tests_file.close() logger.info(f"Disabled test list is saved to: {disabled_tests_path}") + not_run_tests_path = os.path.join(logs_dir, "not_run_tests.log") + with open(not_run_tests_path, "w") as not_run_tests_path_file: + test_list_runtime = self.__get_test_list_by_runtime() + diff_set = set(test_list_runtime).difference(set(saved_tests)) + diff_list = list() + for item in diff_set: + diff_list.append(f"{item}\n") + not_run_tests_path_file.writelines(diff_list) + not_run_tests_path_file.close() + logger.info(f"Not run test list is saved to: {not_run_tests_path}") + l = len(diff_list) + if l > 0: + logger.warning(f"Not run test test counter is: {len(diff_list)}") + is_successfull_run = True test_cnt = 0 for test_st, test_res in test_results.items():