diff --git a/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/merge_xmls.py b/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/merge_xmls.py index 5e1e8d01779..3e5fbab1a9a 100644 --- a/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/merge_xmls.py +++ b/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/merge_xmls.py @@ -27,11 +27,13 @@ def parse_arguments(): output_folders_help = "Path to folder to save report" output_filename_help = "Output report filename" report_type_help = "Report type: OP or API" + merge_device_id_help = "Merge all devices with suffix to one main device. Example: GPU.0 and GPU.1 -> GPU" parser.add_argument("-i", "--input_folders", help=input_folders_help, nargs="*", required=True) parser.add_argument("-o", "--output_folder", help=output_folders_help, default=".") parser.add_argument("-f", "--output_filename", help=output_filename_help, default="report") parser.add_argument("-t", "--report_type", help=report_type_help, default="OP") + parser.add_argument("-m", "--merge_device_id", help=merge_device_id_help, default=False) return parser.parse_args() @@ -53,7 +55,8 @@ def update_result_node(xml_node: SubElement, aggregated_res: SubElement): aggregated_res.set(attr_name, str(xml_value + aggregated_value)) -def aggregate_test_results(aggregated_results: SubElement, xml_reports: list, report_type: str): +def aggregate_test_results(aggregated_results: SubElement, xml_reports: list, + report_type: str, merge_device_suffix=False): aggregated_timestamp = None for xml in xml_reports: # logger.info(f" Processing: {xml}") @@ -67,16 +70,22 @@ def aggregate_test_results(aggregated_results: SubElement, xml_reports: list, re if aggregated_timestamp is None or xml_timestamp < aggregated_timestamp: aggregated_timestamp = xml_timestamp for xml_device_entry in xml_results: + device_name = xml_device_entry.tag + if merge_device_suffix and "." in xml_device_entry.tag: + device_name = xml_device_entry.tag[:xml_device_entry.tag.find("."):] + new_data = ET.tostring(xml_device_entry).decode('utf8').replace(xml_device_entry.tag, device_name) + xml_device_entry = ET.fromstring(new_data) aggregated_device_results = aggregated_results.find(xml_device_entry.tag) - if aggregated_device_results is None: - aggregated_results.append(xml_device_entry) - aggregated_device_results = aggregated_results.find(xml_device_entry.tag) - # op or api_type for xml_results_entry in xml_device_entry: - aggregated_results_entry = aggregated_device_results.find(xml_results_entry.tag) + aggregated_results_entry = None + if not aggregated_device_results is None: + aggregated_results_entry = aggregated_device_results.find(xml_results_entry.tag) if aggregated_results_entry is None: stat_update_utils.update_rel_values(xml_results_entry) - aggregated_device_results.append(xml_results_entry) + if aggregated_device_results is None: + aggregated_results.append(xml_device_entry) + else: + aggregated_device_results.append(xml_results_entry) continue if report_type == "OP": update_result_node(xml_results_entry, aggregated_results_entry) @@ -91,7 +100,8 @@ def aggregate_test_results(aggregated_results: SubElement, xml_reports: list, re return aggregated_timestamp -def merge_xml(input_folder_paths: list, output_folder_paths: str, output_filename: str, report_type: str): +def merge_xml(input_folder_paths: list, output_folder_paths: str, output_filename: str, + report_type: str, merge_device_suffix=False): logger.info(f" Processing is finished") summary = Element("report") @@ -133,7 +143,7 @@ def merge_xml(input_folder_paths: list, output_folder_paths: str, output_filenam for entity in xml_root.find(entity_name): if entity_list.find(entity.tag) is None: SubElement(entity_list, entity.tag) - timestamp = aggregate_test_results(results, xml_reports, report_type) + timestamp = aggregate_test_results(results, xml_reports, report_type, merge_device_suffix) if report_type == "OP": stat_update_utils.update_passrates(results) else: @@ -154,4 +164,4 @@ def merge_xml(input_folder_paths: list, output_folder_paths: str, output_filenam if __name__ == "__main__": arguments = parse_arguments() - merge_xml(arguments.input_folders, arguments.output_folder, arguments.output_filename, arguments.report_type) + merge_xml(arguments.input_folders, arguments.output_folder, arguments.output_filename, arguments.report_type, arguments.merge_device_id) diff --git a/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_conformance.py b/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_conformance.py index 8ab653652c4..fac5835c8cc 100644 --- a/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_conformance.py +++ b/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_conformance.py @@ -19,12 +19,12 @@ from utils.conformance_utils import get_logger from utils import file_utils logger = get_logger('conformance_runner') -is_hash = True +has_python_api = True try: from rename_conformance_ir import create_hash except: logger.warning("Please set the above env variable to get the same conformance ir names run by run!") - is_hash = False + has_python_api = False API_CONFORMANCE_BIN_NAME = "apiConformanceTests" OP_CONFORMANCE_BIN_NAME = "conformanceTests" @@ -135,7 +135,7 @@ class Conformance: logger.error("Process failed on step: 'Subgraph dumping'") exit(-1) self._model_path = conformance_ir_path - if is_hash: + if has_python_api: create_hash(Path(self._model_path)) logger.info(f"All conformance IRs in {self._ov_bin_path} were renamed based on hash") else: @@ -173,7 +173,7 @@ class Conformance: final_report_name = f'report_{self._type.lower()}' # API Conformance contains both report type - merge_xml([parallel_report_dir], report_dir, final_report_name, self._type) + merge_xml([parallel_report_dir], report_dir, final_report_name, self._type, True) if self._type == constants.API_CONFORMANCE: final_op_report_name = f'report_{constants.OP_CONFORMANCE.lower()}' merge_xml([parallel_report_dir], report_dir, final_op_report_name, constants.OP_CONFORMANCE.lower()) diff --git a/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_parallel.py b/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_parallel.py index 62e7111372e..9fa3983cdc2 100644 --- a/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_parallel.py +++ b/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/run_parallel.py @@ -13,7 +13,6 @@ from shutil import rmtree import os import sys import threading -import platform import csv import datetime import shlex @@ -23,14 +22,20 @@ if sys.version_info.major >= 3: else: import thread +has_python_api = True +logger = get_logger('test_parallel_runner') +try: + from utils.get_available_devices import get_available_devices +except: + logger.warning("Please set the above env variable to get the same conformance ir names run by run!") + has_python_api = False + FILENAME_LENGTH = 255 LOG_NAME_REPLACE_STR = "##NAME##" DEFAULT_PROCESS_TIMEOUT = 3600 DEFAULT_TEST_TIMEOUT = 900 MAX_LENGHT = 4096 if not constants.IS_WIN else 8191 -logger = get_logger('test_parallel_runner') - def parse_arguments(): parser = ArgumentParser() exec_file_path_help = "Path to the test executable file" @@ -38,10 +43,12 @@ def parse_arguments(): worker_num_help = "Worker number. Default value is `cpu_count-1` " working_dir_num_help = "Working dir" process_timeout_help = "Process timeout in s" + parallel_help = "Parallel over HW devices. For example run tests over GPU.0, GPU.1 and etc" parser.add_argument("-e", "--exec_file", help=exec_file_path_help, type=str, required=True) parser.add_argument("-c", "--cache_path", help=cache_path_help, type=str, required=False, default="") parser.add_argument("-j", "--workers", help=worker_num_help, type=int, required=False, default=(os.cpu_count() - 1) if os.cpu_count() > 2 else 1) + parser.add_argument("-p", "--parallel_devices", help=parallel_help, type=int, required=False, default=1) parser.add_argument("-w", "--working_dir", help=working_dir_num_help, type=str, required=False, default=".") parser.add_argument("-t", "--process_timeout", help=process_timeout_help, type=int, required=False, default=DEFAULT_PROCESS_TIMEOUT) return parser.parse_args() @@ -55,6 +62,21 @@ def get_test_command_line_args(): break return command_line_args +def get_device_by_args(args: list): + device = None + is_device = False + for argument in args: + if "--device" in argument: + is_device = True + if argument.find("=") == -1: + continue + device = argument[argument.find("=")+1:] + break + if is_device and argument[0] != "-": + device = argument + break + return device + # Class to read test cache class TestStructure: _name = "" @@ -67,7 +89,7 @@ class TestStructure: class TaskManager: process_timeout = -1 - def __init__(self, command_list:list, working_dir: os.path, prev_run_cmd_length = 0): + def __init__(self, command_list:list, working_dir: os.path, prev_run_cmd_length=0, device=None, available_devices=list()): self._command_list = command_list self._process_list = list() self._workers = list() @@ -75,6 +97,14 @@ class TaskManager: self._log_filename = os.path.join(working_dir, f"log_{LOG_NAME_REPLACE_STR}.log") self._prev_run_cmd_length = prev_run_cmd_length self._idx = 0 + self._device = device + if self._device is None: + self._device = "NOT_AFFECTED_BY_DEVICE" + if len(available_devices) > 0: + self._available_devices = available_devices + else: + self._available_devices = [self._device] + self._device_cnt = len(self._available_devices) def __create_thread(self, func): thread = threading.Thread(target=func) @@ -86,19 +116,23 @@ class TaskManager: if len(self._command_list) <= self._idx: logger.warning(f"Skip worker initialiazation. Command list lenght <= worker index") return - log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx + self._prev_run_cmd_length)) - with open(log_file_name, "w") as log_file: - args = self._command_list[self._idx] - if not constants.IS_WIN: - args = shlex.split(self._command_list[self._idx]) - worker = self.__create_thread( - self._process_list.append(Popen(args, shell=constants.IS_WIN, stdout=log_file, stderr=log_file))) - self._workers.append(worker) - worker.join() - self._timers.append(datetime.datetime.now()) - log_file.close() - # logger.info(f"{self._idx}/{len(self._command_list)} is started") - self._idx += 1 + if self._device_cnt == 0: + logger.error(f"Empty available devices! Check your device!") + exit(-1) + for target_device in self._available_devices: + log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx + self._prev_run_cmd_length)) + with open(log_file_name, "w") as log_file: + args = self._command_list[self._idx].replace(self._device, target_device) + if not constants.IS_WIN: + args = shlex.split(args) + worker = self.__create_thread( + self._process_list.append(Popen(args, shell=constants.IS_WIN, stdout=log_file, stderr=log_file))) + self._workers.append(worker) + worker.join() + self._timers.append(datetime.datetime.now()) + log_file.close() + # logger.info(f"{self._idx}/{len(self._command_list)} is started") + self._idx += 1 def __find_free_process(self): while True: @@ -108,24 +142,25 @@ class TaskManager: logger.warning(f"Process {pid} exceed time limetattion per process") self._process_list[pid].kill() self._process_list[pid].wait(timeout=0) + device = get_device_by_args(self._process_list[pid].args) # logger.info(f"{self._idx}/{len(self._command_list)} is started") - return pid + return pid, device except TimeoutExpired: continue - def __update_process(self, pid:int, log_file): - args = self._command_list[self._idx] + def __update_process(self, pid:int, log_file, device): + args = self._command_list[self._idx].replace(self._device, device) if not constants.IS_WIN: - args = shlex.split(self._command_list[self._idx]) + args = shlex.split(args) self._process_list[pid] = Popen(args, shell=constants.IS_WIN, stdout=log_file, stderr=log_file) def update_worker(self): if self._idx >= len(self._command_list): return False - pid = self.__find_free_process() + pid, device = self.__find_free_process() log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx + self._prev_run_cmd_length)) with open(log_file_name, "w") as log_file: - self._workers[pid] = self.__create_thread(self.__update_process(pid, log_file)) + self._workers[pid] = self.__create_thread(self.__update_process(pid, log_file, device)) self._workers[pid].join() self._timers[pid] = datetime.datetime.now() self._idx += 1 @@ -165,6 +200,12 @@ class TestParallelRunner: self._is_save_cache = True self._disabled_tests = list() self._total_test_cnt = 0 + self._available_devices = None + self._device = get_device_by_args(self._command.split()) + if has_python_api: + self._available_devices = get_available_devices(self._device) + else: + self._available_devices = [self._device] if not self._device is None else [] def __init_basic_command_line_for_exec_file(self, test_command_line: list): command = f'{self._exec_file_path}' @@ -350,7 +391,7 @@ class TestParallelRunner: def __execute_tests(self, filters: list(), prev_worker_cnt = 0): commands = [f'{self._command} --gtest_filter={filter}' for filter in filters] - task_manager = TaskManager(commands, self._working_dir, prev_worker_cnt) + task_manager = TaskManager(commands, self._working_dir, prev_worker_cnt, self._device, self._available_devices) for _ in progressbar(range(self._worker_num), "Worker initialization: ", 40): task_manager.init_worker() for _ in progressbar(range(len(commands) - self._worker_num), "Worker execution: ", 40): @@ -362,6 +403,8 @@ class TestParallelRunner: if TaskManager.process_timeout == -1: TaskManager.process_timeout = DEFAULT_PROCESS_TIMEOUT logger.info(f"Run test parallel is started. Worker num is {self._worker_num}") + if len(self._available_devices) > 1: + logger.info(f"Tests will be run over devices: {self._available_devices} instead of {self._device}") t_start = datetime.datetime.now() filters_cache, filters_runtime = self.__get_filters() @@ -436,6 +479,11 @@ class TestParallelRunner: test_cnt_expected = line.count(':') if constants.RUN in line: test_name = line[line.find(constants.RUN) + len(constants.RUN) + 1:-1:] + if self._device != None and self._available_devices != None: + for device_name in self._available_devices: + if device_name in test_name: + test_name = test_name.replace(device_name, self._device) + break if constants.REF_COEF in line: ref_k = float(line[line.rfind(' ') + 1:]) if dir is None: diff --git a/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/utils/constants.py b/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/utils/constants.py index 851fc57fc97..88132cc6e69 100644 --- a/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/utils/constants.py +++ b/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/utils/constants.py @@ -22,7 +22,7 @@ OS_BIN_FILE_EXT = ".exe" if IS_WIN else "" ENV_SEPARATOR = ";" if IS_WIN else ":" PYTHON_NAME = "python" if IS_WIN else "python3" PIP_NAME = "pip" if IS_WIN else "pip3" -LD_LIB_PATH_NAME = "PATH" if IS_WIN or platform == "darwin" else "LD_LIBRARY_PATH" +LD_LIB_PATH_NAME = "PATH" if IS_WIN else "LD_LIBRARY_PATH" OPENVINO_NAME = 'openvino' PY_OPENVINO = "python_api" diff --git a/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/utils/get_available_devices.py b/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/utils/get_available_devices.py new file mode 100644 index 00000000000..8667a7e21ca --- /dev/null +++ b/src/tests/ie_test_utils/functional_test_utils/layer_tests_summary/utils/get_available_devices.py @@ -0,0 +1,43 @@ +# Copyright (C) 2018-2023 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + + + +try: + from openvino.runtime import Core +except: + from utils.file_utils import get_ov_path, find_latest_dir + import os + from utils.constants import PY_OPENVINO, LD_LIB_PATH_NAME + from utils.conformance_utils import get_logger, set_env_variable + + logger = get_logger("get_available_device") + + script_dir, _ = os.path.split(os.path.abspath(__file__)) + ov_bin_path = get_ov_path(script_dir, None, True) + if PY_OPENVINO in os.listdir(ov_bin_path): + env = os.environ + py_ov = os.path.join(ov_bin_path, PY_OPENVINO) + py_ov = os.path.join(py_ov, find_latest_dir(py_ov)) + + env = set_env_variable(env, "PYTHONPATH", py_ov) + env = set_env_variable(env, LD_LIB_PATH_NAME, ov_bin_path) + logger.warning("Set the following env varibles to rename conformance ir based on hash: ") + logger.warning(f'PYTHONPATH={env["PYTHONPATH"]}') + logger.warning(f'{LD_LIB_PATH_NAME}={env[LD_LIB_PATH_NAME]}') + exit(0) + else: + logger.error(f'Impossible to run the tool! PyOpenVINO was not built!') + exit(-1) + +def get_available_devices(target_device = None, exclude_device = None): + result = list() + core = Core() + if exclude_device is None: + exclude_device = "NOT_EXISTED_DEVICE" + for device in core.available_devices: + if target_device is None or target_device in device: + if exclude_device in device: + continue + result.append(device) + return result