[GA] Parallel tests (#18773)

* test job

* added script to the test package

* test call fix

* switched test to large runner

* Added option to split tests by suites

* extended logs

* enabled test cache

* fixed workload

* optimized splitting mode

* excluded disabled suites

* temporarily removed parallel logs

* added failed logs

* fixed empty name in suites

* test on 4 cores

* make step optional

* fixed param

* test

* grouping suites

* set suite arg

* increase test timeout

* test commit

* test pip deps

* include requirements.txt to the test package

* fixed deps step order

* fixed test counter

* fixed smart filter for suites

* clean up

* disabled repeat failed tests

* review comments

* use runtime execution time for skipped tests

* removed disabled suites

* reduced command lines

* enabled tests results

* fixed typo

* removed unused argument pp

* Log improvements

* merge cached and runtime filters

* fixed order

* fixed init list error

* fixed cache writing

* enable windows pipeline

* changed runner for windows

* optimized balancing using heap

* Fixed test counter

* fixed windows pipeline

* extended logging

* changed pipelines

* added logs on Windows

* fixed pipelines

* debug

* removed os specific code

* fixed "#"

* fixed test results

* fixed win pipeline

* cleanup debug

* rebase fixes

* windows pip requirements

* aligned run_conformance.py

* Apply suggestions from code review

Co-authored-by: Andrey Kashchikhin <andrey.kashchikhin@intel.com>

* reverted windows changes

* reverted build runner

* fixed review comments

* minor review fixes

* make help func static

* renamed test runner

* fixed merge issue

* removed unused log

* reduced command line

* fixed issue with conformance run

* fixed typo

* set test as default split unit

* fixed tasks queue with time -1

* fixed test result calculation

* reverted wrong fix

* reverted changes

* set time limitation

* reverted unused change

* fix win command lines

* reuse env variables in pipeline

* fixed install files permissions

* fixed pipeline syntax

* reset validation schema

* fixed env names

* reverted initial setting of env

* increased test runner

* fixed paths

* reuse env path

* reset validation schema

* Revert "reuse env path"

This reverts commit 97422ac595.

* Revert "increased test runner"

This reverts commit 010aa31641.

* revert command line reduction

* made if condition clearer

---------

Co-authored-by: Andrey Kashchikhin <andrey.kashchikhin@intel.com>
Mikhail Ryzhov 2023-09-05 16:23:17 +02:00 committed by GitHub
parent 188d53d813
commit bf5690fa7d
5 changed files with 245 additions and 167 deletions


@ -749,10 +749,12 @@ jobs:
defaults:
run:
shell: bash
runs-on: ubuntu-22.04
runs-on: ubuntu-latest-4-cores
env:
INSTALL_DIR: ${{ github.workspace }}/install
INSTALL_TEST_DIR: ${{ github.workspace }}/install/tests
PARALLEL_TEST_SCRIPT: ${{ github.workspace }}/install/tests/functional_test_utils/run_parallel.py
PARALLEL_TEST_CACHE: ${{ github.workspace }}/install/tests/test_cache.lst
steps:
- name: Create Directories
@ -784,15 +786,34 @@ jobs:
tar -xzf openvino_tests.tar.gz -C ${{ env.INSTALL_DIR }} && rm openvino_tests.tar.gz || exit 1
popd
- name: Intel CPU plugin func tests
- name: Install python dependencies
run: |
python3 -m pip install --upgrade pip
python3 -m pip install -r ${{ env.INSTALL_TEST_DIR }}/functional_test_utils/requirements.txt
- name: Cache Tests Execution Time
id: tests-functional-cpu-cache
uses: actions/cache@v3
with:
path: ${{ env.PARALLEL_TEST_CACHE }}
key: ${{ runner.os }}-tests-functional-cpu-cache
- name: Intel CPU plugin func tests (parallel)
run: |
source ${{ env.INSTALL_DIR }}/setupvars.sh
${{ env.INSTALL_TEST_DIR }}/ov_cpu_func_tests --gtest_print_time=1 --gtest_filter=*smoke* --gtest_output=xml:"${{ env.INSTALL_TEST_DIR }}/TEST-CPUFuncTests.xml"
python3 ${{ env.PARALLEL_TEST_SCRIPT }} -e ${{ env.INSTALL_TEST_DIR }}/ov_cpu_func_tests -c ${{ env.PARALLEL_TEST_CACHE }} -w ${{ env.INSTALL_TEST_DIR }} -s suite -rf 0 -- --gtest_print_time=1 --gtest_filter=*smoke*
timeout-minutes: 25
- name: Upload Test Results
uses: actions/upload-artifact@v3
if: ${{ always() }}
with:
name: test-results-functional-cpu
path: ${{ env.INSTALL_TEST_DIR }}/TEST*.xml
path: |
${{ env.INSTALL_TEST_DIR }}/TEST*.xml
${{ env.INSTALL_TEST_DIR }}/logs/failed/*.log
${{ env.INSTALL_TEST_DIR }}/logs/crashed/*.log
${{ env.INSTALL_TEST_DIR }}/logs/hanged/*.log
${{ env.INSTALL_TEST_DIR }}/logs/interapted/*.log
${{ env.INSTALL_TEST_DIR }}/logs/disabled_tests.log
if-no-files-found: 'error'
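For reference, the runner invoked in the step above can also be driven from Python, the same way run_conformance.py and run_parallel.py's own __main__ block do further below. A minimal sketch, assuming the layer_tests_summary directory is on PYTHONPATH; the binary path, working directory and filter are illustrative, not taken from this commit:

    import os
    from run_parallel import TestParallelRunner

    runner = TestParallelRunner(exec_file_path="./ov_cpu_func_tests",        # illustrative path
                                test_command_line=["--gtest_print_time=1",
                                                   "--gtest_filter=*smoke*"],
                                worker_num=os.cpu_count(),
                                working_dir="./parallel_run",
                                cache_path="./test_cache.lst",
                                split_unit="suite",    # "test" is the default unit
                                repeat_failed=0)
    runner.run()
    if not runner.postprocess_logs():
        raise SystemExit(1)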


@ -29,6 +29,10 @@ addIeTarget(
$<TARGET_PROPERTY:openvino::runtime::dev,INTERFACE_INCLUDE_DIRECTORIES>
)
install(PROGRAMS layer_tests_summary/run_parallel.py DESTINATION tests/functional_test_utils COMPONENT tests EXCLUDE_FROM_ALL)
install(FILES layer_tests_summary/requirements.txt DESTINATION tests/functional_test_utils COMPONENT tests EXCLUDE_FROM_ALL)
install(DIRECTORY layer_tests_summary/utils DESTINATION tests/functional_test_utils COMPONENT tests EXCLUDE_FROM_ALL)
ie_faster_build(${TARGET_NAME}
PCH PRIVATE "src/precomp.hpp"
)


@ -141,7 +141,7 @@ class Conformance:
logger.info(f"The file {download_path} is archieve. Should be unzip to {path_to_save}")
return file_utils.unzip_archieve(download_path, path_to_save)
return download_path
def __dump_subgraph(self):
subgraph_dumper_path = os.path.join(self._ov_path, f'{SUBGRAPH_DUMPER_BIN_NAME}{constants.OS_BIN_FILE_EXT}')
@ -196,7 +196,7 @@ class Conformance:
if len(diff) > 0:
logger.error(f"Unexpected failures: {diff}")
exit(-1)
intersection = self._expected_failures.intersection(this_run_failures)
if this_run_failures != self._expected_failures and self._expected_failures_update:
logger.info(f"Expected failures file {self._expected_failures} will be updated!!!")
@ -225,19 +225,21 @@ class Conformance:
os.mkdir(report_dir)
if not os.path.isdir(logs_dir):
os.mkdir(logs_dir)
command_line_args = [f"--device={self._device}",
command_line_args = [f"--device={self._device}",
f'--input_folders="{self._model_path}"' if self._type == constants.OP_CONFORMANCE else '',
f"--report_unique_name", f'--output_folder="{parallel_report_dir}"',
f'--gtest_filter=\"{self._gtest_filter}\"', f'--config_path="{self._ov_config_path}"',
f'--shape_mode={self._special_mode}']
conformance = TestParallelRunner(f"{conformance_path}",
command_line_args,
self._workers,
logs_dir,
self._cache_path,
self._is_parallel_over_devices,
self._expected_failures if not self._expected_failures_update else set())
conformance = TestParallelRunner(exec_file_path=f"{conformance_path}",
test_command_line=command_line_args,
worker_num=self._workers,
working_dir=logs_dir,
cache_path=self._cache_path,
split_unit=constants.TEST_UNIT_NAME,
repeat_failed=1,
is_parallel_devices=self._is_parallel_over_devices,
excluded_tests=self._expected_failures if not self._expected_failures_update else set())
conformance.run()
conformance.postprocess_logs()
@ -319,4 +321,4 @@ if __name__ == "__main__":
args.parallel_devices, args.expected_failures,
args.expected_failures_update)
conformance.run(args.dump_graph)


@ -21,6 +21,7 @@ import threading
import csv
import datetime
import shlex
import heapq
if sys.version_info.major >= 3:
import _thread as thread
@ -38,6 +39,7 @@ except:
FILENAME_LENGTH = 255
LOG_NAME_REPLACE_STR = "##NAME##"
DEFAULT_PROCESS_TIMEOUT = 3600
DEFAULT_SUITE_TIMEOUT = 3600
DEFAULT_TEST_TIMEOUT = 900
MAX_LENGHT = 4096 if not constants.IS_WIN else 8191
@ -45,17 +47,22 @@ def parse_arguments():
parser = ArgumentParser()
exec_file_path_help = "Path to the test executable file"
cache_path_help = "Path to the cache file with test_name list sorted by execution time. .lst file!"
worker_num_help = "Worker number. Default value is `cpu_count-1` "
worker_num_help = "Worker number. Default value is `cpu_count` "
working_dir_num_help = "Working dir"
process_timeout_help = "Process timeout in s"
parallel_help = "Parallel over HW devices. For example run tests over GPU.0, GPU.1 and etc"
split_unit_help = "Split by test or suite"
repeat_help = "Number of times to repeat failed and interrupted tests"
parser.add_argument("-e", "--exec_file", help=exec_file_path_help, type=str, required=True)
parser.add_argument("-c", "--cache_path", help=cache_path_help, type=str, required=False, default="")
parser.add_argument("-j", "--workers", help=worker_num_help, type=int, required=False, default=(os.cpu_count() - 1) if os.cpu_count() > 2 else 1)
parser.add_argument("-j", "--workers", help=worker_num_help, type=int, required=False, default=os.cpu_count())
parser.add_argument("-p", "--parallel_devices", help=parallel_help, type=int, required=False, default=0)
parser.add_argument("-w", "--working_dir", help=working_dir_num_help, type=str, required=False, default=".")
parser.add_argument("-t", "--process_timeout", help=process_timeout_help, type=int, required=False, default=DEFAULT_PROCESS_TIMEOUT)
parser.add_argument("-s", "--split_unit", help=split_unit_help, type=str, required=False, default=constants.TEST_UNIT_NAME)
parser.add_argument("-rf", "--repeat_failed", help=repeat_help, type=int, required=False, default=1)
return parser.parse_args()
def get_test_command_line_args():
@ -82,7 +89,7 @@ def get_device_by_args(args: list):
break
return device
# Class to read test cache
# Class to read test cache
class TestStructure:
_name = ""
_time = 0
@ -147,7 +154,7 @@ class TaskManager:
# logger.warning(f"Impossible to kill process {pid} with error: {err}")
pass
def __find_free_process(self):
while True:
for pid in range(len(self._process_list)):
@ -184,8 +191,8 @@ class TaskManager:
self._timers[pid] = datetime.datetime.now()
self._idx += 1
return True
def compelete_all_processes(self):
def compelete_all_processes(self):
while len(self._process_list) > 0:
for pid in range(len(self._process_list)):
try:
@ -206,7 +213,7 @@ class TaskManager:
class TestParallelRunner:
def __init__(self, exec_file_path: os.path, test_command_line: list,
worker_num: int, working_dir: os.path, cache_path: os.path,
is_parallel_devices=False, excluded_tests=set()):
split_unit: str, repeat_failed: int, is_parallel_devices=False, excluded_tests=set()):
self._exec_file_path = exec_file_path
self._working_dir = working_dir
self._conformance_ir_filelists = list()
@ -221,17 +228,19 @@ class TestParallelRunner:
if not os.path.exists(head):
os.mkdir(head)
self._is_save_cache = True
if split_unit in constants.UNIT_NAMES:
self._split_unit = split_unit
else:
logger.error(f"Incorrect split_unit argument: {split_unit}. Please use the following values: {','.join(constants.UNIT_NAMES)}")
sys.exit(-1)
self._repeat_failed = repeat_failed
self._disabled_tests = list()
self._total_test_cnt = 0
self._device = get_device_by_args(self._command.split())
self._available_devices = [self._device] if not self._device is None else []
if has_python_api and is_parallel_devices:
self._available_devices = get_available_devices(self._device)
self._excluded_tests_re = set()
self._orig_excluded_tests = excluded_tests
for test in excluded_tests:
self._excluded_tests_re.add(f'"{self.__replace_restricted_symbols(test)}":')
self._excluded_tests = excluded_tests
def __init_basic_command_line_for_exec_file(self, test_command_line: list):
command = f'{self._exec_file_path}'
@ -241,6 +250,8 @@ class TestParallelRunner:
is_input_folder = True
command += f" --input_folders="
argument = argument[argument.find("=")+1:]
elif "--gtest_filter" in argument:
self._gtest_filter = argument[argument.find("=")+1:]
if is_input_folder and argument[0] != "-":
buf = ""
for _ in argument.split(','):
@ -250,13 +261,33 @@ class TestParallelRunner:
buf = file_utils.prepare_filelist(input_path, ["*.xml"])
self._conformance_ir_filelists.append(buf)
buf += ","
argument = buf
argument = buf
else:
is_input_folder = False
command += f" "
command += f"{argument}"
return command
@staticmethod
def __get_suite_filter(test_filter: str, suite_filter: str):
filters = test_filter.split(':')
suite_filter_mixed = ''
for filter in filters:
patterns = filter.strip('\"').split('*')
suite_filter = f'{suite_filter}*'
suite_filter_part = suite_filter
for pattern in patterns:
if pattern and suite_filter.find(pattern) == -1:
suite_filter_part += f'{pattern}*'
if suite_filter_part == suite_filter:
suite_filter_mixed = f'"{suite_filter_part}"'
break
if not suite_filter_mixed:
suite_filter_mixed = f'"{suite_filter_part}"'
else:
suite_filter_mixed += f':"{suite_filter_part}"'
return suite_filter_mixed
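# Illustrative example (invented names): for test_filter='*smoke*' and suite_filter='MyCPUSuite'
# the helper above returns '"MyCPUSuite*smoke*"', i.e. the user's --gtest_filter patterns are
# folded into a single suite-level wildcard, so a whole suite can be scheduled as one job while
# the original filter is still honoured.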
@staticmethod
def __replace_restricted_symbols(input_string:str):
restricted_symbols = "!@$%^&-+`~:;\",<>?"
@ -264,7 +295,9 @@ class TestParallelRunner:
input_string = input_string.replace(symbol, '*')
return input_string
def __get_test_list_by_runtime(self):
def __get_test_list_by_runtime(self, test_unit = constants.TEST_UNIT_NAME):
self._total_test_cnt = 0
self._disabled_tests.clear()
test_list_file_name = os.path.join(self._working_dir, "test_list.lst")
if os.path.isfile(test_list_file_name):
try:
@ -282,156 +315,147 @@ class TestParallelRunner:
logger.error(f"The test list file does not exists! Please check the process output!")
exit(-1)
test_list = list()
tests_dict = dict()
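# The list file produced by --gtest_list_tests looks roughly like this (illustrative names):
#   SmokeSuite/ConvLayerTest.
#     Inference/0  # GetParam() = (...)
#     Inference/1  # GetParam() = (...)
# i.e. suite lines contain no spaces and end with '.', while test lines are indented by two
# spaces and may end with a ' # GetParam() = ...' comment, which the loop below strips.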
with open(test_list_file_name) as test_list_file:
test_suite = ""
for test_name in test_list_file.read().split('\n'):
if "Running main() from" in test_name:
continue
if not ' ' in test_name:
test_suite = test_name
test_suite = test_name.replace(".", "")
continue
pos = test_name.find('#')
pos = test_name.find(' # ')
if pos > 0 or test_suite != "":
real_test_name = test_suite + (test_name[2:pos-2] if pos > 0 else test_name[2:])
real_test_name = test_suite + "." + (test_name[2:pos-1] if pos > 0 else test_name[2:])
if constants.DISABLED_PREFIX in real_test_name:
self._disabled_tests.append(real_test_name)
else:
test_list.append(f'"{self.__replace_restricted_symbols(real_test_name)}":')
elif test_unit == constants.TEST_UNIT_NAME:
tests_dict[real_test_name] = 1
self._total_test_cnt += 1
elif test_unit == constants.SUITE_UNIT_NAME:
tests_dict[test_suite] = tests_dict.get(test_suite, 0) + 1
self._total_test_cnt += 1
test_list_file.close()
os.remove(test_list_file_name)
logger.info(f"Len test_list_runtime (without disabled tests): {len(test_list)}")
if len(test_list) == 0:
logger.info(f"Len test_list_runtime (without disabled tests): {len(tests_dict)}")
if len(tests_dict) == 0:
logger.warning(f"Look like there are not tests to run! Please check the filters!")
exit(0)
return test_list
return tests_dict
def __get_test_list_by_cache(self):
test_list_cache = list()
tests_dict_cache = dict()
if os.path.isfile(self._cache_path):
logger.info(f"Get test list from cache file: {self._cache_path}")
with open(self._cache_path, "r") as cache_file:
for line in cache_file.readlines():
pos = line.find(":")
time = line[:pos]
test_name = line[pos+1:]
if not constants.DISABLED_PREFIX in test_name:
test_list_cache.append(TestStructure(test_name.replace("\n", ""), time))
logger.info(f"Len test_list_cache: {len(test_list_cache)}")
return test_list_cache
time = int(line[:pos])
test_name = line[pos+1:].replace("\n", "")
test_suite = test_name[:test_name.find(".")]
def __generate_test_lists(self, test_list_cache: list, test_list_runtime:list):
cached_test_list = list()
runtime_test_test = list()
cached_test_list_names = list()
if self._split_unit == constants.TEST_UNIT_NAME:
if constants.DISABLED_PREFIX not in test_name:
if (time != -1):
tests_dict_cache[test_name] = tests_dict_cache.get(test_name, 0) + time
elif self._split_unit == constants.SUITE_UNIT_NAME:
if constants.DISABLED_PREFIX not in test_suite:
if (time == -1):
tests_dict_cache[test_suite] = tests_dict_cache.get(test_suite, -1)
else:
tests_dict_cache[test_suite] = tests_dict_cache.get(test_suite, 0) + time
for test in test_list_cache:
if test._name in test_list_runtime and not test in self._excluded_tests_re:
cached_test_list.append(test)
cached_test_list_names.append(test._name)
for test in test_list_runtime:
if not test in cached_test_list_names and not test in self._excluded_tests_re:
runtime_test_test.append(test)
logger.info(f"Len tests_dict_cache: {len(tests_dict_cache)}")
return tests_dict_cache
if len(runtime_test_test) > 0:
def __generate_test_lists(self, test_dict_cache: dict, test_dict_runtime: dict):
cached_test_dict = dict()
runtime_test_dict = dict()
for test in test_dict_cache:
if test in test_dict_runtime and test not in self._excluded_tests:
cached_test_dict[test] = test_dict_cache[test]
for test in test_dict_runtime:
if test not in cached_test_dict and test not in self._excluded_tests:
runtime_test_dict[test] = test_dict_runtime[test]
if len(runtime_test_dict) > 0:
logger.warning(f'Cache file is not relevant to this run. The runner will work in hybrid mode.')
logger.info(f'Test count from cache: {len(cached_test_list)}')
logger.info(f'Test count from runtime: {len(runtime_test_test)}')
return cached_test_list, runtime_test_test
logger.info(f'{self._split_unit.title()} count from cache: {len(cached_test_dict)}')
logger.info(f'{self._split_unit.title()} count from runtime: {len(runtime_test_dict)}')
return cached_test_dict, runtime_test_dict
def __prepare_smart_filters(self, proved_test_list:list):
res_test_filters = list()
def __prepare_smart_filters(self, proved_test_dict: dict):
def_length = len(self._command) + len(" --gtest_filter=")
idx = len(proved_test_list)
for i in range(len(proved_test_list)):
if proved_test_list[i]._time == -1:
idx = i
break
# Run crashed tests in a separed thread
if idx < len(proved_test_list):
while proved_test_list[idx]._time == -1:
test = proved_test_list.pop(idx)
res_test_filters.append(test._name)
if idx >= len(proved_test_list):
break
if constants.IS_WIN:
# subprocess add cmd.exe to the command line on Windows if shell=True
def_length += len(f'{os.environ.get("COMSPEC", "cmd.exe")} /C ')
longest_device = ""
for device in self._available_devices:
if len(device) > len(longest_device):
longest_device = device
# prepare gtest filters per worker according command line length limitation
while len(proved_test_list) > 0:
test_times = []
is_not_full = True
worker_test_filters = list()
real_worker_num = self._worker_num * len(self._available_devices)
real_worker_num = self._worker_num * len(self._available_devices)
tasks_crashed = []
tasks_full = []
tasks_not_full = []
tests_sorted = sorted(proved_test_dict.items(), key=lambda i: i[1], reverse=True)
for test_pattern, test_time in tests_sorted:
test_pattern = f'{self.__replace_restricted_symbols(test_pattern)}'
for _ in range(real_worker_num):
if len(proved_test_list) == 0:
break
worker_test_filters.append(f'"{self.__replace_restricted_symbols(proved_test_list[0]._name)}":')
test = proved_test_list.pop(0)
test_times.append(test._time)
while is_not_full and len(proved_test_list) > 0:
for i in range(real_worker_num):
if i >= len(proved_test_list):
if self._split_unit == constants.SUITE_UNIT_NAME:
# fix the suite filters to execute the right amount of the tests
test_pattern = f'{self.__get_suite_filter(self._gtest_filter, test_pattern)}:'
else:
# add quotes and pattern splitter
test_pattern = f'"{test_pattern}":'
if test_time == -1:
tasks_crashed.append((test_time, test_pattern))
else:
while len(tasks_not_full) > 0:
t_time, t_pattern = tasks_not_full[0]
length = len(t_pattern) + def_length + len(test_pattern.replace(self._device, longest_device))
if length < MAX_LENGHT:
break
if i == 0:
continue
while test_times[0] > test_times[i] + proved_test_list[len(proved_test_list) - 1]._time:
final_pos = len(proved_test_list) - 1
filter = proved_test_list[final_pos]._name
if len(worker_test_filters[i]) + def_length + len(filter.replace(self._device, longest_device)) < MAX_LENGHT:
worker_test_filters[i] += f'"{self.__replace_restricted_symbols(filter)}":'
test_times[i] += proved_test_list[final_pos]._time
proved_test_list.pop(final_pos)
else:
is_not_full = False
break
if len(proved_test_list) == 0:
break
if is_not_full and len(proved_test_list) > 0:
filter = proved_test_list[0]._name
if len(worker_test_filters[0]) + def_length + len(filter.replace(self._device, longest_device)) < MAX_LENGHT:
worker_test_filters[0] += f'"{self.__replace_restricted_symbols(filter)}":'
test_times[0] += proved_test_list[0]._time
proved_test_list.pop(0)
else:
is_not_full = False
for filter in worker_test_filters:
res_test_filters.append(filter)
is_not_full = True
# logging for debug
for i in range(len(res_test_filters)):
filter = res_test_filters[i]
cnt = filter.count('\":')
self._total_test_cnt += cnt
# logger.info(f"Number of tests in job_{i}: {cnt}")
return res_test_filters
tasks_full.append(tasks_not_full.pop())
if len(tasks_not_full) < real_worker_num:
heapq.heappush(tasks_not_full, (test_time, test_pattern))
else:
heapq.heapreplace(tasks_not_full, (t_time + test_time, t_pattern + test_pattern))
test_filters = tasks_full + tasks_not_full + tasks_crashed
test_filters.sort(reverse=True)
# convert to list and exclude empty jobs
test_filters = [task[1] for task in test_filters if task[1]]
return test_filters
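# The loop above is essentially longest-processing-time-first balancing over a min-heap of
# worker jobs, plus the command-line length cap. A self-contained sketch of the core idea
# (not the runner's API, names are invented):
#
#   import heapq
#   def lpt_balance(task_times: dict, worker_num: int):
#       workers = [(0, "") for _ in range(worker_num)]      # (accumulated time, gtest filter)
#       heapq.heapify(workers)
#       for name, t in sorted(task_times.items(), key=lambda i: i[1], reverse=True):
#           load, flt = heapq.heappop(workers)              # least loaded worker so far
#           heapq.heappush(workers, (load + t, flt + f'"{name}":'))
#       return [flt for _, flt in workers if flt]
#
#   lpt_balance({"SuiteA": 120, "SuiteB": 90, "SuiteC": 30}, 2)
#   # -> ['"SuiteA":', '"SuiteB":"SuiteC":'] (order may vary)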
def __get_filters(self):
if not os.path.isfile(self._exec_file_path):
logger.error(f"Test executable file {self._exec_file_path} is not exist!")
sys.exit(-1)
test_list_runtime = self.__get_test_list_by_runtime()
test_list_cache = self.__get_test_list_by_cache()
cached_test_list, runtime_test_list = self.__generate_test_lists(test_list_cache, test_list_runtime)
if len(cached_test_list) > 0:
test_dict_runtime = self.__get_test_list_by_runtime(self._split_unit)
test_dict_cache = self.__get_test_list_by_cache()
cached_test_dict, runtime_test_dist = self.__generate_test_lists(test_dict_cache, test_dict_runtime)
cached_test_list = list()
if len(cached_test_dict) > 0:
self._is_save_cache = False
cached_test_list = self.__prepare_smart_filters(cached_test_list)
if len(runtime_test_list) > 0:
cached_test_list = self.__prepare_smart_filters(cached_test_dict)
runtime_test_list = list()
if len(runtime_test_dist) > 0:
self._is_save_cache = True
self._total_test_cnt += len(runtime_test_list)
runtime_test_list.reverse()
runtime_test_list = self.__prepare_smart_filters(runtime_test_dist)
logger.info(f"Total test counter is {self._total_test_cnt}")
return cached_test_list, runtime_test_list
def __execute_tests(self, filters: list(), prev_worker_cnt = 0):
commands = [f'{self._command} --gtest_filter={filter}' for filter in filters]
tmp_log_dir = os.path.join(self._working_dir, "temp")
@ -463,7 +487,7 @@ class TestParallelRunner:
test_name = line[line.find(constants.RUN) + len(constants.RUN) + 1:-1:]
has_status = False
if test_name is not None:
test_names.add(f'"{self.__replace_restricted_symbols(test_name)}":')
test_names.add(test_name)
for _, status_messages in constants.TEST_STATUS.items():
for status_msg in status_messages:
if status_msg in line:
@ -471,12 +495,12 @@ class TestParallelRunner:
break
if has_status:
break
if not has_status:
interapted_tests.append(f'"{test_name}":')
if not has_status and test_name:
interapted_tests.append(test_name)
log_file.close()
test_list_runtime = set(self.__get_test_list_by_runtime())
not_runned_tests = test_list_runtime.difference(test_names).difference(self._excluded_tests_re)
interapted_tests = set(interapted_tests).difference(self._excluded_tests_re)
not_runned_tests = test_list_runtime.difference(test_names).difference(self._excluded_tests)
interapted_tests = set(interapted_tests).difference(self._excluded_tests)
return list(not_runned_tests), list(interapted_tests)
def run(self):
@ -486,26 +510,29 @@ class TestParallelRunner:
if len(self._available_devices) > 1:
logger.info(f"Tests will be run over devices: {self._available_devices} instead of {self._device}")
t_start = datetime.datetime.now()
filters_cache, filters_runtime = self.__get_filters()
# it is better to reuse workes for both cached and runtime tasks
test_filters = filters_cache + filters_runtime
worker_cnt = 0
if len(filters_cache):
logger.info(f"Execute jobs taken from cache")
worker_cnt += self.__execute_tests(filters_cache, worker_cnt)
if len(test_filters):
logger.info(f"Execute jobs taken from cache and runtime")
worker_cnt += self.__execute_tests(test_filters, worker_cnt)
# 15m for one test in one process
if TaskManager.process_timeout == -1 or TaskManager.process_timeout == DEFAULT_PROCESS_TIMEOUT:
TaskManager.process_timeout = DEFAULT_TEST_TIMEOUT
if len(filters_runtime):
logger.info(f"Execute jobs taken from runtime")
worker_cnt += self.__execute_tests(filters_runtime, worker_cnt)
not_runned_test_filter, interapted_tests = self.__find_not_runned_tests()
if len(not_runned_test_filter) > 0:
logger.info(f"Execute not runned {len(not_runned_test_filter)} tests")
worker_cnt += self.__execute_tests(not_runned_test_filter, worker_cnt)
if len(interapted_tests) > 0:
logger.info(f"Execute interapted {len(interapted_tests)} tests")
worker_cnt += self.__execute_tests(interapted_tests, worker_cnt)
TaskManager.process_timeout = DEFAULT_SUITE_TIMEOUT if self._split_unit == constants.SUITE_UNIT_NAME else DEFAULT_TEST_TIMEOUT
not_runned_tests, interapted_tests = self.__find_not_runned_tests()
if (self._repeat_failed > 0):
if len(not_runned_tests) > 0:
logger.info(f"Execute not runned {len(not_runned_tests)} tests")
not_runned_test_filters = [f'"{self.__replace_restricted_symbols(test)}"' for test in not_runned_tests]
worker_cnt += self.__execute_tests(not_runned_test_filters, worker_cnt)
if len(interapted_tests) > 0:
logger.info(f"Execute interapted {len(interapted_tests)} tests")
interapted_tests_filters = [f'"{self.__replace_restricted_symbols(test)}"' for test in interapted_tests]
worker_cnt += self.__execute_tests(interapted_tests_filters, worker_cnt)
t_end = datetime.datetime.now()
total_seconds = (t_end - t_start).total_seconds()
@ -547,7 +574,7 @@ class TestParallelRunner:
with open(test_log_filename, "w") as log:
log.writelines(test_log)
log.close()
saved_tests.append(f'\"{test_name}\":')
saved_tests.append(test_name)
return True
logs_dir = os.path.join(self._working_dir, "logs")
@ -566,6 +593,7 @@ class TestParallelRunner:
with open(log_filename, "r") as log_file:
test_name = None
test_log = list()
test_suites = set()
dir = None
test_cnt_expected = test_cnt_real_saved_now = 0
ref_k = None
@ -580,6 +608,7 @@ class TestParallelRunner:
test_cnt_expected = line.count(':')
if constants.RUN in line:
test_name = line[line.find(constants.RUN) + len(constants.RUN) + 1:-1:]
dir = None
if self._device != None and self._available_devices != None:
for device_name in self._available_devices:
if device_name in test_name:
@ -599,6 +628,8 @@ class TestParallelRunner:
if (constants.PG_ERR in line) or (constants.PG_WARN in line):
test_log.append(line)
if test_name is not None:
test_suite = test_name[:test_name.find(".")]
test_suites.add(test_suite)
test_log.append(line)
if dir:
if __save_log(logs_dir, dir, test_name):
@ -617,17 +648,22 @@ class TestParallelRunner:
test_cnt_real_saved_now += 1
test_name = None
test_log = list()
dir = None
log_file.close()
if test_name != None:
dir = INTERAPTED_DIR
if __save_log(logs_dir, dir, test_name):
interapted_tests.add(test_name)
test_cnt_real = test_cnt_real_saved_now
if (self._split_unit == constants.SUITE_UNIT_NAME):
test_cnt_real = len(test_suites)
else:
test_cnt_real = test_cnt_real_saved_now
if test_cnt_real < test_cnt_expected:
logger.error(f"Number of tests in {log}: {test_cnt_real}. Expected is {test_cnt_expected} tests")
logger.error(f"Number of {self._split_unit}s in {log}: {test_cnt_real}. Expected is {test_cnt_expected} {self._split_unit}")
else:
os.remove(log_filename)
if len(list(Path(os.path.join(self._working_dir, "temp")).rglob("log_*.log"))) == 0:
rmtree(os.path.join(self._working_dir, "temp"))
for test_name in interapted_tests:
@ -645,7 +681,7 @@ class TestParallelRunner:
if self._is_save_cache:
test_times.sort(reverse=True)
with open(self._cache_path, "w") as cache_file:
cache_file.writelines([f"{time}:\"" + test_name + "\":\n" for time, test_name in test_times])
cache_file.writelines([f"{time}:{test_name}\n" for time, test_name in test_times])
cache_file.close()
logger.info(f"Test cache test is saved to: {self._cache_path}")
hash_table_path = os.path.join(logs_dir, "hash_table.csv")
@ -671,7 +707,7 @@ class TestParallelRunner:
_, tail = os.path.split(ir_hash)
ir_hash, _ = os.path.splitext(tail)
ir_hashes.append(ir_hash)
logger.info(f"Fix priorities list is saved to: {fix_priority_path}")
# Find all irs for failed tests
failed_ir_dir = os.path.join(self._working_dir, f'{self._device}_failed_ir')
@ -689,7 +725,7 @@ class TestParallelRunner:
xml_file = correct_ir
bin_file = prefix + constants.BIN_EXTENSION
meta_file = prefix + constants.META_EXTENSION
failed_ir_xml = xml_file.replace(head, failed_ir_dir)
failed_ir_bin = bin_file.replace(head, failed_ir_dir)
failed_ir_meta = meta_file.replace(head, failed_ir_dir)
@ -731,16 +767,16 @@ class TestParallelRunner:
not_run_tests_path = os.path.join(logs_dir, "not_run_tests.log")
with open(not_run_tests_path, "w") as not_run_tests_path_file:
test_list_runtime = self.__get_test_list_by_runtime()
diff_set = set(test_list_runtime).difference(set(saved_tests)).difference(self._orig_excluded_tests)
diff_set = set(saved_tests).intersection(test_list_runtime).difference(set(saved_tests)).difference(self._excluded_tests)
diff_list = list()
for item in diff_set:
diff_list.append(f"{item}\n")
not_run_tests_path_file.writelines(diff_list)
not_run_tests_path_file.close()
logger.info(f"Not run test list is saved to: {not_run_tests_path}")
l = len(diff_list)
if l > 0:
logger.warning(f"Not run test test counter is: {len(diff_list)}")
logger.info(f"Not run test list is saved to: {not_run_tests_path}")
is_successfull_run = True
test_cnt = 0
@ -751,8 +787,11 @@ class TestParallelRunner:
is_successfull_run = False
if len(self._disabled_tests):
logger.info(f"disabled test counter is: {len(self._disabled_tests)}")
if self._total_test_cnt != test_cnt:
diff_set = set(saved_tests).difference(set(test_list_runtime))
if diff_set:
logger.error(f"Total test count is {test_cnt} is different with expected {self._total_test_cnt} tests")
[logger.error(f'Missed test: {test}') for test in diff_set]
is_successfull_run = False
logger.info(f"Total test count with disabled tests is {test_cnt + len(self._disabled_tests)}. All logs is saved to {logs_dir}")
return is_successfull_run
@ -766,11 +805,20 @@ if __name__ == "__main__":
logger.info(f"[ARGUMENTS] --cache_path={args.cache_path}")
logger.info(f"[ARGUMENTS] --workers={args.workers}")
logger.info(f"[ARGUMENTS] --parallel_devices={args.parallel_devices}")
logger.info(f"[ARGUMENTS] --split_unit={args.split_unit}")
logger.info(f"[ARGUMENTS] --repeat_failed={args.repeat_failed}")
logger.info(f"[ARGUMENTS] Executable file arguments = {exec_file_args}")
TaskManager.process_timeout = args.process_timeout
conformance = TestParallelRunner(args.exec_file, exec_file_args, args.workers, args.working_dir, args.cache_path, args.parallel_devices)
conformance.run()
if not conformance.postprocess_logs():
test_runner = TestParallelRunner(exec_file_path = args.exec_file,
test_command_line = exec_file_args,
worker_num = args.workers,
working_dir = args.working_dir,
cache_path = args.cache_path,
split_unit = args.split_unit,
repeat_failed = args.repeat_failed,
is_parallel_devices = args.parallel_devices)
test_runner.run()
if not test_runner.postprocess_logs():
logger.error("Run is not successful")
sys.exit(-1)
else:


@ -16,6 +16,9 @@ DISABLED_PREFIX = "DISABLED_"
PG_ERR = "PG ERROR"
PG_WARN = "PG WARN"
REF_COEF = "[ CONFORMANCE ] Influence coefficient: "
TEST_UNIT_NAME = "test"
SUITE_UNIT_NAME = "suite"
UNIT_NAMES = [TEST_UNIT_NAME, SUITE_UNIT_NAME]
IS_WIN = "windows" in platform or "win32" in platform
IS_MACOS = "darwin" in platform