[CONFORMANCE] Parallelization over HW devices (#16431)

* init

* just fix version

* Update merge script

* remove extra code

* Uncomment correct func

* dd

* validate_nvidia

* Small refactoring

* Trigger linux build

* Update main.cpp

revert

* trigger

* fix build

* Update main.cpp
This commit is contained in:
Irina Efode 2023-03-30 14:45:49 +04:00 committed by GitHub
parent 086ee93bcd
commit 87365fa21d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 140 additions and 39 deletions

View File

@ -27,11 +27,13 @@ def parse_arguments():
output_folders_help = "Path to folder to save report"
output_filename_help = "Output report filename"
report_type_help = "Report type: OP or API"
merge_device_id_help = "Merge all devices with suffix to one main device. Example: GPU.0 and GPU.1 -> GPU"
parser.add_argument("-i", "--input_folders", help=input_folders_help, nargs="*", required=True)
parser.add_argument("-o", "--output_folder", help=output_folders_help, default=".")
parser.add_argument("-f", "--output_filename", help=output_filename_help, default="report")
parser.add_argument("-t", "--report_type", help=report_type_help, default="OP")
parser.add_argument("-m", "--merge_device_id", help=merge_device_id_help, default=False)
return parser.parse_args()
@ -53,7 +55,8 @@ def update_result_node(xml_node: SubElement, aggregated_res: SubElement):
aggregated_res.set(attr_name, str(xml_value + aggregated_value))
def aggregate_test_results(aggregated_results: SubElement, xml_reports: list, report_type: str):
def aggregate_test_results(aggregated_results: SubElement, xml_reports: list,
report_type: str, merge_device_suffix=False):
aggregated_timestamp = None
for xml in xml_reports:
# logger.info(f" Processing: {xml}")
@ -67,16 +70,22 @@ def aggregate_test_results(aggregated_results: SubElement, xml_reports: list, re
if aggregated_timestamp is None or xml_timestamp < aggregated_timestamp:
aggregated_timestamp = xml_timestamp
for xml_device_entry in xml_results:
device_name = xml_device_entry.tag
if merge_device_suffix and "." in xml_device_entry.tag:
device_name = xml_device_entry.tag[:xml_device_entry.tag.find("."):]
new_data = ET.tostring(xml_device_entry).decode('utf8').replace(xml_device_entry.tag, device_name)
xml_device_entry = ET.fromstring(new_data)
aggregated_device_results = aggregated_results.find(xml_device_entry.tag)
if aggregated_device_results is None:
aggregated_results.append(xml_device_entry)
aggregated_device_results = aggregated_results.find(xml_device_entry.tag)
# op or api_type
for xml_results_entry in xml_device_entry:
aggregated_results_entry = aggregated_device_results.find(xml_results_entry.tag)
aggregated_results_entry = None
if not aggregated_device_results is None:
aggregated_results_entry = aggregated_device_results.find(xml_results_entry.tag)
if aggregated_results_entry is None:
stat_update_utils.update_rel_values(xml_results_entry)
aggregated_device_results.append(xml_results_entry)
if aggregated_device_results is None:
aggregated_results.append(xml_device_entry)
else:
aggregated_device_results.append(xml_results_entry)
continue
if report_type == "OP":
update_result_node(xml_results_entry, aggregated_results_entry)
@ -91,7 +100,8 @@ def aggregate_test_results(aggregated_results: SubElement, xml_reports: list, re
return aggregated_timestamp
def merge_xml(input_folder_paths: list, output_folder_paths: str, output_filename: str, report_type: str):
def merge_xml(input_folder_paths: list, output_folder_paths: str, output_filename: str,
report_type: str, merge_device_suffix=False):
logger.info(f" Processing is finished")
summary = Element("report")
@ -133,7 +143,7 @@ def merge_xml(input_folder_paths: list, output_folder_paths: str, output_filenam
for entity in xml_root.find(entity_name):
if entity_list.find(entity.tag) is None:
SubElement(entity_list, entity.tag)
timestamp = aggregate_test_results(results, xml_reports, report_type)
timestamp = aggregate_test_results(results, xml_reports, report_type, merge_device_suffix)
if report_type == "OP":
stat_update_utils.update_passrates(results)
else:
@ -154,4 +164,4 @@ def merge_xml(input_folder_paths: list, output_folder_paths: str, output_filenam
if __name__ == "__main__":
arguments = parse_arguments()
merge_xml(arguments.input_folders, arguments.output_folder, arguments.output_filename, arguments.report_type)
merge_xml(arguments.input_folders, arguments.output_folder, arguments.output_filename, arguments.report_type, arguments.merge_device_id)

View File

@ -19,12 +19,12 @@ from utils.conformance_utils import get_logger
from utils import file_utils
logger = get_logger('conformance_runner')
is_hash = True
has_python_api = True
try:
from rename_conformance_ir import create_hash
except:
logger.warning("Please set the above env variable to get the same conformance ir names run by run!")
is_hash = False
has_python_api = False
API_CONFORMANCE_BIN_NAME = "apiConformanceTests"
OP_CONFORMANCE_BIN_NAME = "conformanceTests"
@ -135,7 +135,7 @@ class Conformance:
logger.error("Process failed on step: 'Subgraph dumping'")
exit(-1)
self._model_path = conformance_ir_path
if is_hash:
if has_python_api:
create_hash(Path(self._model_path))
logger.info(f"All conformance IRs in {self._ov_bin_path} were renamed based on hash")
else:
@ -173,7 +173,7 @@ class Conformance:
final_report_name = f'report_{self._type.lower()}'
# API Conformance contains both report type
merge_xml([parallel_report_dir], report_dir, final_report_name, self._type)
merge_xml([parallel_report_dir], report_dir, final_report_name, self._type, True)
if self._type == constants.API_CONFORMANCE:
final_op_report_name = f'report_{constants.OP_CONFORMANCE.lower()}'
merge_xml([parallel_report_dir], report_dir, final_op_report_name, constants.OP_CONFORMANCE.lower())

View File

@ -13,7 +13,6 @@ from shutil import rmtree
import os
import sys
import threading
import platform
import csv
import datetime
import shlex
@ -23,14 +22,20 @@ if sys.version_info.major >= 3:
else:
import thread
has_python_api = True
logger = get_logger('test_parallel_runner')
try:
from utils.get_available_devices import get_available_devices
except:
logger.warning("Please set the above env variable to get the same conformance ir names run by run!")
has_python_api = False
FILENAME_LENGTH = 255
LOG_NAME_REPLACE_STR = "##NAME##"
DEFAULT_PROCESS_TIMEOUT = 3600
DEFAULT_TEST_TIMEOUT = 900
MAX_LENGHT = 4096 if not constants.IS_WIN else 8191
logger = get_logger('test_parallel_runner')
def parse_arguments():
parser = ArgumentParser()
exec_file_path_help = "Path to the test executable file"
@ -38,10 +43,12 @@ def parse_arguments():
worker_num_help = "Worker number. Default value is `cpu_count-1` "
working_dir_num_help = "Working dir"
process_timeout_help = "Process timeout in s"
parallel_help = "Parallel over HW devices. For example run tests over GPU.0, GPU.1 and etc"
parser.add_argument("-e", "--exec_file", help=exec_file_path_help, type=str, required=True)
parser.add_argument("-c", "--cache_path", help=cache_path_help, type=str, required=False, default="")
parser.add_argument("-j", "--workers", help=worker_num_help, type=int, required=False, default=(os.cpu_count() - 1) if os.cpu_count() > 2 else 1)
parser.add_argument("-p", "--parallel_devices", help=parallel_help, type=int, required=False, default=1)
parser.add_argument("-w", "--working_dir", help=working_dir_num_help, type=str, required=False, default=".")
parser.add_argument("-t", "--process_timeout", help=process_timeout_help, type=int, required=False, default=DEFAULT_PROCESS_TIMEOUT)
return parser.parse_args()
@ -55,6 +62,21 @@ def get_test_command_line_args():
break
return command_line_args
def get_device_by_args(args: list):
    """Extract the target device name from a test command line.

    Supports both argument forms:
      * ``--device=NAME``  (value embedded after ``=``)
      * ``--device NAME``  (value as the following standalone token)

    :param args: list of command-line argument tokens.
    :return: the device string (e.g. ``"GPU.0"``) or ``None`` when no
             device argument is present.
    """
    device = None
    expects_value = False
    for argument in args:
        if "--device" in argument:
            expects_value = True
            eq_pos = argument.find("=")
            if eq_pos == -1:
                # Bare `--device`: the value should arrive as a later token.
                continue
            device = argument[eq_pos + 1:]
            break
        # Previous token was a bare `--device`; take the first non-flag token.
        # The truthiness check guards against empty-string tokens, which made
        # the original `argument[0]` raise IndexError.
        if expects_value and argument and not argument.startswith("-"):
            device = argument
            break
    return device
# Class to read test cache
class TestStructure:
_name = ""
@ -67,7 +89,7 @@ class TestStructure:
class TaskManager:
process_timeout = -1
def __init__(self, command_list:list, working_dir: os.path, prev_run_cmd_length = 0):
def __init__(self, command_list:list, working_dir: os.path, prev_run_cmd_length=0, device=None, available_devices=list()):
self._command_list = command_list
self._process_list = list()
self._workers = list()
@ -75,6 +97,14 @@ class TaskManager:
self._log_filename = os.path.join(working_dir, f"log_{LOG_NAME_REPLACE_STR}.log")
self._prev_run_cmd_length = prev_run_cmd_length
self._idx = 0
self._device = device
if self._device is None:
self._device = "NOT_AFFECTED_BY_DEVICE"
if len(available_devices) > 0:
self._available_devices = available_devices
else:
self._available_devices = [self._device]
self._device_cnt = len(self._available_devices)
def __create_thread(self, func):
thread = threading.Thread(target=func)
@ -86,19 +116,23 @@ class TaskManager:
if len(self._command_list) <= self._idx:
logger.warning(f"Skip worker initialiazation. Command list lenght <= worker index")
return
log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx + self._prev_run_cmd_length))
with open(log_file_name, "w") as log_file:
args = self._command_list[self._idx]
if not constants.IS_WIN:
args = shlex.split(self._command_list[self._idx])
worker = self.__create_thread(
self._process_list.append(Popen(args, shell=constants.IS_WIN, stdout=log_file, stderr=log_file)))
self._workers.append(worker)
worker.join()
self._timers.append(datetime.datetime.now())
log_file.close()
# logger.info(f"{self._idx}/{len(self._command_list)} is started")
self._idx += 1
if self._device_cnt == 0:
logger.error(f"Empty available devices! Check your device!")
exit(-1)
for target_device in self._available_devices:
log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx + self._prev_run_cmd_length))
with open(log_file_name, "w") as log_file:
args = self._command_list[self._idx].replace(self._device, target_device)
if not constants.IS_WIN:
args = shlex.split(args)
worker = self.__create_thread(
self._process_list.append(Popen(args, shell=constants.IS_WIN, stdout=log_file, stderr=log_file)))
self._workers.append(worker)
worker.join()
self._timers.append(datetime.datetime.now())
log_file.close()
# logger.info(f"{self._idx}/{len(self._command_list)} is started")
self._idx += 1
def __find_free_process(self):
while True:
@ -108,24 +142,25 @@ class TaskManager:
logger.warning(f"Process {pid} exceed time limetattion per process")
self._process_list[pid].kill()
self._process_list[pid].wait(timeout=0)
device = get_device_by_args(self._process_list[pid].args)
# logger.info(f"{self._idx}/{len(self._command_list)} is started")
return pid
return pid, device
except TimeoutExpired:
continue
def __update_process(self, pid:int, log_file):
args = self._command_list[self._idx]
def __update_process(self, pid: int, log_file, device):
    """Relaunch the next queued command in the worker slot *pid*.

    The command template targets ``self._device``; it is retargeted to the
    given *device* before spawning. stdout/stderr go to *log_file*.
    """
    cmd = self._command_list[self._idx].replace(self._device, device)
    # On non-Windows platforms Popen needs an argv list, not a shell string.
    if not constants.IS_WIN:
        cmd = shlex.split(cmd)
    self._process_list[pid] = Popen(cmd, shell=constants.IS_WIN,
                                    stdout=log_file, stderr=log_file)
def update_worker(self):
if self._idx >= len(self._command_list):
return False
pid = self.__find_free_process()
pid, device = self.__find_free_process()
log_file_name = self._log_filename.replace(LOG_NAME_REPLACE_STR, str(self._idx + self._prev_run_cmd_length))
with open(log_file_name, "w") as log_file:
self._workers[pid] = self.__create_thread(self.__update_process(pid, log_file))
self._workers[pid] = self.__create_thread(self.__update_process(pid, log_file, device))
self._workers[pid].join()
self._timers[pid] = datetime.datetime.now()
self._idx += 1
@ -165,6 +200,12 @@ class TestParallelRunner:
self._is_save_cache = True
self._disabled_tests = list()
self._total_test_cnt = 0
self._available_devices = None
self._device = get_device_by_args(self._command.split())
if has_python_api:
self._available_devices = get_available_devices(self._device)
else:
self._available_devices = [self._device] if not self._device is None else []
def __init_basic_command_line_for_exec_file(self, test_command_line: list):
command = f'{self._exec_file_path}'
@ -350,7 +391,7 @@ class TestParallelRunner:
def __execute_tests(self, filters: list(), prev_worker_cnt = 0):
commands = [f'{self._command} --gtest_filter={filter}' for filter in filters]
task_manager = TaskManager(commands, self._working_dir, prev_worker_cnt)
task_manager = TaskManager(commands, self._working_dir, prev_worker_cnt, self._device, self._available_devices)
for _ in progressbar(range(self._worker_num), "Worker initialization: ", 40):
task_manager.init_worker()
for _ in progressbar(range(len(commands) - self._worker_num), "Worker execution: ", 40):
@ -362,6 +403,8 @@ class TestParallelRunner:
if TaskManager.process_timeout == -1:
TaskManager.process_timeout = DEFAULT_PROCESS_TIMEOUT
logger.info(f"Run test parallel is started. Worker num is {self._worker_num}")
if len(self._available_devices) > 1:
logger.info(f"Tests will be run over devices: {self._available_devices} instead of {self._device}")
t_start = datetime.datetime.now()
filters_cache, filters_runtime = self.__get_filters()
@ -436,6 +479,11 @@ class TestParallelRunner:
test_cnt_expected = line.count(':')
if constants.RUN in line:
test_name = line[line.find(constants.RUN) + len(constants.RUN) + 1:-1:]
if self._device != None and self._available_devices != None:
for device_name in self._available_devices:
if device_name in test_name:
test_name = test_name.replace(device_name, self._device)
break
if constants.REF_COEF in line:
ref_k = float(line[line.rfind(' ') + 1:])
if dir is None:

View File

@ -22,7 +22,7 @@ OS_BIN_FILE_EXT = ".exe" if IS_WIN else ""
ENV_SEPARATOR = ";" if IS_WIN else ":"
PYTHON_NAME = "python" if IS_WIN else "python3"
PIP_NAME = "pip" if IS_WIN else "pip3"
LD_LIB_PATH_NAME = "PATH" if IS_WIN or platform == "darwin" else "LD_LIBRARY_PATH"
LD_LIB_PATH_NAME = "PATH" if IS_WIN else "LD_LIBRARY_PATH"
OPENVINO_NAME = 'openvino'
PY_OPENVINO = "python_api"

View File

@ -0,0 +1,43 @@
# Copyright (C) 2018-2023 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
try:
from openvino.runtime import Core
except:
from utils.file_utils import get_ov_path, find_latest_dir
import os
from utils.constants import PY_OPENVINO, LD_LIB_PATH_NAME
from utils.conformance_utils import get_logger, set_env_variable
logger = get_logger("get_available_device")
script_dir, _ = os.path.split(os.path.abspath(__file__))
ov_bin_path = get_ov_path(script_dir, None, True)
if PY_OPENVINO in os.listdir(ov_bin_path):
env = os.environ
py_ov = os.path.join(ov_bin_path, PY_OPENVINO)
py_ov = os.path.join(py_ov, find_latest_dir(py_ov))
env = set_env_variable(env, "PYTHONPATH", py_ov)
env = set_env_variable(env, LD_LIB_PATH_NAME, ov_bin_path)
logger.warning("Set the following env varibles to rename conformance ir based on hash: ")
logger.warning(f'PYTHONPATH={env["PYTHONPATH"]}')
logger.warning(f'{LD_LIB_PATH_NAME}={env[LD_LIB_PATH_NAME]}')
exit(0)
else:
logger.error(f'Impossible to run the tool! PyOpenVINO was not built!')
exit(-1)
def get_available_devices(target_device=None, exclude_device=None):
    """List devices visible to the OpenVINO runtime, optionally filtered.

    :param target_device: keep only devices whose name contains this
                          substring (``None`` keeps every device).
    :param exclude_device: drop devices whose name contains this substring
                           (``None`` excludes nothing).
    :return: list of matching device name strings (e.g. ``["GPU.0", "GPU.1"]``).
    """
    # Sentinel that can never occur inside a real device name, so that a
    # None exclude filter matches nothing.
    if exclude_device is None:
        exclude_device = "NOT_EXISTED_DEVICE"
    devices = []
    for name in Core().available_devices:
        matches_target = target_device is None or target_device in name
        if matches_target and exclude_device not in name:
            devices.append(name)
    return devices