Add correct handling of conformance processes (#16031)

* Update run_parallel.py

* Add correct handling of conformance processes

* remove extra

* Update run_parallel.py
Irina Efode 2023-03-03 12:18:53 +04:00 committed by GitHub
parent 7deb9090bf
commit 4e8590bf9b

@@ -16,6 +16,7 @@ import threading
import platform
import csv
import datetime
import shlex
if sys.version_info.major >= 3:
import _thread as thread
@@ -25,6 +26,7 @@ else:
FILENAME_LENGTH = 255
LOG_NAME_REPLACE_STR = "##NAME##"
DEFAULT_PROCESS_TIMEOUT = 3600
DEFAULT_TEST_TIMEOUT = 900
MAX_LENGHT = 4096 if platform.system() != "Windows" else 8191
logger = get_logger('test_parallel_runner')
@@ -86,8 +88,9 @@ class TaskManager:
return
log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx + self._prev_run_cmd_length))
with open(log_file_name, "w") as log_file:
args = shlex.split(self._command_list[self._idx])
worker = self.__create_thread(
self._process_list.append(Popen(self._command_list[self._idx], shell=True, stdout=log_file, stderr=log_file)))
self._process_list.append(Popen(args, stdout=log_file, stderr=log_file)))
self._workers.append(worker)
worker.join()
self._timers.append(datetime.datetime.now())
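
Dropping shell=True here is what makes the timeout handling below reliable: with a shell in between, Popen.kill() terminates the shell while the gtest binary it spawned can keep running. shlex.split turns the prepared command string into an argv list, so the test executable becomes the direct child of the runner. A minimal sketch of the pattern, with a placeholder binary name and filter:

    import shlex
    from subprocess import Popen

    # Placeholder command; the runner builds the real one from the conformance binary and gtest filters.
    cmd = './conformanceTests --gtest_filter="*ReLU*" --device=CPU'

    with open("worker_0.log", "w") as log_file:
        args = shlex.split(cmd)  # ['./conformanceTests', '--gtest_filter=*ReLU*', '--device=CPU']
        proc = Popen(args, stdout=log_file, stderr=log_file)  # no intermediate shell: proc.pid is the test binary
        proc.wait()
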
@@ -109,7 +112,8 @@ class TaskManager:
continue
def __update_process(self, pid:int, log_file):
self._process_list[pid] = Popen(self._command_list[self._idx], shell=True, stdout=log_file, stderr=log_file)
args = shlex.split(self._command_list[self._idx])
self._process_list[pid] = Popen(args, stdout=log_file, stderr=log_file)
def update_worker(self):
if self._idx >= len(self._command_list):
@@ -130,6 +134,7 @@ class TaskManager:
if float((datetime.datetime.now() - self._timers[pid]).total_seconds()) > self.process_timeout:
logger.warning(f"Process {pid} exceed time limetation per process. The process will be killed")
self._process_list[pid].kill()
self._process_list[pid].wait(timeout=1)
self._process_list[pid].wait(timeout=0)
# logger.info(f"Process {pid} takes {float((datetime.datetime.now() - self._timers[pid]).total_seconds())}")
self._process_list.pop(pid)
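
After kill() the child still has to be reaped, otherwise it lingers as a zombie and its exit status is lost; that is what the wait() call with a short timeout does. Popen.wait(timeout=...) raises subprocess.TimeoutExpired if the process has not exited within the given window, so a watchdog loop that must not block has to guard it. A small illustration (the sleep child is a stand-in for a hung conformance process):

    import subprocess

    proc = subprocess.Popen(["sleep", "3600"])  # stand-in for a hung test process

    proc.kill()                   # SIGKILL (TerminateProcess on Windows)
    try:
        proc.wait(timeout=1)      # reap the child so it does not stay a zombie
    except subprocess.TimeoutExpired:
        pass                      # not gone yet; a later wait()/poll() will collect it
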
@@ -165,7 +170,7 @@ class TestParallelRunner:
for argument in test_command_line:
if "--input_folders" in argument:
is_input_folder = True
command += f" --input_folders"
command += f" --input_folders="
argument = argument[argument.find("=")+1:]
if is_input_folder and argument[0] != "-":
buf = ""
@@ -178,7 +183,8 @@ class TestParallelRunner:
argument = buf
else:
is_input_folder = False
command += f" {argument}"
command += f" "
command += f"{argument}"
return command
@staticmethod
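
Since the string assembled above is later tokenized again with shlex.split, arguments that can contain spaces (the --input_folders= paths in particular) only survive the round trip if they are quoted; shlex.quote (or shlex.join on Python 3.8+) covers that. A short sketch, not the runner's exact code:

    import shlex

    exe = "./conformanceTests"                     # placeholder binary
    folders = ["/models/ir v11", "/models/extra"]  # one path with a space, for illustration

    command = exe
    command += " --input_folders=" + ",".join(shlex.quote(f) for f in folders)
    command += " --device=CPU"

    print(command)
    print(shlex.split(command))  # the quoted path comes back as part of a single argv element
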
@@ -292,6 +298,8 @@ class TestParallelRunner:
else:
is_not_full = False
break
if len(proved_test_list) == 0:
break
if is_not_full and len(proved_test_list) > 0:
worker_test_filters[0] += f'"{self.__replace_restricted_symbols(proved_test_list[0]._name)}":'
test_times[0] += proved_test_list[0]._time
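
The surrounding loop splits tests with known runtimes into per-worker gtest filters so that every worker receives a comparable amount of work; the added break leaves the loop as soon as the timed list is exhausted. A generic greedy balancing sketch (illustrative only, not the runner's exact algorithm):

    import heapq

    def distribute(timed_tests, worker_cnt):
        """timed_tests: list of (seconds, test_name); returns one gtest filter string per worker."""
        filters = [""] * worker_cnt
        heap = [(0.0, w) for w in range(worker_cnt)]  # (assigned_seconds, worker_id)
        heapq.heapify(heap)
        for seconds, name in sorted(timed_tests, reverse=True):
            total, w = heapq.heappop(heap)            # always top up the lightest worker
            filters[w] += f'"{name}":'
            heapq.heappush(heap, (total + seconds, w))
        return filters

    print(distribute([(30, "A"), (10, "B"), (25, "C")], 2))  # ['"A":', '"C":"B":']
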
@@ -350,6 +358,9 @@ class TestParallelRunner:
if len(filters_runtime):
logger.info(f"Execute jobs taken from runtime")
worker_cnt = self.__execute_tests(filters_runtime, worker_cnt)
# 15m for one test in one process
if TaskManager.process_timeout == -1 or TaskManager.process_timeout == DEFAULT_PROCESS_TIMEOUT:
TaskManager.process_timeout = DEFAULT_TEST_TIMEOUT
if len(filters_cache):
logger.info(f"Execute jobs taken from cache")
self.__execute_tests(filters_cache, worker_cnt)
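
Jobs taken from the cache run a single test per process, so keeping the blanket DEFAULT_PROCESS_TIMEOUT of 3600 s would let one hung test occupy a worker for an hour; unless the user supplied an explicit value, the limit is tightened to DEFAULT_TEST_TIMEOUT (900 s, i.e. 15 minutes per test). The watchdog itself is a plain wall-clock comparison per child, roughly like this (helper name is illustrative):

    import datetime

    PROCESS_TIMEOUT = 900  # seconds: 15 minutes for a single-test process

    def is_timed_out(started_at, timeout_s=PROCESS_TIMEOUT):
        # The same kind of wall-clock check TaskManager applies to every child process.
        return (datetime.datetime.now() - started_at).total_seconds() > timeout_s

    print(is_timed_out(datetime.datetime.now() - datetime.timedelta(minutes=20)))  # True
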
@@ -366,6 +377,7 @@ class TestParallelRunner:
def postprocess_logs(self):
test_results = dict()
logger.info(f"Log analize is started")
saved_tests = list()
def __save_log(logs_dir, dir, test_name):
test_log_filename = os.path.join(logs_dir, dir, f"{test_name}.txt".replace('/', '_'))
hash_str = str(sha256(test_name.encode('utf-8')).hexdigest())
@@ -381,6 +393,7 @@ class TestParallelRunner:
with open(test_log_filename, "w") as log:
log.writelines(test_log)
log.close()
saved_tests.append(f'\"{test_name}\":')
return True
logs_dir = os.path.join(self._working_dir, "logs")
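
Conformance test names can exceed what a filesystem accepts as a single path component (FILENAME_LENGTH = 255 above), which is why __save_log also computes a sha256 digest of the test name. A stripped-down sketch of that fallback, with placeholder paths rather than the runner's exact logic:

    import os
    from hashlib import sha256

    FILENAME_LENGTH = 255  # usual per-component limit on Linux filesystems

    def log_path(logs_dir, status_dir, test_name):
        filename = f"{test_name}.txt".replace("/", "_")
        if len(filename) > FILENAME_LENGTH:
            # Too long for one path component: fall back to a fixed-length digest.
            filename = sha256(test_name.encode("utf-8")).hexdigest() + ".txt"
        return os.path.join(logs_dir, status_dir, filename)

    print(log_path("logs", "failed", "conformance_ReLU/ReadIRTest.Inference/" + "x" * 300))
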
@@ -400,8 +413,8 @@ class TestParallelRunner:
test_name = None
test_log = list()
dir = None
test_cnt_expected = test_cnt_real_saved_now = 0
ref_k = None
test_cnt_expected = test_cnt_real_saved_now = test_cnt_real_saved_before = 0
try:
lines = log_file.readlines()
except:
@@ -427,7 +440,7 @@ class TestParallelRunner:
test_log.append(line)
if dir:
if __save_log(logs_dir, dir, test_name):
# update test_cache with tests. If tests is crashed use -2 as unknown time
# update test_cache with tests. If tests is crashed use -1 as unknown time
time = -1
if "ms)" in line:
time = line[line.rfind("(") + 1:line.rfind("ms)") - 1]
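
The duration is taken from the gtest status line, e.g. "[       OK ] Suite.Case (153 ms)": everything between the last "(" and "ms)". When a test crashed and no such line exists, -1 marks the time as unknown so the cache still records the test. The same extraction written with a regular expression (sketch):

    import re

    def test_time_ms(line):
        # gtest prints e.g. "[       OK ] Suite.Case (153 ms)"; return -1 when no duration is present.
        match = re.search(r"\((\d+)\s*ms\)\s*$", line)
        return int(match.group(1)) if match else -1

    print(test_time_ms("[       OK ] ReLU.Inference (153 ms)"))  # 153
    print(test_time_ms("[  CRASHED ] ReLU.Inference"))           # -1
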
@@ -443,10 +456,19 @@ class TestParallelRunner:
test_name = None
test_log = list()
dir = None
else:
test_cnt_real_saved_before += 1
log_file.close()
test_cnt_real = test_cnt_real_saved_before + test_cnt_real_saved_now
if test_name != None:
dir = 'interapted'
if __save_log(logs_dir, dir, test_name):
# update test_cache with tests. If tests is crashed use -1 as unknown time
time = -1
test_times.append((int(time), test_name))
if dir in test_results.keys():
test_results[dir] += 1
else:
test_results[dir] = 1
test_cnt_real_saved_now += 1
test_cnt_real = test_cnt_real_saved_now
if test_cnt_real < test_cnt_expected:
logger.error(f"Number of tests in {log}: {test_cnt_real}. Expected is {test_cnt_expected} tests")
else:
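
This is the core of the new interrupted-test handling: a test whose log is still open when the worker log ends (a "[ RUN      ]" line with no closing result line) was cut off by a crash or by the watchdog kill, and its partial log is saved under the 'interapted' status. A simplified sketch of that grouping (marker strings and helper name are illustrative, not the runner's exact parser):

    def split_gtest_log(lines):
        """Group a worker log into {test_name: [log lines]}; a trailing open test was interrupted."""
        results, current, buf = {}, None, []
        for line in lines:
            if "[ RUN      ]" in line:
                current, buf = line.split("]")[-1].strip(), []
            if current is not None:
                buf.append(line)
            if current is not None and ("[       OK ]" in line or "[  FAILED  ]" in line):
                results[current] = buf
                current, buf = None, []
        if current is not None:       # log ended mid-test: process was killed or crashed
            results[current] = buf    # the runner files this one under 'interapted'
        return results
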
@@ -476,7 +498,7 @@ class TestParallelRunner:
logger.info(f"Fix priorities list is saved to: {fix_priority_path}")
disabled_tests_path = os.path.join(logs_dir, "disabled_tests.lst")
disabled_tests_path = os.path.join(logs_dir, "disabled_tests.log")
with open(disabled_tests_path, "w") as disabled_tests_file:
for i in range(len(self._disabled_tests)):
self._disabled_tests[i] += "\n"
@@ -484,6 +506,20 @@ class TestParallelRunner:
disabled_tests_file.close()
logger.info(f"Disabled test list is saved to: {disabled_tests_path}")
not_run_tests_path = os.path.join(logs_dir, "not_run_tests.log")
with open(not_run_tests_path, "w") as not_run_tests_path_file:
test_list_runtime = self.__get_test_list_by_runtime()
diff_set = set(test_list_runtime).difference(set(saved_tests))
diff_list = list()
for item in diff_set:
diff_list.append(f"{item}\n")
not_run_tests_path_file.writelines(diff_list)
not_run_tests_path_file.close()
logger.info(f"Not run test list is saved to: {not_run_tests_path}")
l = len(diff_list)
if l > 0:
logger.warning(f"Not run test test counter is: {len(diff_list)}")
is_successfull_run = True
test_cnt = 0
for test_st, test_res in test_results.items():