From a730ef18ebfafd16ea3445fab9049ed70bdc650d Mon Sep 17 00:00:00 2001 From: Ilya Churaev Date: Fri, 24 Feb 2023 15:20:32 +0400 Subject: [PATCH] Moved Task, Streams, CPUStreams Executors to new API (#15913) * Moved Task, Streams, CPUStreams Executors to new API * Fixed some build issues * Fixed new build issues * Try to fix tests * Fixed inference unit tests * Small build fix * Added more system headers * Try to fix naming style * Fixed namespace * Fixed android build --- src/inference/dev_api/ie_system_conf.h | 62 +-- .../openvino/runtime/iasync_infer_request.hpp | 22 +- .../openvino/runtime/icompiled_model.hpp | 29 +- .../dev_api/openvino/runtime/iplugin.hpp | 12 +- .../dev_api/openvino/runtime/system_conf.hpp | 193 +++++++ .../threading/cpu_streams_executor.hpp | 55 ++ .../runtime/threading/executor_manager.hpp | 13 +- .../runtime/threading/istreams_executor.hpp | 168 ++++++ .../runtime/threading/itask_executor.hpp | 76 +++ .../threading/ie_cpu_streams_executor.hpp | 2 +- .../dev_api/threading/ie_executor_manager.hpp | 6 +- .../threading/ie_istreams_executor.hpp | 76 +-- .../dev_api/threading/ie_itask_executor.hpp | 12 +- src/inference/src/dev/core_impl.cpp | 6 +- src/inference/src/dev/core_impl.hpp | 2 +- .../src/dev/iasync_infer_request.cpp | 18 +- src/inference/src/dev/icompiled_model.cpp | 8 +- .../src/dev/icompiled_model_wrapper.cpp | 3 +- src/inference/src/dev/iplugin.cpp | 4 +- src/inference/src/dev/isync_infer_request.cpp | 1 + .../dev/threading/cpu_streams_executor.cpp | 397 ++++++++++++++ .../src/dev/threading/executor_manager.cpp | 44 +- .../src/dev/threading/istreams_executor.cpp | 496 ++++++++++++++++++ .../src/dev/threading/itask_executor.cpp | 41 ++ src/inference/src/os/lin/lin_system_conf.cpp | 8 +- src/inference/src/os/win/win_system_conf.cpp | 26 +- src/inference/src/streams_executor.hpp | 4 +- .../{ie_system_conf.cpp => system_conf.cpp} | 24 +- .../src/threading/ie_cpu_streams_executor.cpp | 4 +- .../src/threading/ie_executor_manager.cpp | 59 
++- .../src/threading/ie_istreams_executor.cpp | 446 +--------------- .../src/threading/ie_itask_executor.cpp | 24 +- src/inference/tests/unit/cpu_map_parser.cpp | 28 +- .../tests/unit/ie_executor_manager_tests.cpp | 26 +- .../template/src/async_infer_request.cpp | 9 +- .../template/src/async_infer_request.hpp | 8 +- src/plugins/template/src/compiled_model.cpp | 2 +- src/plugins/template/src/compiled_model.hpp | 2 +- src/plugins/template/src/plugin.cpp | 2 +- src/plugins/template/src/plugin.hpp | 3 +- src/plugins/template/src/template_config.cpp | 16 +- src/plugins/template/src/template_config.hpp | 10 +- 42 files changed, 1719 insertions(+), 728 deletions(-) create mode 100644 src/inference/dev_api/openvino/runtime/system_conf.hpp create mode 100644 src/inference/dev_api/openvino/runtime/threading/cpu_streams_executor.hpp create mode 100644 src/inference/dev_api/openvino/runtime/threading/istreams_executor.hpp create mode 100644 src/inference/dev_api/openvino/runtime/threading/itask_executor.hpp create mode 100644 src/inference/src/dev/threading/cpu_streams_executor.cpp create mode 100644 src/inference/src/dev/threading/istreams_executor.cpp create mode 100644 src/inference/src/dev/threading/itask_executor.cpp rename src/inference/src/{ie_system_conf.cpp => system_conf.cpp} (90%) diff --git a/src/inference/dev_api/ie_system_conf.h b/src/inference/dev_api/ie_system_conf.h index 17f1781c13f..408c626accf 100644 --- a/src/inference/dev_api/ie_system_conf.h +++ b/src/inference/dev_api/ie_system_conf.h @@ -12,7 +12,7 @@ #include #include -#include "ie_api.h" +#include "openvino/runtime/system_conf.hpp" namespace InferenceEngine { @@ -23,7 +23,9 @@ namespace InferenceEngine { * @param[in] includeOMPNumThreads Indicates if the omp number threads is included * @return `True` if any OpenMP environment variable is defined, `false` otherwise */ -INFERENCE_ENGINE_API_CPP(bool) checkOpenMpEnvVars(bool includeOMPNumThreads = true); +inline bool checkOpenMpEnvVars(bool 
includeOMPNumThreads = true) { + return ov::check_open_mp_env_vars(includeOMPNumThreads); +} /** * @brief Returns available CPU NUMA nodes (on Linux, and Windows [only with TBB], single node is assumed on all @@ -31,7 +33,9 @@ INFERENCE_ENGINE_API_CPP(bool) checkOpenMpEnvVars(bool includeOMPNumThreads = tr * @ingroup ie_dev_api_system_conf * @return NUMA nodes */ -INFERENCE_ENGINE_API_CPP(std::vector) getAvailableNUMANodes(); +inline std::vector getAvailableNUMANodes() { + return ov::get_available_numa_nodes(); +} /** * @brief Returns available CPU cores types (on Linux, and Windows) and ONLY with TBB, single core type is assumed @@ -39,7 +43,9 @@ INFERENCE_ENGINE_API_CPP(std::vector) getAvailableNUMANodes(); * @ingroup ie_dev_api_system_conf * @return Vector of core types */ -INFERENCE_ENGINE_API_CPP(std::vector) getAvailableCoresTypes(); +inline std::vector getAvailableCoresTypes() { + return ov::get_available_cores_types(); +} /** * @brief Returns number of CPU physical cores on Linux/Windows (which is considered to be more performance @@ -50,7 +56,9 @@ INFERENCE_ENGINE_API_CPP(std::vector) getAvailableCoresTypes(); * @param[in] bigCoresOnly Additionally limits the number of reported cores to the 'Big' cores only. * @return Number of physical CPU cores. */ -INFERENCE_ENGINE_API_CPP(int) getNumberOfCPUCores(bool bigCoresOnly = false); +inline int getNumberOfCPUCores(bool bigCoresOnly = false) { + return ov::get_number_of_cpu_cores(bigCoresOnly); +} /** * @brief Returns number of CPU logical cores on Linux/Windows (on other OSes it simply relies on the original @@ -60,80 +68,81 @@ INFERENCE_ENGINE_API_CPP(int) getNumberOfCPUCores(bool bigCoresOnly = false); * @param[in] bigCoresOnly Additionally limits the number of reported cores to the 'Big' cores only. * @return Number of logical CPU cores. 
*/ -INFERENCE_ENGINE_API_CPP(int) getNumberOfLogicalCPUCores(bool bigCoresOnly = false); +inline int getNumberOfLogicalCPUCores(bool bigCoresOnly = false) { + return ov::get_number_of_logical_cpu_cores(bigCoresOnly); +} /** * @brief Checks whether CPU supports SSE 4.2 capability * @ingroup ie_dev_api_system_conf * @return `True` is SSE 4.2 instructions are available, `false` otherwise */ -INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_sse42(); +using ov::with_cpu_x86_sse42; /** * @brief Checks whether CPU supports AVX capability * @ingroup ie_dev_api_system_conf * @return `True` is AVX instructions are available, `false` otherwise */ -INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx(); +using ov::with_cpu_x86_avx; /** * @brief Checks whether CPU supports AVX2 capability * @ingroup ie_dev_api_system_conf * @return `True` is AVX2 instructions are available, `false` otherwise */ -INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx2(); +using ov::with_cpu_x86_avx2; /** * @brief Checks whether CPU supports AVX 512 capability * @ingroup ie_dev_api_system_conf * @return `True` is AVX512F (foundation) instructions are available, `false` otherwise */ -INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx512f(); +using ov::with_cpu_x86_avx512f; /** * @brief Checks whether CPU supports AVX 512 capability * @ingroup ie_dev_api_system_conf * @return `True` is AVX512F, AVX512BW, AVX512DQ instructions are available, `false` otherwise */ -INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx512_core(); +using ov::with_cpu_x86_avx512_core; /** * @brief Checks whether CPU supports AVX 512 VNNI capability * @ingroup ie_dev_api_system_conf * @return `True` is AVX512F, AVX512BW, AVX512DQ, AVX512_VNNI instructions are available, `false` otherwise */ -INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx512_core_vnni(); +using ov::with_cpu_x86_avx512_core_vnni; /** * @brief Checks whether CPU supports BFloat16 capability * @ingroup ie_dev_api_system_conf * @return `True` is tAVX512_BF16 instructions are 
available, `false` otherwise */ -INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_bfloat16(); +using ov::with_cpu_x86_bfloat16; /** * @brief Checks whether CPU supports AMX int8 capability * @ingroup ie_dev_api_system_conf * @return `True` is tAMX_INT8 instructions are available, `false` otherwise */ -INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx512_core_amx_int8(); +using ov::with_cpu_x86_avx512_core_amx_int8; /** * @brief Checks whether CPU supports AMX bf16 capability * @ingroup ie_dev_api_system_conf * @return `True` is tAMX_BF16 instructions are available, `false` otherwise */ -INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx512_core_amx_bf16(); +using ov::with_cpu_x86_avx512_core_amx_bf16; /** * @brief Checks whether CPU supports AMX capability * @ingroup ie_dev_api_system_conf * @return `True` is tAMX_INT8 or tAMX_BF16 instructions are available, `false` otherwise */ -INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx512_core_amx(); +using ov::with_cpu_x86_avx512_core_amx; /** - * @enum column_of_processor_type_table * @brief This enum contains defination of each columns in processor type table which bases on cpu core types. Will * extend to support other CPU core type like ARM. 
* @@ -150,16 +159,9 @@ INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx512_core_amx(); * ALL_PROC | MAIN_CORE_PROC | EFFICIENT_CORE_PROC | HYPER_THREADING_PROC * 32 8 16 8 // Total number of one socket */ -typedef enum { - ALL_PROC = 0, //!< All processors, regardless of backend cpu - MAIN_CORE_PROC = 1, //!< Processor based on physical core of Intel Performance-cores - EFFICIENT_CORE_PROC = 2, //!< Processor based on Intel Efficient-cores - HYPER_THREADING_PROC = 3, //!< Processor based on logical core of Intel Performance-cores - PROC_TYPE_TABLE_SIZE = 4 //!< Size of processor type table -} column_of_processor_type_table; +using ov::ColumnOfProcessorTypeTable; /** - * @enum column_of_cpu_mapping_table * @brief This enum contains defination of each columns in CPU mapping table which use processor id as index. * * GROUP_ID is generated according to the following rules. @@ -181,14 +183,6 @@ typedef enum { * 6 0 4 2 2 0 * 7 0 5 2 2 0 */ -typedef enum { - CPU_MAP_PROCESSOR_ID = 0, //!< column for processor id of the processor - CPU_MAP_SOCKET_ID = 1, //!< column for socket id of the processor - CPU_MAP_CORE_ID = 2, //!< column for hardware core id of the processor - CPU_MAP_CORE_TYPE = 3, //!< column for CPU core type corresponding to the processor - CPU_MAP_GROUP_ID = 4, //!< column for group id to the processor. Processors in one group have dependency. 
- CPU_MAP_USED_FLAG = 5, //!< column for resource management of the processor - CPU_MAP_TABLE_SIZE = 6 //!< Size of CPU mapping table -} column_of_cpu_mapping_table; +using ov::ColumnOfCPUMappingTable; } // namespace InferenceEngine diff --git a/src/inference/dev_api/openvino/runtime/iasync_infer_request.hpp b/src/inference/dev_api/openvino/runtime/iasync_infer_request.hpp index 687b05030cd..628c2c651f8 100644 --- a/src/inference/dev_api/openvino/runtime/iasync_infer_request.hpp +++ b/src/inference/dev_api/openvino/runtime/iasync_infer_request.hpp @@ -17,7 +17,7 @@ #include "openvino/runtime/iinfer_request.hpp" #include "openvino/runtime/profiling_info.hpp" #include "openvino/runtime/tensor.hpp" -#include "threading/ie_itask_executor.hpp" +#include "openvino/runtime/threading/itask_executor.hpp" namespace ov { @@ -37,8 +37,8 @@ namespace ov { class OPENVINO_RUNTIME_API IAsyncInferRequest : public IInferRequest { public: IAsyncInferRequest(const std::shared_ptr& request, - const InferenceEngine::ITaskExecutor::Ptr& task_executor, - const InferenceEngine::ITaskExecutor::Ptr& callback_executor); + const std::shared_ptr& task_executor, + const std::shared_ptr& callback_executor); ~IAsyncInferRequest(); /** @@ -153,7 +153,7 @@ public: const std::vector>& get_outputs() const override; protected: - using Stage = std::pair; + using Stage = std::pair, ov::threading::Task>; /** * @brief Pipeline is vector of stages */ @@ -212,11 +212,11 @@ private: void run_first_stage(const Pipeline::iterator itBeginStage, const Pipeline::iterator itEndStage, - const InferenceEngine::ITaskExecutor::Ptr callbackExecutor = {}); + const std::shared_ptr callbackExecutor = {}); - InferenceEngine::Task make_next_stage_task(const Pipeline::iterator itStage, - const Pipeline::iterator itEndStage, - const InferenceEngine::ITaskExecutor::Ptr callbackExecutor); + ov::threading::Task make_next_stage_task(const Pipeline::iterator itStage, + const Pipeline::iterator itEndStage, + const std::shared_ptr 
callbackExecutor); template void infer_impl(const F& f) { @@ -264,10 +264,10 @@ private: std::shared_ptr m_sync_request; - InferenceEngine::ITaskExecutor::Ptr m_request_executor; //!< Used to run inference CPU tasks. - InferenceEngine::ITaskExecutor::Ptr + std::shared_ptr m_request_executor; //!< Used to run inference CPU tasks. + std::shared_ptr m_callback_executor; //!< Used to run post inference callback in asynchronous pipline - InferenceEngine::ITaskExecutor::Ptr + std::shared_ptr m_sync_callback_executor; //!< Used to run post inference callback in synchronous pipline mutable std::mutex m_mutex; std::function m_callback; diff --git a/src/inference/dev_api/openvino/runtime/icompiled_model.hpp b/src/inference/dev_api/openvino/runtime/icompiled_model.hpp index 44c701c1d42..c95feba6cc1 100644 --- a/src/inference/dev_api/openvino/runtime/icompiled_model.hpp +++ b/src/inference/dev_api/openvino/runtime/icompiled_model.hpp @@ -17,8 +17,8 @@ #include "openvino/runtime/common.hpp" #include "openvino/runtime/isync_infer_request.hpp" #include "openvino/runtime/remote_context.hpp" -#include "threading/ie_cpu_streams_executor.hpp" -#include "threading/ie_itask_executor.hpp" +#include "openvino/runtime/threading/cpu_streams_executor.hpp" +#include "openvino/runtime/threading/itask_executor.hpp" namespace InferenceEngine { class ICompiledModelWrapper; @@ -47,14 +47,13 @@ public: * * @param callback_executor Callback executor (CPUStreamsExecutor by default) */ - ICompiledModel(const std::shared_ptr& model, - const std::shared_ptr& plugin, - const InferenceEngine::ITaskExecutor::Ptr& task_executor = - std::make_shared(InferenceEngine::IStreamsExecutor::Config{ - "Default"}), - const InferenceEngine::ITaskExecutor::Ptr& callback_executor = - std::make_shared(InferenceEngine::IStreamsExecutor::Config{ - "Callback"})); + ICompiledModel( + const std::shared_ptr& model, + const std::shared_ptr& plugin, + const std::shared_ptr& task_executor = + 
std::make_shared(ov::threading::IStreamsExecutor::Config{"Default"}), + const std::shared_ptr& callback_executor = + std::make_shared(ov::threading::IStreamsExecutor::Config{"Callback"})); /** * @brief Gets all outputs from compiled model @@ -119,8 +118,8 @@ private: std::vector> m_inputs; std::vector> m_outputs; - InferenceEngine::ITaskExecutor::Ptr m_task_executor = nullptr; //!< Holds a task executor - InferenceEngine::ITaskExecutor::Ptr m_callback_executor = nullptr; //!< Holds a callback executor + std::shared_ptr m_task_executor = nullptr; //!< Holds a task executor + std::shared_ptr m_callback_executor = nullptr; //!< Holds a callback executor friend ov::CoreImpl; friend ov::IExecutableNetworkWrapper; @@ -146,7 +145,7 @@ protected: /** * @brief Default implementation of create async inter request method * - * @tparam AsyncInferRequestType Async infer request type. InferenceEngine::AsyncInferRequestThreadSafeDefault by + * @tparam AsyncInferRequestType Async infer request type. ov::IAsyncInferRequest by * default * * @return Asynchronous infer request @@ -163,8 +162,8 @@ protected: * @return OpenVINO Plugin interface */ const std::shared_ptr& get_plugin() const; - const InferenceEngine::ITaskExecutor::Ptr get_task_executor() const; - const InferenceEngine::ITaskExecutor::Ptr get_callback_executor() const; + const std::shared_ptr get_task_executor() const; + const std::shared_ptr get_callback_executor() const; }; } // namespace ov diff --git a/src/inference/dev_api/openvino/runtime/iplugin.hpp b/src/inference/dev_api/openvino/runtime/iplugin.hpp index 54660161b2e..e2a4b4110d3 100644 --- a/src/inference/dev_api/openvino/runtime/iplugin.hpp +++ b/src/inference/dev_api/openvino/runtime/iplugin.hpp @@ -188,7 +188,7 @@ public: * @brief Gets reference to tasks execution manager * @return Reference to ExecutorManager interface */ - const std::shared_ptr& get_executor_manager() const; + const std::shared_ptr& get_executor_manager() const; ~IPlugin() = default; @@ 
-198,11 +198,11 @@ protected: private: friend ::InferenceEngine::IPluginWrapper; - std::string m_plugin_name; //!< A device name that plugins enables - std::weak_ptr m_core; //!< A pointer to ICore interface - std::shared_ptr m_executor_manager; //!< A tasks execution manager - ov::Version m_version; //!< Member contains plugin version - bool m_is_new_api; //!< A flag which shows used API + std::string m_plugin_name; //!< A device name that plugins enables + std::weak_ptr m_core; //!< A pointer to ICore interface + std::shared_ptr m_executor_manager; //!< A tasks execution manager + ov::Version m_version; //!< Member contains plugin version + bool m_is_new_api; //!< A flag which shows used API }; } // namespace ov diff --git a/src/inference/dev_api/openvino/runtime/system_conf.hpp b/src/inference/dev_api/openvino/runtime/system_conf.hpp new file mode 100644 index 00000000000..216d059ed35 --- /dev/null +++ b/src/inference/dev_api/openvino/runtime/system_conf.hpp @@ -0,0 +1,193 @@ +// Copyright (C) 2018-2023 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 +// + +/** + * @brief Abstraction over platform specific implementations + * @file openvino/runtime/system_conf.hpp + */ + +#pragma once + +#include + +#include "openvino/runtime/common.hpp" + +namespace ov { + +/** + * @brief Checks whether OpenMP environment variables are defined + * @ingroup ov_dev_api_system_conf + * + * @param[in] include_omp_num_threads Indicates if the omp number threads is included + * @return `True` if any OpenMP environment variable is defined, `false` otherwise + */ +OPENVINO_RUNTIME_API bool check_open_mp_env_vars(bool include_omp_num_threads = true); + +/** + * @brief Returns available CPU NUMA nodes (on Linux, and Windows [only with TBB], single node is assumed on all + * other OSes) + * @ingroup ov_dev_api_system_conf + * @return NUMA nodes + */ +OPENVINO_RUNTIME_API std::vector get_available_numa_nodes(); + +/** + * @brief Returns available CPU cores types (on Linux, and 
Windows) and ONLY with TBB, single core type is assumed + * otherwise + * @ingroup ov_dev_api_system_conf + * @return Vector of core types + */ +OPENVINO_RUNTIME_API std::vector get_available_cores_types(); + +/** + * @brief Returns number of CPU physical cores on Linux/Windows (which is considered to be more performance + * friendly for servers) (on other OSes it simply relies on the original parallel API of choice, which usually uses the + * logical cores). call function with 'false' to get #phys cores of all types call function with 'true' to get #phys + * 'Big' cores number of 'Little' = 'all' - 'Big' + * @ingroup ov_dev_api_system_conf + * @param[in] big_cores_only Additionally limits the number of reported cores to the 'Big' cores only. + * @return Number of physical CPU cores. + */ +OPENVINO_RUNTIME_API int get_number_of_cpu_cores(bool big_cores_only = false); + +/** + * @brief Returns number of CPU logical cores on Linux/Windows (on other OSes it simply relies on the original + * parallel API of choice, which uses the 'all' logical cores). call function with 'false' to get #logical cores of + * all types call function with 'true' to get #logical 'Big' cores number of 'Little' = 'all' - 'Big' + * @ingroup ov_dev_api_system_conf + * @param[in] big_cores_only Additionally limits the number of reported cores to the 'Big' cores only. + * @return Number of logical CPU cores. 
+ */ +OPENVINO_RUNTIME_API int get_number_of_logical_cpu_cores(bool big_cores_only = false); + +/** + * @brief Checks whether CPU supports SSE 4.2 capability + * @ingroup ov_dev_api_system_conf + * @return `True` is SSE 4.2 instructions are available, `false` otherwise + */ +OPENVINO_RUNTIME_API bool with_cpu_x86_sse42(); + +/** + * @brief Checks whether CPU supports AVX capability + * @ingroup ov_dev_api_system_conf + * @return `True` is AVX instructions are available, `false` otherwise + */ +OPENVINO_RUNTIME_API bool with_cpu_x86_avx(); + +/** + * @brief Checks whether CPU supports AVX2 capability + * @ingroup ov_dev_api_system_conf + * @return `True` is AVX2 instructions are available, `false` otherwise + */ +OPENVINO_RUNTIME_API bool with_cpu_x86_avx2(); + +/** + * @brief Checks whether CPU supports AVX 512 capability + * @ingroup ov_dev_api_system_conf + * @return `True` is AVX512F (foundation) instructions are available, `false` otherwise + */ +OPENVINO_RUNTIME_API bool with_cpu_x86_avx512f(); + +/** + * @brief Checks whether CPU supports AVX 512 capability + * @ingroup ov_dev_api_system_conf + * @return `True` is AVX512F, AVX512BW, AVX512DQ instructions are available, `false` otherwise + */ +OPENVINO_RUNTIME_API bool with_cpu_x86_avx512_core(); + +/** + * @brief Checks whether CPU supports AVX 512 VNNI capability + * @ingroup ov_dev_api_system_conf + * @return `True` is AVX512F, AVX512BW, AVX512DQ, AVX512_VNNI instructions are available, `false` otherwise + */ +OPENVINO_RUNTIME_API bool with_cpu_x86_avx512_core_vnni(); + +/** + * @brief Checks whether CPU supports BFloat16 capability + * @ingroup ov_dev_api_system_conf + * @return `True` is tAVX512_BF16 instructions are available, `false` otherwise + */ +OPENVINO_RUNTIME_API bool with_cpu_x86_bfloat16(); + +/** + * @brief Checks whether CPU supports AMX int8 capability + * @ingroup ov_dev_api_system_conf + * @return `True` is tAMX_INT8 instructions are available, `false` otherwise + */ +OPENVINO_RUNTIME_API 
bool with_cpu_x86_avx512_core_amx_int8(); + +/** + * @brief Checks whether CPU supports AMX bf16 capability + * @ingroup ov_dev_api_system_conf + * @return `True` is tAMX_BF16 instructions are available, `false` otherwise + */ +OPENVINO_RUNTIME_API bool with_cpu_x86_avx512_core_amx_bf16(); + +/** + * @brief Checks whether CPU supports AMX capability + * @ingroup ov_dev_api_system_conf + * @return `True` is tAMX_INT8 or tAMX_BF16 instructions are available, `false` otherwise + */ +OPENVINO_RUNTIME_API bool with_cpu_x86_avx512_core_amx(); + +/** + * @enum ColumnOfProcessorTypeTable + * @brief This enum contains defination of each columns in processor type table which bases on cpu core types. Will + * extend to support other CPU core type like ARM. + * + * The following are two example of processor type table. + * 1. Processor table of two socket CPUs XEON server + * + * ALL_PROC | MAIN_CORE_PROC | EFFICIENT_CORE_PROC | HYPER_THREADING_PROC + * 96 48 0 48 // Total number of two sockets + * 48 24 0 24 // Number of socket one + * 48 24 0 24 // Number of socket two + * + * 2. Processor table of one socket CPU desktop + * + * ALL_PROC | MAIN_CORE_PROC | EFFICIENT_CORE_PROC | HYPER_THREADING_PROC + * 32 8 16 8 // Total number of one socket + */ +enum ColumnOfProcessorTypeTable { + ALL_PROC = 0, //!< All processors, regardless of backend cpu + MAIN_CORE_PROC = 1, //!< Processor based on physical core of Intel Performance-cores + EFFICIENT_CORE_PROC = 2, //!< Processor based on Intel Efficient-cores + HYPER_THREADING_PROC = 3, //!< Processor based on logical core of Intel Performance-cores + PROC_TYPE_TABLE_SIZE = 4 //!< Size of processor type table +}; + +/** + * @enum ColumnOfCPUMappingTable + * @brief This enum contains defination of each columns in CPU mapping table which use processor id as index. + * + * GROUP_ID is generated according to the following rules. + * 1. 
If one MAIN_CORE_PROC and one HYPER_THREADING_PROC are based on same Performance-cores, they are in one group. + * 2. If some EFFICIENT_CORE_PROC share one L2 cachle, they are in one group. + * 3. There are no duplicate group IDs in the system + * + * The following is the example of CPU mapping table. + * 1. Four processors of two Pcore + * 2. Four processors of four Ecores shared L2 cache + * + * PROCESSOR_ID | SOCKET_ID | CORE_ID | CORE_TYPE | GROUP_ID | Used + * 0 0 0 3 0 0 + * 1 0 0 1 0 0 + * 2 0 1 3 1 0 + * 3 0 1 1 1 0 + * 4 0 2 2 2 0 + * 5 0 3 2 2 0 + * 6 0 4 2 2 0 + * 7 0 5 2 2 0 + */ +enum ColumnOfCPUMappingTable { + CPU_MAP_PROCESSOR_ID = 0, //!< column for processor id of the processor + CPU_MAP_SOCKET_ID = 1, //!< column for socket id of the processor + CPU_MAP_CORE_ID = 2, //!< column for hardware core id of the processor + CPU_MAP_CORE_TYPE = 3, //!< column for CPU core type corresponding to the processor + CPU_MAP_GROUP_ID = 4, //!< column for group id to the processor. Processors in one group have dependency. + CPU_MAP_USED_FLAG = 5, //!< column for resource management of the processor + CPU_MAP_TABLE_SIZE = 6 //!< Size of CPU mapping table +}; + +} // namespace ov diff --git a/src/inference/dev_api/openvino/runtime/threading/cpu_streams_executor.hpp b/src/inference/dev_api/openvino/runtime/threading/cpu_streams_executor.hpp new file mode 100644 index 00000000000..0180b9a475d --- /dev/null +++ b/src/inference/dev_api/openvino/runtime/threading/cpu_streams_executor.hpp @@ -0,0 +1,55 @@ +// Copyright (C) 2018-2023 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 +// + +/** + * @file openvino/runtime/threading/cpu_streams_executor.hpp + * @brief A header file for OpenVINO CPU-Streams-based Executor implementation. 
+ */ + +#pragma once + +#include +#include +#include + +#include "openvino/runtime/threading/istreams_executor.hpp" + +namespace ov { +namespace threading { + +/** + * @class CPUStreamsExecutor + * @ingroup ov_dev_api_threading + * @brief CPU Streams executor implementation. The executor splits the CPU into groups of threads, + * that can be pinned to cores or NUMA nodes. + * It uses custom threads to pull tasks from single queue. + */ +class OPENVINO_RUNTIME_API CPUStreamsExecutor : public IStreamsExecutor { +public: + /** + * @brief Constructor + * @param config Stream executor parameters + */ + explicit CPUStreamsExecutor(const Config& config); + + /** + * @brief A class destructor + */ + ~CPUStreamsExecutor() override; + + void run(Task task) override; + + void execute(Task task) override; + + int get_stream_id() override; + + int get_numa_node_id() override; + +private: + struct Impl; + std::unique_ptr _impl; +}; + +} // namespace threading +} // namespace ov diff --git a/src/inference/dev_api/openvino/runtime/threading/executor_manager.hpp b/src/inference/dev_api/openvino/runtime/threading/executor_manager.hpp index d242d97c407..6e7735a6906 100644 --- a/src/inference/dev_api/openvino/runtime/threading/executor_manager.hpp +++ b/src/inference/dev_api/openvino/runtime/threading/executor_manager.hpp @@ -10,11 +10,14 @@ #pragma once #include "openvino/runtime/common.hpp" +#include "openvino/runtime/threading/istreams_executor.hpp" +#include "openvino/runtime/threading/itask_executor.hpp" #include "threading/ie_istreams_executor.hpp" -#include "threading/ie_itask_executor.hpp" namespace ov { +namespace threading { + /** * @interface ExecutorManager * @brief Interface for tasks execution manager. 
@@ -31,7 +34,7 @@ public: * @param id An unique identificator of device (Usually string representation of TargetDevice) * @return A shared pointer to existing or newly ITaskExecutor */ - virtual InferenceEngine::ITaskExecutor::Ptr get_executor(const std::string& id) = 0; + virtual std::shared_ptr get_executor(const std::string& id) = 0; /** * @brief Returns idle cpu streams executor @@ -40,8 +43,8 @@ public: * * @return pointer to streams executor config */ - virtual InferenceEngine::IStreamsExecutor::Ptr get_idle_cpu_streams_executor( - const InferenceEngine::IStreamsExecutor::Config& config) = 0; + virtual std::shared_ptr get_idle_cpu_streams_executor( + const ov::threading::IStreamsExecutor::Config& config) = 0; /** * @brief Allows to configure executor manager @@ -73,5 +76,5 @@ public: }; OPENVINO_API std::shared_ptr executor_manager(); - +} // namespace threading } // namespace ov diff --git a/src/inference/dev_api/openvino/runtime/threading/istreams_executor.hpp b/src/inference/dev_api/openvino/runtime/threading/istreams_executor.hpp new file mode 100644 index 00000000000..aead0f07cc1 --- /dev/null +++ b/src/inference/dev_api/openvino/runtime/threading/istreams_executor.hpp @@ -0,0 +1,168 @@ +// Copyright (C) 2018-2023 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 +// + +/** + * @file ie_istreams_executor.hpp + * @brief A header file for Inference Engine Streams-based Executor Interface + */ + +#pragma once + +#include +#include +#include + +#include "openvino/runtime/common.hpp" +#include "openvino/runtime/threading/itask_executor.hpp" + +namespace ov { +namespace threading { + +/** + * @interface IStreamsExecutor + * @ingroup ov_dev_api_threading + * @brief Interface for Streams Task Executor. This executor groups worker threads into so-called `streams`. + * @par CPU + * The executor executes all parallel tasks using threads from one stream. + * With proper pinning settings it should reduce cache misses for memory bound workloads. 
+ * @par NUMA + * On NUMA hosts GetNumaNodeId() method can be used to define the NUMA node of current stream + */ +class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor { +public: + /** + * @brief Defines inference thread binding type + */ + enum ThreadBindingType : std::uint8_t { + NONE, //!< Don't bind the inference threads + CORES, //!< Bind inference threads to the CPU cores (round-robin) + // the following modes are implemented only for the TBB code-path: + NUMA, //!< Bind to the NUMA nodes (default mode for the non-hybrid CPUs on the Win/MacOS, where the 'CORES' is + //!< not implemented) + HYBRID_AWARE //!< Let the runtime bind the inference threads depending on the cores type (default mode for the + //!< hybrid CPUs) + }; + + /** + * @brief Defines IStreamsExecutor configuration + */ + struct OPENVINO_RUNTIME_API Config { + /** + * @brief Sets configuration + * @param properties map of properties + */ + void set_property(const ov::AnyMap& properties); + + /** + * @brief Sets configuration + * @param key property name + * @param value property value + */ + void set_property(const std::string& key, const ov::Any& value); + + /** + * @brief Return configuration value + * @param key configuration key + * @return configuration value wrapped into ov::Any + */ + ov::Any get_property(const std::string& key) const; + + /** + * @brief Create appropriate multithreaded configuration + * filling unconfigured values from initial configuration using hardware properties + * @param initial Initial configuration + * @param fp_intesive additional hint for the (Hybrid) core-types selection logic + * whether the executor should be configured for floating point intensive work (as opposed to int8 + * intensive) + * @return configured values + */ + static Config make_default_multi_threaded(const Config& initial, const bool fp_intesive = true); + static int get_default_num_streams( + const bool enable_hyper_thread = true); // no network specifics considered 
(only CPU's caps); + static int get_hybrid_num_streams(std::map& config, const int stream_mode); + static void update_hybrid_custom_threads(Config& config); + + std::string _name; //!< Used by `ITT` to name executor threads + int _streams = 1; //!< Number of streams. + int _threadsPerStream = 0; //!< Number of threads per stream that executes `ie_parallel` calls + ThreadBindingType _threadBindingType = ThreadBindingType::NONE; //!< Thread binding to hardware resource type. + //!< No binding by default + int _threadBindingStep = 1; //!< In case of @ref CORES binding offset type + //!< thread binded to cores with defined step + int _threadBindingOffset = 0; //!< In case of @ref CORES binding offset type thread binded to cores + //!< starting from offset + int _threads = 0; //!< Number of threads distributed between streams. + //!< Reserved. Should not be used. + int _big_core_streams = 0; //!< Number of streams in Performance-core(big core) + int _small_core_streams = 0; //!< Number of streams in Efficient-core(small core) + int _threads_per_stream_big = 0; //!< Threads per stream in big cores + int _threads_per_stream_small = 0; //!< Threads per stream in small cores + int _small_core_offset = 0; //!< Calculate small core start offset when binding cpu cores + bool _enable_hyper_thread = true; //!< enable hyper thread + enum StreamMode { DEFAULT, AGGRESSIVE, LESSAGGRESSIVE }; + enum PreferredCoreType { + ANY, + LITTLE, + BIG, + ROUND_ROBIN // used w/multiple streams to populate the Big cores first, then the Little, then wrap around + // (for large #streams) + } _threadPreferredCoreType = + PreferredCoreType::ANY; //!< In case of @ref HYBRID_AWARE hints the TBB to affinitize + + /** + * @brief A constructor with arguments + * + * @param[in] name The executor name + * @param[in] streams @copybrief Config::_streams + * @param[in] threadsPerStream @copybrief Config::_threadsPerStream + * @param[in] threadBindingType @copybrief Config::_threadBindingType + * @param[in] 
threadBindingStep @copybrief Config::_threadBindingStep + * @param[in] threadBindingOffset @copybrief Config::_threadBindingOffset + * @param[in] threads @copybrief Config::_threads + * @param[in] threadPreferBigCores @copybrief Config::_threadPreferBigCores + */ + Config(std::string name = "StreamsExecutor", + int streams = 1, + int threadsPerStream = 0, + ThreadBindingType threadBindingType = ThreadBindingType::NONE, + int threadBindingStep = 1, + int threadBindingOffset = 0, + int threads = 0, + PreferredCoreType threadPreferredCoreType = PreferredCoreType::ANY) + : _name{name}, + _streams{streams}, + _threadsPerStream{threadsPerStream}, + _threadBindingType{threadBindingType}, + _threadBindingStep{threadBindingStep}, + _threadBindingOffset{threadBindingOffset}, + _threads{threads}, + _threadPreferredCoreType(threadPreferredCoreType) {} + }; + + /** + * @brief A virtual destructor + */ + ~IStreamsExecutor() override; + + /** + * @brief Return the index of current stream + * @return An index of current stream. 
Or throw exceptions if called not from stream thread + */ + virtual int get_stream_id() = 0; + + /** + * @brief Return the id of current NUMA Node + * @return `ID` of current NUMA Node, or throws exceptions if called not from stream thread + */ + virtual int get_numa_node_id() = 0; + + /** + * @brief Execute the task in the current thread using streams executor configuration and constraints + * @param task A task to start + */ + virtual void execute(Task task) = 0; +}; + +} // namespace threading +} // namespace ov diff --git a/src/inference/dev_api/openvino/runtime/threading/itask_executor.hpp b/src/inference/dev_api/openvino/runtime/threading/itask_executor.hpp new file mode 100644 index 00000000000..3cb42e3200b --- /dev/null +++ b/src/inference/dev_api/openvino/runtime/threading/itask_executor.hpp @@ -0,0 +1,76 @@ +// Copyright (C) 2018-2023 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 +// + +/** + * @file openvino/runtime/threading/task_executor.hpp + * @brief A header file for OpenVINO Task Executor Interface + */ + +#pragma once + +#include +#include +#include + +#include "openvino/runtime/common.hpp" + +namespace ov { +namespace threading { + +/** + * @brief OpenVINO Task Executor can use any copyable callable without parameters and output as a task. + * It would be wrapped into std::function object + * @ingroup ov_dev_api_threading + */ +using Task = std::function; + +/** +* @interface ITaskExecutor +* @ingroup ov_dev_api_threading +* @brief Interface for Task Executor. +* OpenVINO uses `ov::ITaskExecutor` interface to run all asynchronous internal tasks. +* Different implementations of task executors can be used for different purposes: +* - To improve cache locality of memory bound CPU tasks some executors can limit task's affinity and maximum +concurrency. +* - The executor with one worker thread can be used to serialize access to acceleration device. 
+* - Immediate task executor can be used to satisfy `ov::ITaskExecutor` interface restrictions but +run tasks in current thread. +* @note Implementation should guarantee thread safety of all methods +* @section Synchronization +* It is `ov::ITaskExecutor` user responsibility to wait for task execution completion. +* The `c++11` standard way to wait task completion is to use `std::packaged_task` or `std::promise` with +`std::future`. +* Here is an example of how to use `std::promise` to wait task completion and process task's exceptions: + * @snippet example_itask_executor.cpp itask_executor:define_pipeline + */ +class OPENVINO_RUNTIME_API ITaskExecutor { +public: + /** + * @brief Destroys the object. + */ + virtual ~ITaskExecutor() = default; + + /** + * @brief Execute ov::Task inside task executor context + * @param task A task to start + */ + virtual void run(Task task) = 0; + + /** + * @brief Execute all of the tasks and waits for its completion. + * Default run_and_wait() method implementation uses run() pure virtual method + * and higher level synchronization primitives from STL. + * The task is wrapped into std::packaged_task which returns std::future. + * std::packaged_task will call the task and signal to std::future that the task is finished + * or the exception is thrown from task + * Then std::future is used to wait for task execution completion and + * task exception extraction + * @note run_and_wait() does not copy or capture tasks! 
+ * @param tasks A vector of tasks to execute + */ + virtual void run_and_wait(const std::vector& tasks); +}; + +} // namespace threading +} // namespace ov diff --git a/src/inference/dev_api/threading/ie_cpu_streams_executor.hpp b/src/inference/dev_api/threading/ie_cpu_streams_executor.hpp index f4b31d95fc8..12c2232a572 100644 --- a/src/inference/dev_api/threading/ie_cpu_streams_executor.hpp +++ b/src/inference/dev_api/threading/ie_cpu_streams_executor.hpp @@ -33,7 +33,7 @@ public: * @brief Constructor * @param config Stream executor parameters */ - explicit CPUStreamsExecutor(const Config& config = {}); + explicit CPUStreamsExecutor(const InferenceEngine::IStreamsExecutor::Config& config = {}); /** * @brief A class destructor diff --git a/src/inference/dev_api/threading/ie_executor_manager.hpp b/src/inference/dev_api/threading/ie_executor_manager.hpp index f746c17f815..ef789c82c48 100644 --- a/src/inference/dev_api/threading/ie_executor_manager.hpp +++ b/src/inference/dev_api/threading/ie_executor_manager.hpp @@ -19,10 +19,12 @@ #include "threading/ie_itask_executor.hpp" namespace ov { +namespace threading { class ExecutorManager; } +} // namespace ov namespace InferenceEngine { @@ -86,13 +88,13 @@ public: virtual bool getTbbFlag() = 0; private: - virtual std::shared_ptr get_ov_manager() const = 0; + virtual std::shared_ptr get_ov_manager() const = 0; friend class IPluginWrapper; }; INFERENCE_ENGINE_API_CPP(ExecutorManager::Ptr) executorManager(); std::shared_ptr create_old_manager( - const std::shared_ptr& manager); + const std::shared_ptr& manager); } // namespace InferenceEngine diff --git a/src/inference/dev_api/threading/ie_istreams_executor.hpp b/src/inference/dev_api/threading/ie_istreams_executor.hpp index efecaf606fa..bb2bbeca0b7 100644 --- a/src/inference/dev_api/threading/ie_istreams_executor.hpp +++ b/src/inference/dev_api/threading/ie_istreams_executor.hpp @@ -14,6 +14,7 @@ #include #include "ie_parameter.hpp" +#include 
"openvino/runtime/threading/istreams_executor.hpp" #include "threading/ie_itask_executor.hpp" namespace InferenceEngine { @@ -28,30 +29,17 @@ namespace InferenceEngine { * @par NUMA * On NUMA hosts GetNumaNodeId() method can be used to define the NUMA node of current stream */ -class INFERENCE_ENGINE_API_CLASS(IStreamsExecutor) : public ITaskExecutor { +class INFERENCE_ENGINE_API_CLASS(IStreamsExecutor) : public ITaskExecutor, public ov::threading::IStreamsExecutor { public: /** * A shared pointer to IStreamsExecutor interface */ using Ptr = std::shared_ptr; - /** - * @brief Defines inference thread binding type - */ - enum ThreadBindingType : std::uint8_t { - NONE, //!< Don't bind the inference threads - CORES, //!< Bind inference threads to the CPU cores (round-robin) - // the following modes are implemented only for the TBB code-path: - NUMA, //!< Bind to the NUMA nodes (default mode for the non-hybrid CPUs on the Win/MacOS, where the 'CORES' is - //!< not implemeneted) - HYBRID_AWARE //!< Let the runtime bind the inference threads depending on the cores type (default mode for the - //!< hybrid CPUs) - }; - /** * @brief Defines IStreamsExecutor configuration */ - struct INFERENCE_ENGINE_API_CLASS(Config) { + struct INFERENCE_ENGINE_API_CLASS(Config) : public ov::threading::IStreamsExecutor::Config { /** * @brief Supported Configuration keys * @return vector of supported configuration keys @@ -87,33 +75,6 @@ public: static int GetHybridNumStreams(std::map& config, const int stream_mode); static void UpdateHybridCustomThreads(Config& config); - std::string _name; //!< Used by `ITT` to name executor threads - int _streams = 1; //!< Number of streams. - int _threadsPerStream = 0; //!< Number of threads per stream that executes `ie_parallel` calls - ThreadBindingType _threadBindingType = ThreadBindingType::NONE; //!< Thread binding to hardware resource type. 
- //!< No binding by default - int _threadBindingStep = 1; //!< In case of @ref CORES binding offset type - //!< thread binded to cores with defined step - int _threadBindingOffset = 0; //!< In case of @ref CORES binding offset type thread binded to cores - //!< starting from offset - int _threads = 0; //!< Number of threads distributed between streams. - //!< Reserved. Should not be used. - int _big_core_streams = 0; //!< Number of streams in Performance-core(big core) - int _small_core_streams = 0; //!< Number of streams in Efficient-core(small core) - int _threads_per_stream_big = 0; //!< Threads per stream in big cores - int _threads_per_stream_small = 0; //!< Threads per stream in small cores - int _small_core_offset = 0; //!< Calculate small core start offset when binding cpu cores - bool _enable_hyper_thread = true; //!< enable hyper thread - enum StreamMode { DEFAULT, AGGRESSIVE, LESSAGGRESSIVE }; - enum PreferredCoreType { - ANY, - LITTLE, - BIG, - ROUND_ROBIN // used w/multiple streams to populate the Big cores first, then the Little, then wrap around - // (for large #streams) - } _threadPreferredCoreType = - PreferredCoreType::ANY; //!< In case of @ref HYBRID_AWARE hints the TBB to affinitize - /** * @brief A constructor with arguments * @@ -134,14 +95,17 @@ public: int threadBindingOffset = 0, int threads = 0, PreferredCoreType threadPreferredCoreType = PreferredCoreType::ANY) - : _name{name}, - _streams{streams}, - _threadsPerStream{threadsPerStream}, - _threadBindingType{threadBindingType}, - _threadBindingStep{threadBindingStep}, - _threadBindingOffset{threadBindingOffset}, - _threads{threads}, - _threadPreferredCoreType(threadPreferredCoreType) {} + : ov::threading::IStreamsExecutor::Config(name, + streams, + threadsPerStream, + threadBindingType, + threadBindingStep, + threadBindingOffset, + threads, + threadPreferredCoreType) {} + + Config(const ov::threading::IStreamsExecutor::Config& config) + : ov::threading::IStreamsExecutor::Config(config) {} 
}; /** @@ -166,6 +130,18 @@ public: * @param task A task to start */ virtual void Execute(Task task) = 0; + + int get_stream_id() override { + return GetStreamId(); + } + + int get_numa_node_id() override { + return GetNumaNodeId(); + } + + void execute(Task task) override { + Execute(task); + } }; } // namespace InferenceEngine diff --git a/src/inference/dev_api/threading/ie_itask_executor.hpp b/src/inference/dev_api/threading/ie_itask_executor.hpp index 90557d08f9f..1fc2923fca9 100644 --- a/src/inference/dev_api/threading/ie_itask_executor.hpp +++ b/src/inference/dev_api/threading/ie_itask_executor.hpp @@ -14,6 +14,7 @@ #include #include "ie_api.h" +#include "openvino/runtime/threading/itask_executor.hpp" namespace InferenceEngine { @@ -22,7 +23,7 @@ namespace InferenceEngine { * It would be wrapped into std::function object * @ingroup ie_dev_api_threading */ -using Task = std::function; +using Task = ov::threading::Task; /** * @interface ITaskExecutor @@ -36,14 +37,13 @@ concurrency. * - Immediate task executor can be used to satisfy `InferenceEngine::ITaskExecutor` interface restrictions but run tasks in current thread. * @note Implementation should guaranty thread safety of all methods -* @section Synchronization * It is `InferenceEngine::ITaskExecutor` user responsibility to wait for task execution completion. * The `c++11` standard way to wait task completion is to use `std::packaged_task` or `std::promise` with `std::future`. 
* Here is an example of how to use `std::promise` to wait task completion and process task's exceptions: * @snippet example_itask_executor.cpp itask_executor:define_pipeline */ -class INFERENCE_ENGINE_API_CLASS(ITaskExecutor) { +class INFERENCE_ENGINE_API_CLASS(ITaskExecutor) : virtual public ov::threading::ITaskExecutor { public: /** * A shared pointer to ITaskExecutor interface @@ -55,12 +55,6 @@ public: */ virtual ~ITaskExecutor() = default; - /** - * @brief Execute InferenceEngine::Task inside task executor context - * @param task A task to start - */ - virtual void run(Task task) = 0; - /** * @brief Execute all of the tasks and waits for its completion. * Default runAndWait() method implementation uses run() pure virtual method diff --git a/src/inference/src/dev/core_impl.cpp b/src/inference/src/dev/core_impl.cpp index 3a07156b302..7c87a7c3d9e 100644 --- a/src/inference/src/dev/core_impl.cpp +++ b/src/inference/src/dev/core_impl.cpp @@ -58,7 +58,7 @@ void stripDeviceName(std::string& device, const std::string& substr) { ov::CoreImpl::CoreImpl(bool _newAPI) : m_new_api(_newAPI) { add_mutex(""); // Register global mutex - m_executor_manager = ov::executor_manager(); + m_executor_manager = ov::threading::executor_manager(); for (const auto& it : ov::get_available_opsets()) { opsetNames.insert(it.first); } @@ -633,7 +633,7 @@ void ov::CoreImpl::set_property(const std::string& device_name, const AnyMap& pr ov::Any ov::CoreImpl::get_property_for_core(const std::string& name) const { if (name == ov::force_tbb_terminate.name()) { - const auto flag = ov::executor_manager()->get_property(name).as(); + const auto flag = ov::threading::executor_manager()->get_property(name).as(); return decltype(ov::force_tbb_terminate)::value_type(flag); } else if (name == ov::cache_dir.name()) { return ov::Any(coreConfig.get_cache_dir()); @@ -994,7 +994,7 @@ void ov::CoreImpl::CoreConfig::set_and_update(ov::AnyMap& config) { it = config.find(ov::force_tbb_terminate.name()); if (it != 
config.end()) { auto flag = it->second.as() == CONFIG_VALUE(YES) ? true : false; - ov::executor_manager()->set_property({{it->first, flag}}); + ov::threading::executor_manager()->set_property({{it->first, flag}}); config.erase(it); } diff --git a/src/inference/src/dev/core_impl.hpp b/src/inference/src/dev/core_impl.hpp index d4ffacde19b..7e223202f03 100644 --- a/src/inference/src/dev/core_impl.hpp +++ b/src/inference/src/dev/core_impl.hpp @@ -162,7 +162,7 @@ private: } }; - std::shared_ptr m_executor_manager; + std::shared_ptr m_executor_manager; mutable std::unordered_set opsetNames; // TODO: make extensions to be optional with conditional compilation mutable std::vector extensions; diff --git a/src/inference/src/dev/iasync_infer_request.cpp b/src/inference/src/dev/iasync_infer_request.cpp index 385baba838c..45633fa7616 100644 --- a/src/inference/src/dev/iasync_infer_request.cpp +++ b/src/inference/src/dev/iasync_infer_request.cpp @@ -14,13 +14,13 @@ namespace { -struct ImmediateStreamsExecutor : public InferenceEngine::ITaskExecutor { - explicit ImmediateStreamsExecutor(const InferenceEngine::IStreamsExecutor::Ptr& streamsExecutor) +struct ImmediateStreamsExecutor : public ov::threading::ITaskExecutor { + explicit ImmediateStreamsExecutor(const std::shared_ptr& streamsExecutor) : _streamsExecutor{streamsExecutor} {} void run(InferenceEngine::Task task) override { - _streamsExecutor->Execute(std::move(task)); + _streamsExecutor->execute(std::move(task)); } - InferenceEngine::IStreamsExecutor::Ptr _streamsExecutor; + std::shared_ptr _streamsExecutor; }; } // namespace @@ -30,8 +30,8 @@ ov::IAsyncInferRequest::~IAsyncInferRequest() { } ov::IAsyncInferRequest::IAsyncInferRequest(const std::shared_ptr& request, - const InferenceEngine::ITaskExecutor::Ptr& task_executor, - const InferenceEngine::ITaskExecutor::Ptr& callback_executor) + const std::shared_ptr& task_executor, + const std::shared_ptr& callback_executor) : m_sync_request(request), 
m_request_executor(task_executor), m_callback_executor(callback_executor) { @@ -117,7 +117,7 @@ void ov::IAsyncInferRequest::start_async_thread_unsafe() { void ov::IAsyncInferRequest::run_first_stage(const Pipeline::iterator itBeginStage, const Pipeline::iterator itEndStage, - const InferenceEngine::ITaskExecutor::Ptr callbackExecutor) { + const std::shared_ptr callbackExecutor) { auto& firstStageExecutor = std::get(*itBeginStage); OPENVINO_ASSERT(nullptr != firstStageExecutor); firstStageExecutor->run(make_next_stage_task(itBeginStage, itEndStage, std::move(callbackExecutor))); @@ -126,9 +126,9 @@ void ov::IAsyncInferRequest::run_first_stage(const Pipeline::iterator itBeginSta InferenceEngine::Task ov::IAsyncInferRequest::make_next_stage_task( const Pipeline::iterator itStage, const Pipeline::iterator itEndStage, - const InferenceEngine::ITaskExecutor::Ptr callbackExecutor) { + const std::shared_ptr callbackExecutor) { return std::bind( - [this, itStage, itEndStage](InferenceEngine::ITaskExecutor::Ptr& callbackExecutor) mutable { + [this, itStage, itEndStage](std::shared_ptr& callbackExecutor) mutable { std::exception_ptr currentException = nullptr; auto& thisStage = *itStage; auto itNextStage = itStage + 1; diff --git a/src/inference/src/dev/icompiled_model.cpp b/src/inference/src/dev/icompiled_model.cpp index c3e0796ab75..82b94d511d2 100644 --- a/src/inference/src/dev/icompiled_model.cpp +++ b/src/inference/src/dev/icompiled_model.cpp @@ -11,8 +11,8 @@ ov::ICompiledModel::ICompiledModel(const std::shared_ptr& model, const std::shared_ptr& plugin, - const InferenceEngine::ITaskExecutor::Ptr& task_executor, - const InferenceEngine::ITaskExecutor::Ptr& callback_executor) + const std::shared_ptr& task_executor, + const std::shared_ptr& callback_executor) : m_plugin(plugin), m_task_executor(task_executor), m_callback_executor(callback_executor) { @@ -86,10 +86,10 @@ std::shared_ptr ov::ICompiledModel::create_infer_request const std::shared_ptr& 
ov::ICompiledModel::get_plugin() const { return m_plugin; } -const InferenceEngine::ITaskExecutor::Ptr ov::ICompiledModel::get_task_executor() const { +const std::shared_ptr ov::ICompiledModel::get_task_executor() const { return m_task_executor; } -const InferenceEngine::ITaskExecutor::Ptr ov::ICompiledModel::get_callback_executor() const { +const std::shared_ptr ov::ICompiledModel::get_callback_executor() const { return m_callback_executor; } diff --git a/src/inference/src/dev/icompiled_model_wrapper.cpp b/src/inference/src/dev/icompiled_model_wrapper.cpp index b0144b2a5fc..189ab993217 100644 --- a/src/inference/src/dev/icompiled_model_wrapper.cpp +++ b/src/inference/src/dev/icompiled_model_wrapper.cpp @@ -4,9 +4,8 @@ #include "icompiled_model_wrapper.hpp" -#include - #include "dev/converter_utils.hpp" +#include "ie_plugin_config.hpp" InferenceEngine::ICompiledModelWrapper::ICompiledModelWrapper( const std::shared_ptr& model) diff --git a/src/inference/src/dev/iplugin.cpp b/src/inference/src/dev/iplugin.cpp index ad8248ba928..5bed9efb18f 100644 --- a/src/inference/src/dev/iplugin.cpp +++ b/src/inference/src/dev/iplugin.cpp @@ -4,7 +4,7 @@ #include "openvino/runtime/iplugin.hpp" -ov::IPlugin::IPlugin() : m_executor_manager(ov::executor_manager()), m_is_new_api(true) {} +ov::IPlugin::IPlugin() : m_executor_manager(ov::threading::executor_manager()), m_is_new_api(true) {} void ov::IPlugin::set_version(const ov::Version& version) { m_version = version; @@ -42,7 +42,7 @@ bool ov::IPlugin::is_new_api() const { return m_is_new_api; } -const std::shared_ptr& ov::IPlugin::get_executor_manager() const { +const std::shared_ptr& ov::IPlugin::get_executor_manager() const { return m_executor_manager; } diff --git a/src/inference/src/dev/isync_infer_request.cpp b/src/inference/src/dev/isync_infer_request.cpp index c8aa79a84b9..26ba98f1180 100644 --- a/src/inference/src/dev/isync_infer_request.cpp +++ b/src/inference/src/dev/isync_infer_request.cpp @@ -7,6 +7,7 @@ #include 
#include "cpp_interfaces/plugin_itt.hpp" +#include "ie_blob.h" #include "openvino/core/except.hpp" #include "openvino/core/layout.hpp" #include "openvino/core/parallel.hpp" diff --git a/src/inference/src/dev/threading/cpu_streams_executor.cpp b/src/inference/src/dev/threading/cpu_streams_executor.cpp new file mode 100644 index 00000000000..ceb72eec87d --- /dev/null +++ b/src/inference/src/dev/threading/cpu_streams_executor.cpp @@ -0,0 +1,397 @@ +// Copyright (C) 2018-2023 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 +// + +#include "openvino/runtime/threading/cpu_streams_executor.hpp" + +#include +#include +#include +#include +#include +#include + +#include "openvino/itt.hpp" +#include "openvino/runtime/system_conf.hpp" +#include "openvino/runtime/threading/executor_manager.hpp" +#include "threading/ie_parallel_custom_arena.hpp" +#include "threading/ie_thread_affinity.hpp" +#include "threading/ie_thread_local.hpp" + +namespace ov { +namespace threading { +struct CPUStreamsExecutor::Impl { + struct Stream { +#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO + struct Observer : public custom::task_scheduler_observer { + InferenceEngine::CpuSet _mask; + int _ncpus = 0; + int _threadBindingStep = 0; + int _offset = 0; + int _cpuIdxOffset = 0; + Observer(custom::task_arena& arena, + InferenceEngine::CpuSet mask, + int ncpus, + const int streamId, + const int threadsPerStream, + const int threadBindingStep, + const int threadBindingOffset, + const int cpuIdxOffset = 0) + : custom::task_scheduler_observer(arena), + _mask{std::move(mask)}, + _ncpus(ncpus), + _threadBindingStep(threadBindingStep), + _offset{streamId * threadsPerStream + threadBindingOffset}, + _cpuIdxOffset(cpuIdxOffset) {} + void on_scheduler_entry(bool) override { + InferenceEngine::PinThreadToVacantCore(_offset + tbb::this_task_arena::current_thread_index(), + _threadBindingStep, + _ncpus, + _mask, + _cpuIdxOffset); + } + void on_scheduler_exit(bool) override { + 
PinCurrentThreadByMask(_ncpus, _mask); + } + ~Observer() override = default; + }; +#endif + explicit Stream(Impl* impl) : _impl(impl) { + { + std::lock_guard lock{_impl->_streamIdMutex}; + if (_impl->_streamIdQueue.empty()) { + _streamId = _impl->_streamId++; + } else { + _streamId = _impl->_streamIdQueue.front(); + _impl->_streamIdQueue.pop(); + } + } + _numaNodeId = _impl->_config._streams + ? _impl->_usedNumaNodes.at((_streamId % _impl->_config._streams) / + ((_impl->_config._streams + _impl->_usedNumaNodes.size() - 1) / + _impl->_usedNumaNodes.size())) + : _impl->_usedNumaNodes.at(_streamId % _impl->_usedNumaNodes.size()); +#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO + const auto concurrency = (0 == _impl->_config._threadsPerStream) ? custom::task_arena::automatic + : _impl->_config._threadsPerStream; + if (ThreadBindingType::HYBRID_AWARE == _impl->_config._threadBindingType) { + if (Config::PreferredCoreType::ROUND_ROBIN != _impl->_config._threadPreferredCoreType) { + if (Config::PreferredCoreType::ANY == _impl->_config._threadPreferredCoreType) { + _taskArena.reset(new custom::task_arena{concurrency}); + } else { + const auto selected_core_type = + Config::PreferredCoreType::BIG == _impl->_config._threadPreferredCoreType + ? custom::info::core_types().back() // running on Big cores only + : custom::info::core_types().front(); // running on Little cores only + _taskArena.reset(new custom::task_arena{custom::task_arena::constraints{} + .set_core_type(selected_core_type) + .set_max_concurrency(concurrency)}); + } + } else { + // assigning the stream to the core type in the round-robin fashion + // wrapping around total_streams (i.e. how many streams all different core types can handle + // together). 
Binding priority: Big core, Logical big core, Small core + const auto total_streams = _impl->total_streams_on_core_types.back().second; + const auto big_core_streams = _impl->total_streams_on_core_types.front().second; + const auto hybrid_core = _impl->total_streams_on_core_types.size() > 1; + const auto phy_core_streams = + _impl->_config._big_core_streams == 0 + ? 0 + : _impl->num_big_core_phys / _impl->_config._threads_per_stream_big; + const auto streamId_wrapped = _streamId % total_streams; + const auto& selected_core_type = + std::find_if( + _impl->total_streams_on_core_types.cbegin(), + _impl->total_streams_on_core_types.cend(), + [streamId_wrapped](const decltype(_impl->total_streams_on_core_types)::value_type& p) { + return p.second > streamId_wrapped; + }) + ->first; + const auto small_core = hybrid_core && selected_core_type == 0; + const auto logic_core = !small_core && streamId_wrapped >= phy_core_streams; + const auto small_core_skip = small_core && _impl->_config._threads_per_stream_small == 3 && + _impl->_config._small_core_streams > 1; + const auto max_concurrency = + small_core ? _impl->_config._threads_per_stream_small : _impl->_config._threads_per_stream_big; + // Special handling of _threads_per_stream_small == 3 + const auto small_core_id = small_core_skip ? 0 : streamId_wrapped - big_core_streams; + const auto stream_id = + hybrid_core + ? (small_core ? small_core_id + : (logic_core ? streamId_wrapped - phy_core_streams : streamId_wrapped)) + : streamId_wrapped; + const auto thread_binding_step = hybrid_core ? (small_core ? _impl->_config._threadBindingStep : 2) + : _impl->_config._threadBindingStep; + // Special handling of _threads_per_stream_small == 3, need to skip 4 (Four cores share one L2 cache + // on the small core), stream_id = 0, cpu_idx_offset cumulative plus 4 + const auto small_core_offset = + small_core_skip ? 
_impl->_config._small_core_offset + (streamId_wrapped - big_core_streams) * 4 + : _impl->_config._small_core_offset; + const auto cpu_idx_offset = + hybrid_core + // Prevent conflicts with system scheduling, so default cpu id on big core starts from 1 + ? (small_core ? small_core_offset : (logic_core ? 0 : 1)) + : 0; + + _taskArena.reset(new custom::task_arena{custom::task_arena::constraints{} + .set_core_type(selected_core_type) + .set_max_concurrency(max_concurrency)}); + InferenceEngine::CpuSet processMask; + int ncpus = 0; + std::tie(processMask, ncpus) = InferenceEngine::GetProcessMask(); + if (nullptr != processMask) { + _observer.reset(new Observer{*_taskArena, + std::move(processMask), + ncpus, + stream_id, + max_concurrency, + thread_binding_step, + _impl->_config._threadBindingOffset, + cpu_idx_offset}); + _observer->observe(true); + } + } + } else if (ThreadBindingType::NUMA == _impl->_config._threadBindingType) { + _taskArena.reset(new custom::task_arena{custom::task_arena::constraints{_numaNodeId, concurrency}}); + } else if ((0 != _impl->_config._threadsPerStream) || + (ThreadBindingType::CORES == _impl->_config._threadBindingType)) { + _taskArena.reset(new custom::task_arena{concurrency}); + if (ThreadBindingType::CORES == _impl->_config._threadBindingType) { + InferenceEngine::CpuSet processMask; + int ncpus = 0; + std::tie(processMask, ncpus) = InferenceEngine::GetProcessMask(); + if (nullptr != processMask) { + _observer.reset(new Observer{*_taskArena, + std::move(processMask), + ncpus, + _streamId, + _impl->_config._threadsPerStream, + _impl->_config._threadBindingStep, + _impl->_config._threadBindingOffset}); + _observer->observe(true); + } + } + } +#elif OV_THREAD == OV_THREAD_OMP + omp_set_num_threads(_impl->_config._threadsPerStream); + if (!checkOpenMpEnvVars(false) && (ThreadBindingType::NONE != _impl->_config._threadBindingType)) { + InferenceEngine::CpuSet processMask; + int ncpus = 0; + std::tie(processMask, ncpus) = 
InferenceEngine::GetProcessMask(); + if (nullptr != processMask) { + parallel_nt(_impl->_config._threadsPerStream, [&](int threadIndex, int threadsPerStream) { + int thrIdx = _streamId * _impl->_config._threadsPerStream + threadIndex + + _impl->_config._threadBindingOffset; + InferenceEngine::PinThreadToVacantCore(thrIdx, + _impl->_config._threadBindingStep, + ncpus, + processMask); + }); + } + } +#elif OV_THREAD == OV_THREAD_SEQ + if (ThreadBindingType::NUMA == _impl->_config._threadBindingType) { + InferenceEngine::PinCurrentThreadToSocket(_numaNodeId); + } else if (ThreadBindingType::CORES == _impl->_config._threadBindingType) { + InferenceEngine::CpuSet processMask; + int ncpus = 0; + std::tie(processMask, ncpus) = InferenceEngine::GetProcessMask(); + if (nullptr != processMask) { + InferenceEngine::PinThreadToVacantCore(_streamId + _impl->_config._threadBindingOffset, + _impl->_config._threadBindingStep, + ncpus, + processMask); + } + } +#endif + } + ~Stream() { + { + std::lock_guard lock{_impl->_streamIdMutex}; + _impl->_streamIdQueue.push(_streamId); + } +#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO + if (nullptr != _observer) { + _observer->observe(false); + } +#endif + } + + Impl* _impl = nullptr; + int _streamId = 0; + int _numaNodeId = 0; + bool _execute = false; + std::queue _taskQueue; +#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO + std::unique_ptr _taskArena; + std::unique_ptr _observer; +#endif + }; + + explicit Impl(const Config& config) + : _config{config}, + _streams([this] { + return std::make_shared(this); + }) { + _exectorMgr = executor_manager(); + auto numaNodes = get_available_numa_nodes(); + if (_config._streams != 0) { + std::copy_n(std::begin(numaNodes), + std::min(static_cast(_config._streams), numaNodes.size()), + std::back_inserter(_usedNumaNodes)); + } else { + _usedNumaNodes = numaNodes; + } +#if (OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO) + if 
(ThreadBindingType::HYBRID_AWARE == config._threadBindingType) { + const auto core_types = custom::info::core_types(); + const auto num_core_phys = get_number_of_cpu_cores(); + num_big_core_phys = get_number_of_cpu_cores(true); + const auto num_small_core_phys = num_core_phys - num_big_core_phys; + int sum = 0; + // reversed order, so BIG cores are first + for (auto iter = core_types.rbegin(); iter < core_types.rend(); iter++) { + const auto& type = *iter; + // calculating the #streams per core type + const int num_streams_for_core_type = + type == 0 ? std::max(1, + std::min(config._small_core_streams, + config._threads_per_stream_small == 0 + ? 0 + : num_small_core_phys / config._threads_per_stream_small)) + : std::max(1, + std::min(config._big_core_streams, + config._threads_per_stream_big == 0 + ? 0 + : num_big_core_phys / config._threads_per_stream_big * 2)); + sum += num_streams_for_core_type; + // prefix sum, so the core type for a given stream id will be deduced just as a upper_bound + // (notice that the map keeps the elements in the descending order, so the big cores are populated + // first) + total_streams_on_core_types.push_back({type, sum}); + } + } +#endif + for (auto streamId = 0; streamId < _config._streams; ++streamId) { + _threads.emplace_back([this, streamId] { + openvino::itt::threadName(_config._name + "_" + std::to_string(streamId)); + for (bool stopped = false; !stopped;) { + Task task; + { + std::unique_lock lock(_mutex); + _queueCondVar.wait(lock, [&] { + return !_taskQueue.empty() || (stopped = _isStopped); + }); + if (!_taskQueue.empty()) { + task = std::move(_taskQueue.front()); + _taskQueue.pop(); + } + } + if (task) { + Execute(task, *(_streams.local())); + } + } + }); + } + } + + void Enqueue(Task task) { + { + std::lock_guard lock(_mutex); + _taskQueue.emplace(std::move(task)); + } + _queueCondVar.notify_one(); + } + + void Execute(const Task& task, Stream& stream) { +#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO 
+ auto& arena = stream._taskArena; + if (nullptr != arena) { + arena->execute(std::move(task)); + } else { + task(); + } +#else + task(); +#endif + } + + void Defer(Task task) { + auto& stream = *(_streams.local()); + stream._taskQueue.push(std::move(task)); + if (!stream._execute) { + stream._execute = true; + try { + while (!stream._taskQueue.empty()) { + Execute(stream._taskQueue.front(), stream); + stream._taskQueue.pop(); + } + } catch (...) { + } + stream._execute = false; + } + } + + Config _config; + std::mutex _streamIdMutex; + int _streamId = 0; + std::queue _streamIdQueue; + std::vector _threads; + std::mutex _mutex; + std::condition_variable _queueCondVar; + std::queue _taskQueue; + bool _isStopped = false; + std::vector _usedNumaNodes; + InferenceEngine::ThreadLocal> _streams; +#if (OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO) + // stream id mapping to the core type + // stored in the reversed order (so the big cores, with the highest core_type_id value, are populated first) + // every entry is the core type and #streams that this AND ALL EARLIER entries can handle (prefix sum) + // (so mapping is actually just an upper_bound: core type is deduced from the entry for which the id < #streams) + using StreamIdToCoreTypes = std::vector>; + StreamIdToCoreTypes total_streams_on_core_types; + int num_big_core_phys; +#endif + std::shared_ptr _exectorMgr; +}; + +int CPUStreamsExecutor::get_stream_id() { + auto stream = _impl->_streams.local(); + return stream->_streamId; +} + +int CPUStreamsExecutor::get_numa_node_id() { + auto stream = _impl->_streams.local(); + return stream->_numaNodeId; +} + +CPUStreamsExecutor::CPUStreamsExecutor(const ov::threading::IStreamsExecutor::Config& config) + : _impl{new Impl{config}} {} + +CPUStreamsExecutor::~CPUStreamsExecutor() { + { + std::lock_guard lock(_impl->_mutex); + _impl->_isStopped = true; + } + _impl->_queueCondVar.notify_all(); + for (auto& thread : _impl->_threads) { + if (thread.joinable()) { + 
thread.join(); + } + } +} + +void CPUStreamsExecutor::execute(Task task) { + _impl->Defer(std::move(task)); +} + +void CPUStreamsExecutor::run(Task task) { + if (0 == _impl->_config._streams) { + _impl->Defer(std::move(task)); + } else { + _impl->Enqueue(std::move(task)); + } +} + +} // namespace threading +} // namespace ov diff --git a/src/inference/src/dev/threading/executor_manager.cpp b/src/inference/src/dev/threading/executor_manager.cpp index 11fb7a289dd..250217b9104 100644 --- a/src/inference/src/dev/threading/executor_manager.cpp +++ b/src/inference/src/dev/threading/executor_manager.cpp @@ -6,6 +6,7 @@ #include "openvino/core/parallel.hpp" #include "openvino/runtime/properties.hpp" +#include "openvino/runtime/threading/cpu_streams_executor.hpp" #include "threading/ie_cpu_streams_executor.hpp" #if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO # if (TBB_INTERFACE_VERSION < 12000) @@ -21,13 +22,14 @@ #include namespace ov { +namespace threading { namespace { class ExecutorManagerImpl : public ExecutorManager { public: ~ExecutorManagerImpl(); - InferenceEngine::ITaskExecutor::Ptr get_executor(const std::string& id) override; - InferenceEngine::IStreamsExecutor::Ptr get_idle_cpu_streams_executor( - const InferenceEngine::IStreamsExecutor::Config& config) override; + std::shared_ptr get_executor(const std::string& id) override; + std::shared_ptr get_idle_cpu_streams_executor( + const ov::threading::IStreamsExecutor::Config& config) override; size_t get_executors_number() const override; size_t get_idle_cpu_streams_executors_number() const override; void clear(const std::string& id = {}) override; @@ -37,8 +39,8 @@ public: private: void reset_tbb(); - std::unordered_map executors; - std::vector> + std::unordered_map> executors; + std::vector>> cpuStreamsExecutors; mutable std::mutex streamExecutorMutex; mutable std::mutex taskExecutorMutex; @@ -110,12 +112,11 @@ void ExecutorManagerImpl::reset_tbb() { } } -InferenceEngine::ITaskExecutor::Ptr 
ExecutorManagerImpl::get_executor(const std::string& id) { +std::shared_ptr ExecutorManagerImpl::get_executor(const std::string& id) { std::lock_guard guard(taskExecutorMutex); auto foundEntry = executors.find(id); if (foundEntry == executors.end()) { - auto newExec = - std::make_shared(InferenceEngine::IStreamsExecutor::Config{id}); + auto newExec = std::make_shared(ov::threading::IStreamsExecutor::Config{id}); tbbThreadsCreated = true; executors[id] = newExec; return newExec; @@ -123,8 +124,8 @@ InferenceEngine::ITaskExecutor::Ptr ExecutorManagerImpl::get_executor(const std: return foundEntry->second; } -InferenceEngine::IStreamsExecutor::Ptr ExecutorManagerImpl::get_idle_cpu_streams_executor( - const InferenceEngine::IStreamsExecutor::Config& config) { +std::shared_ptr ExecutorManagerImpl::get_idle_cpu_streams_executor( + const ov::threading::IStreamsExecutor::Config& config) { std::lock_guard guard(streamExecutorMutex); for (const auto& it : cpuStreamsExecutors) { const auto& executor = it.second; @@ -137,12 +138,11 @@ InferenceEngine::IStreamsExecutor::Ptr ExecutorManagerImpl::get_idle_cpu_streams executorConfig._threadBindingType == config._threadBindingType && executorConfig._threadBindingStep == config._threadBindingStep && executorConfig._threadBindingOffset == config._threadBindingOffset) - if (executorConfig._threadBindingType != - InferenceEngine::IStreamsExecutor::ThreadBindingType::HYBRID_AWARE || + if (executorConfig._threadBindingType != ov::threading::IStreamsExecutor::ThreadBindingType::HYBRID_AWARE || executorConfig._threadPreferredCoreType == config._threadPreferredCoreType) return executor; } - auto newExec = std::make_shared(config); + auto newExec = std::make_shared(config); tbbThreadsCreated = true; cpuStreamsExecutors.emplace_back(std::make_pair(config, newExec)); return newExec; @@ -166,13 +166,14 @@ void ExecutorManagerImpl::clear(const std::string& id) { cpuStreamsExecutors.clear(); } else { executors.erase(id); - 
cpuStreamsExecutors.erase(std::remove_if(cpuStreamsExecutors.begin(), - cpuStreamsExecutors.end(), - [&](const std::pair& it) { - return it.first._name == id; - }), - cpuStreamsExecutors.end()); + cpuStreamsExecutors.erase( + std::remove_if(cpuStreamsExecutors.begin(), + cpuStreamsExecutors.end(), + [&](const std::pair>& it) { + return it.first._name == id; + }), + cpuStreamsExecutors.end()); } } @@ -188,7 +189,7 @@ public: ExecutorManagerHolder() = default; - std::shared_ptr get() { + std::shared_ptr get() { std::lock_guard lock(_mutex); auto manager = _manager.lock(); if (!manager) { @@ -205,4 +206,5 @@ std::shared_ptr executor_manager() { return executorManagerHolder.get(); } +} // namespace threading } // namespace ov diff --git a/src/inference/src/dev/threading/istreams_executor.cpp b/src/inference/src/dev/threading/istreams_executor.cpp new file mode 100644 index 00000000000..d96163a2739 --- /dev/null +++ b/src/inference/src/dev/threading/istreams_executor.cpp @@ -0,0 +1,496 @@ +// Copyright (C) 2018-2023 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 +// + +#include "openvino/runtime/threading/istreams_executor.hpp" + +#include +#include +#include +#include + +#include "cpp_interfaces/interface/ie_internal_plugin_config.hpp" +#include "ie_plugin_config.hpp" +#include "openvino/core/parallel.hpp" +#include "openvino/runtime/properties.hpp" +#include "openvino/runtime/system_conf.hpp" +#include "openvino/util/log.hpp" +#include "threading/ie_parallel_custom_arena.hpp" + +namespace ov { +namespace threading { + +IStreamsExecutor::~IStreamsExecutor() {} + +void IStreamsExecutor::Config::set_property(const std::string& key, const ov::Any& value) { + set_property({{key, value}}); +} + +void IStreamsExecutor::Config::set_property(const ov::AnyMap& property) { + for (const auto& it : property) { + const auto& key = it.first; + const auto value = it.second; + if (key == CONFIG_KEY(CPU_BIND_THREAD)) { + if (value.as() == CONFIG_VALUE(YES) || value.as() == 
CONFIG_VALUE(NUMA)) { +#if (defined(__APPLE__) || defined(_WIN32)) + _threadBindingType = IStreamsExecutor::ThreadBindingType::NUMA; +#else + _threadBindingType = (value.as() == CONFIG_VALUE(YES)) + ? IStreamsExecutor::ThreadBindingType::CORES + : IStreamsExecutor::ThreadBindingType::NUMA; +#endif + } else if (value.as() == CONFIG_VALUE(HYBRID_AWARE)) { + _threadBindingType = IStreamsExecutor::ThreadBindingType::HYBRID_AWARE; + } else if (value.as() == CONFIG_VALUE(NO)) { + _threadBindingType = IStreamsExecutor::ThreadBindingType::NONE; + } else { + IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_BIND_THREAD) + << ". Expected only YES(binds to cores) / NO(no binding) / NUMA(binds to NUMA nodes) / " + "HYBRID_AWARE (let the runtime recognize and use the hybrid cores)"; + } + } else if (key == ov::affinity) { + ov::Affinity affinity; + std::stringstream{value.as()} >> affinity; + switch (affinity) { + case ov::Affinity::NONE: + _threadBindingType = ThreadBindingType::NONE; + break; + case ov::Affinity::CORE: { +#if (defined(__APPLE__) || defined(_WIN32)) + _threadBindingType = ThreadBindingType::NUMA; +#else + _threadBindingType = ThreadBindingType::CORES; +#endif + } break; + case ov::Affinity::NUMA: + _threadBindingType = ThreadBindingType::NUMA; + break; + case ov::Affinity::HYBRID_AWARE: + _threadBindingType = ThreadBindingType::HYBRID_AWARE; + break; + default: + OPENVINO_UNREACHABLE("Unsupported affinity type"); + } + } else if (key == CONFIG_KEY(CPU_THROUGHPUT_STREAMS)) { + if (value.as() == CONFIG_VALUE(CPU_THROUGHPUT_NUMA)) { + _streams = static_cast(get_available_numa_nodes().size()); + } else if (value.as() == CONFIG_VALUE(CPU_THROUGHPUT_AUTO)) { + // bare minimum of streams (that evenly divides available number of cores) + _streams = get_default_num_streams(); + } else { + int val_i; + try { + val_i = value.as(); + } catch (const std::exception&) { + IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THROUGHPUT_STREAMS) + << 
". Expected only positive numbers (#streams) or " + << "PluginConfigParams::CPU_THROUGHPUT_NUMA/CPU_THROUGHPUT_AUTO"; + } + if (val_i < 0) { + IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THROUGHPUT_STREAMS) + << ". Expected only positive numbers (#streams)"; + } + _streams = val_i; + } + } else if (key == ov::num_streams) { + auto streams = value.as(); + if (streams == ov::streams::NUMA) { + _streams = static_cast(get_available_numa_nodes().size()); + } else if (streams == ov::streams::AUTO) { + // bare minimum of streams (that evenly divides available number of cores) + _streams = get_default_num_streams(); + } else if (streams.num >= 0) { + _streams = streams.num; + } else { + OPENVINO_UNREACHABLE("Wrong value for property key ", + ov::num_streams.name(), + ". Expected non negative numbers (#streams) or ", + "ov::streams::NUMA|ov::streams::AUTO, Got: ", + streams); + } + } else if (key == CONFIG_KEY(CPU_THREADS_NUM) || key == ov::inference_num_threads) { + int val_i; + try { + val_i = value.as(); + } catch (const std::exception&) { + IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THREADS_NUM) + << ". Expected only positive numbers (#threads)"; + } + if (val_i < 0) { + IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THREADS_NUM) + << ". Expected only positive numbers (#threads)"; + } + _threads = val_i; + } else if (key == CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM)) { + int val_i; + try { + val_i = value.as(); + } catch (const std::exception&) { + IE_THROW() << "Wrong value for property key " << CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM) + << ". Expected only non negative numbers (#threads)"; + } + if (val_i < 0) { + IE_THROW() << "Wrong value for property key " << CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM) + << ". 
Expected only non negative numbers (#threads)"; + } + _threadsPerStream = val_i; + } else if (key == CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)) { + int val_i; + try { + val_i = value.as(); + } catch (const std::exception&) { + IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS) + << ". Expected only non negative numbers (#streams)"; + } + if (val_i < 0) { + IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS) + << ". Expected only non negative numbers (#streams)"; + } + _big_core_streams = val_i; + } else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)) { + int val_i; + try { + val_i = value.as(); + } catch (const std::exception&) { + IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS) + << ". Expected only non negative numbers (#streams)"; + } + if (val_i < 0) { + IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS) + << ". Expected only non negative numbers (#streams)"; + } + _small_core_streams = val_i; + } else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)) { + int val_i; + try { + val_i = value.as(); + } catch (const std::exception&) { + IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG) + << ". Expected only non negative numbers (#threads)"; + } + if (val_i < 0) { + IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG) + << ". Expected only non negative numbers (#threads)"; + } + _threads_per_stream_big = val_i; + } else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)) { + int val_i; + try { + val_i = value.as(); + } catch (const std::exception&) { + IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL) + << ". 
Expected only non negative numbers (#threads)"; + } + if (val_i < 0) { + IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL) + << ". Expected only non negative numbers (#threads)"; + } + _threads_per_stream_small = val_i; + } else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)) { + int val_i; + try { + val_i = value.as(); + } catch (const std::exception&) { + IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET) + << ". Expected only non negative numbers"; + } + if (val_i < 0) { + IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET) + << ". Expected only non negative numbers"; + } + _small_core_offset = val_i; + } else if (key == CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD)) { + if (value.as() == CONFIG_VALUE(YES)) { + _enable_hyper_thread = true; + } else if (value.as() == CONFIG_VALUE(NO)) { + _enable_hyper_thread = false; + } else { + OPENVINO_UNREACHABLE("Unsupported enable hyper thread type"); + } + } else { + IE_THROW() << "Wrong value for property key " << key; + } + } +} + +ov::Any IStreamsExecutor::Config::get_property(const std::string& key) const { + if (key == ov::supported_properties) { + std::vector properties{ + CONFIG_KEY(CPU_THROUGHPUT_STREAMS), + CONFIG_KEY(CPU_BIND_THREAD), + CONFIG_KEY(CPU_THREADS_NUM), + CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM), + CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS), + CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS), + CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG), + CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL), + CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET), + CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD), + ov::num_streams.name(), + ov::inference_num_threads.name(), + ov::affinity.name(), + }; + return properties; + } else if (key == ov::affinity) { + switch (_threadBindingType) { + case IStreamsExecutor::ThreadBindingType::NONE: + return ov::Affinity::NONE; + case IStreamsExecutor::ThreadBindingType::CORES: + return 
ov::Affinity::CORE; + case IStreamsExecutor::ThreadBindingType::NUMA: + return ov::Affinity::NUMA; + case IStreamsExecutor::ThreadBindingType::HYBRID_AWARE: + return ov::Affinity::HYBRID_AWARE; + } + } else if (key == CONFIG_KEY(CPU_BIND_THREAD)) { + switch (_threadBindingType) { + case IStreamsExecutor::ThreadBindingType::NONE: + return {CONFIG_VALUE(NO)}; + case IStreamsExecutor::ThreadBindingType::CORES: + return {CONFIG_VALUE(YES)}; + case IStreamsExecutor::ThreadBindingType::NUMA: + return {CONFIG_VALUE(NUMA)}; + case IStreamsExecutor::ThreadBindingType::HYBRID_AWARE: + return {CONFIG_VALUE(HYBRID_AWARE)}; + } + } else if (key == CONFIG_KEY(CPU_THROUGHPUT_STREAMS)) { + return {std::to_string(_streams)}; + } else if (key == ov::num_streams) { + return decltype(ov::num_streams)::value_type{_streams}; + } else if (key == CONFIG_KEY(CPU_THREADS_NUM)) { + return {std::to_string(_threads)}; + } else if (key == ov::inference_num_threads) { + return decltype(ov::inference_num_threads)::value_type{_threads}; + } else if (key == CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM)) { + return {std::to_string(_threadsPerStream)}; + } else if (key == CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)) { + return {std::to_string(_big_core_streams)}; + } else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)) { + return {std::to_string(_small_core_streams)}; + } else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)) { + return {std::to_string(_threads_per_stream_big)}; + } else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)) { + return {std::to_string(_threads_per_stream_small)}; + } else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)) { + return {std::to_string(_small_core_offset)}; + } else if (key == CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD)) { + return {_enable_hyper_thread ? 
CONFIG_VALUE(YES) : CONFIG_VALUE(NO)}; + } else { + OPENVINO_UNREACHABLE("Wrong value for property key ", key); + } + return {}; +} + +int IStreamsExecutor::Config::get_default_num_streams(const bool enable_hyper_thread) { + const int sockets = static_cast(get_available_numa_nodes().size()); + // bare minimum of streams (that evenly divides available number of core) + const int num_cores = sockets == 1 ? (enable_hyper_thread ? parallel_get_max_threads() : get_number_of_cpu_cores()) + : get_number_of_cpu_cores(); + if (0 == num_cores % 4) + return std::max(4, num_cores / 4); + else if (0 == num_cores % 5) + return std::max(5, num_cores / 5); + else if (0 == num_cores % 3) + return std::max(3, num_cores / 3); + else // if user disables some cores say in BIOS, so we got weird #cores which is not easy to divide + return 1; +} + +int IStreamsExecutor::Config::get_hybrid_num_streams(std::map& config, + const int stream_mode) { + const int num_cores = parallel_get_max_threads(); + const int num_cores_phy = get_number_of_cpu_cores(); + const int num_big_cores_phy = get_number_of_cpu_cores(true); + const int num_small_cores = num_cores_phy - num_big_cores_phy; + const int num_big_cores = num_cores > num_cores_phy ? 
num_big_cores_phy * 2 : num_big_cores_phy; + int big_core_streams = 0; + int small_core_streams = 0; + int threads_per_stream_big = 0; + int threads_per_stream_small = 0; + + if (stream_mode == DEFAULT) { + // bare minimum of streams (that evenly divides available number of core) + if (0 == num_big_cores_phy % 4) { + threads_per_stream_big = 4; + } else if (0 == num_big_cores_phy % 5) { + threads_per_stream_big = 5; + } else if (0 == num_big_cores_phy % 3) { + threads_per_stream_big = 3; + } else { // if user disables some cores say in BIOS, so we got weird #cores which is not easy to divide + threads_per_stream_big = num_big_cores_phy; + } + + big_core_streams = num_big_cores / threads_per_stream_big; + threads_per_stream_small = threads_per_stream_big; + if (num_small_cores == 0) { + threads_per_stream_small = 0; + } else if (num_small_cores < threads_per_stream_small) { + small_core_streams = 1; + threads_per_stream_small = num_small_cores; + threads_per_stream_big = threads_per_stream_small; + // Balance the computation of physical core and logical core, the number of threads on the physical core and + // logical core should be equal + big_core_streams = num_big_cores_phy / threads_per_stream_big * 2; + } else { + small_core_streams = num_small_cores / threads_per_stream_small; + } + } else if (stream_mode == AGGRESSIVE) { + big_core_streams = num_big_cores; + small_core_streams = num_small_cores; + threads_per_stream_big = num_big_cores / big_core_streams; + threads_per_stream_small = num_small_cores == 0 ? 0 : num_small_cores / small_core_streams; + } else if (stream_mode == LESSAGGRESSIVE) { + big_core_streams = num_big_cores / 2; + small_core_streams = num_small_cores / 2; + threads_per_stream_big = num_big_cores / big_core_streams; + threads_per_stream_small = num_small_cores == 0 ? 
0 : num_small_cores / small_core_streams; + } else { + IE_THROW() << "Wrong stream mode to get num of streams: " << stream_mode; + } + config[CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)] = std::to_string(big_core_streams); + config[CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)] = std::to_string(small_core_streams); + config[CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)] = std::to_string(threads_per_stream_big); + config[CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)] = std::to_string(threads_per_stream_small); + // This is default setting for specific CPU which Pcore is in front and Ecore is in the back. + config[CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)] = std::to_string(num_small_cores == 0 ? 0 : num_big_cores); + return big_core_streams + small_core_streams; +} + +void IStreamsExecutor::Config::update_hybrid_custom_threads(Config& config) { + const auto num_cores = parallel_get_max_threads(); + const auto num_cores_phys = get_number_of_cpu_cores(); + const auto num_big_cores_phys = get_number_of_cpu_cores(true); + const auto num_big_cores = num_cores > num_cores_phys ? num_big_cores_phys * 2 : num_big_cores_phys; + const auto num_small_cores_phys = num_cores_phys - num_big_cores_phys; + const auto threads = config._threads ? config._threads : num_cores; + const auto streams = config._streams > 0 ? 
config._streams : 1; + + config._small_core_offset = num_big_cores; + int threads_per_stream = std::max(1, threads / streams); + + if ((num_big_cores_phys / threads_per_stream >= streams) && (1 < threads_per_stream)) { + config._big_core_streams = streams; + config._threads_per_stream_big = threads_per_stream; + config._small_core_streams = 0; + config._threads_per_stream_small = 0; + } else if ((num_small_cores_phys / threads_per_stream >= streams) && (num_big_cores_phys < threads_per_stream)) { + config._big_core_streams = 0; + config._threads_per_stream_big = 0; + config._small_core_streams = streams; + config._threads_per_stream_small = threads_per_stream; + } else { + const int threads_per_stream_big = std::min(num_big_cores_phys, threads_per_stream); + const int threads_per_stream_small = std::min(num_small_cores_phys, threads_per_stream); + + threads_per_stream = std::min(threads_per_stream_big, threads_per_stream_small); + while (threads_per_stream > 1) { + const int base_big_streams = num_big_cores_phys / threads_per_stream; + const int base_small_streams = num_small_cores_phys > 0 ? num_small_cores_phys / threads_per_stream : 0; + if (base_big_streams + base_small_streams >= streams) { + config._big_core_streams = base_big_streams; + config._small_core_streams = streams - base_big_streams; + break; + } else if (base_big_streams * 2 + base_small_streams >= streams) { + config._big_core_streams = streams - base_small_streams; + config._small_core_streams = base_small_streams; + break; + } else { + threads_per_stream = threads_per_stream > 1 ? 
threads_per_stream - 1 : 1; + } + } + + if (threads_per_stream == 1) { + const int stream_loops = streams / num_cores; + const int remain_streams = streams - stream_loops * num_cores; + if (num_big_cores_phys >= remain_streams) { + config._big_core_streams = remain_streams + num_big_cores * stream_loops; + config._small_core_streams = num_small_cores_phys * stream_loops; + } else if (num_big_cores_phys + num_small_cores_phys >= remain_streams) { + config._big_core_streams = num_big_cores_phys + num_big_cores * stream_loops; + config._small_core_streams = remain_streams - num_big_cores_phys + num_small_cores_phys * stream_loops; + } else { + config._big_core_streams = remain_streams - num_small_cores_phys + num_big_cores * stream_loops; + config._small_core_streams = num_small_cores_phys * (stream_loops + 1); + } + } + + config._threads_per_stream_big = threads_per_stream; + config._threads_per_stream_small = threads_per_stream; + } +} + +IStreamsExecutor::Config IStreamsExecutor::Config::make_default_multi_threaded(const IStreamsExecutor::Config& initial, + const bool fp_intesive) { + const auto envThreads = parallel_get_env_threads(); + const auto& numaNodes = get_available_numa_nodes(); + const int numaNodesNum = static_cast(numaNodes.size()); + auto streamExecutorConfig = initial; + const bool bLatencyCase = streamExecutorConfig._streams <= numaNodesNum; + + // by default, do not use the hyper-threading (to minimize threads synch overheads) + int num_cores_default = get_number_of_cpu_cores(); +#if (OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO) + // additional latency-case logic for hybrid processors: + if (ThreadBindingType::HYBRID_AWARE == streamExecutorConfig._threadBindingType) { + const auto core_types = custom::info::core_types(); + const auto num_little_cores = + custom::info::default_concurrency(custom::task_arena::constraints{}.set_core_type(core_types.front())); + const auto num_big_cores_phys = get_number_of_cpu_cores(true); + const 
int int8_threshold = 4; // ~relative efficiency of the VNNI-intensive code for Big vs Little cores; + const int fp32_threshold = 2; // ~relative efficiency of the AVX2 fp32 code for Big vs Little cores; + // by default the latency case uses (faster) Big cores only, depending on the compute ratio + const bool bLatencyCaseBigOnly = + num_big_cores_phys > (num_little_cores / (fp_intesive ? fp32_threshold : int8_threshold)); + // selecting the preferred core type + streamExecutorConfig._threadPreferredCoreType = + bLatencyCase ? (bLatencyCaseBigOnly ? IStreamsExecutor::Config::PreferredCoreType::BIG + : IStreamsExecutor::Config::PreferredCoreType::ANY) + : IStreamsExecutor::Config::PreferredCoreType::ROUND_ROBIN; + // additionally selecting the #cores to use in the "Big-only" case + if (bLatencyCaseBigOnly) { + const int hyper_threading_threshold = + 2; // min #cores, for which the hyper-threading becomes useful for the latency case + const auto num_big_cores = + custom::info::default_concurrency(custom::task_arena::constraints{}.set_core_type(core_types.back())); + num_cores_default = (num_big_cores_phys <= hyper_threading_threshold) ? 
num_big_cores : num_big_cores_phys; + } + // if nstreams or nthreads are set, need to calculate the Hybrid aware parameters here + if (!bLatencyCase && (streamExecutorConfig._big_core_streams == 0 || streamExecutorConfig._threads)) { + update_hybrid_custom_threads(streamExecutorConfig); + } + OPENVINO_DEBUG << "[ p_e_core_info ] streams (threads): " << streamExecutorConfig._streams << "(" + << streamExecutorConfig._threads_per_stream_big * streamExecutorConfig._big_core_streams + + streamExecutorConfig._threads_per_stream_small * streamExecutorConfig._small_core_streams + << ") -- PCore: " << streamExecutorConfig._big_core_streams << "(" + << streamExecutorConfig._threads_per_stream_big + << ") ECore: " << streamExecutorConfig._small_core_streams << "(" + << streamExecutorConfig._threads_per_stream_small << ")"; + } +#endif + const auto hwCores = + !bLatencyCase && numaNodesNum == 1 + // throughput case on a single-NUMA node machine uses all available cores + ? (streamExecutorConfig._enable_hyper_thread ? parallel_get_max_threads() : num_cores_default) + // in the rest of cases: + // multi-node machine + // or + // latency case, single-node yet hybrid case that uses + // all core types + // or + // big-cores only, but the #cores is "enough" (pls see the logic above) + // it is usually beneficial not to use the hyper-threading (which is default) + : num_cores_default; + const auto threads = + streamExecutorConfig._threads ? streamExecutorConfig._threads : (envThreads ? envThreads : hwCores); + streamExecutorConfig._threadsPerStream = + streamExecutorConfig._streams ? std::max(1, threads / streamExecutorConfig._streams) : threads; + streamExecutorConfig._threads = + (!bLatencyCase && ThreadBindingType::HYBRID_AWARE == streamExecutorConfig._threadBindingType) + ? 
streamExecutorConfig._big_core_streams * streamExecutorConfig._threads_per_stream_big + + streamExecutorConfig._small_core_streams * streamExecutorConfig._threads_per_stream_small + : streamExecutorConfig._threadsPerStream * streamExecutorConfig._streams; + return streamExecutorConfig; +} + +} // namespace threading +} // namespace ov diff --git a/src/inference/src/dev/threading/itask_executor.cpp b/src/inference/src/dev/threading/itask_executor.cpp new file mode 100644 index 00000000000..7701df3d2b4 --- /dev/null +++ b/src/inference/src/dev/threading/itask_executor.cpp @@ -0,0 +1,41 @@ +// Copyright (C) 2018-2023 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 +// + +#include "openvino/runtime/threading/itask_executor.hpp" + +#include +#include +#include +#include + +namespace ov { +namespace threading { + +void ITaskExecutor::run_and_wait(const std::vector& tasks) { + std::vector> packagedTasks; + std::vector> futures; + for (std::size_t i = 0; i < tasks.size(); ++i) { + packagedTasks.emplace_back([&tasks, i] { + tasks[i](); + }); + futures.emplace_back(packagedTasks.back().get_future()); + } + for (std::size_t i = 0; i < tasks.size(); ++i) { + run([&packagedTasks, i] { + packagedTasks[i](); + }); + } + // std::future::get will rethrow exception from task. + // We should wait all tasks before any exception is thrown. 
+ // So wait() and get() for each future moved to separate loops + for (auto&& future : futures) { + future.wait(); + } + for (auto&& future : futures) { + future.get(); + } +} + +} // namespace threading +} // namespace ov diff --git a/src/inference/src/os/lin/lin_system_conf.cpp b/src/inference/src/os/lin/lin_system_conf.cpp index d822b631e9c..ec56b4897d5 100644 --- a/src/inference/src/os/lin/lin_system_conf.cpp +++ b/src/inference/src/os/lin/lin_system_conf.cpp @@ -18,7 +18,7 @@ #include "streams_executor.hpp" #include "threading/ie_parallel_custom_arena.hpp" -namespace InferenceEngine { +namespace ov { struct CPU { int _processors = 0; @@ -243,13 +243,13 @@ void parse_processor_info_linux(const int _processors, }; #if !((IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)) -std::vector getAvailableNUMANodes() { +std::vector get_available_numa_nodes() { std::vector nodes((0 == cpu._sockets) ? 1 : cpu._sockets); std::iota(std::begin(nodes), std::end(nodes), 0); return nodes; } #endif -int getNumberOfCPUCores(bool bigCoresOnly) { +int get_number_of_cpu_cores(bool bigCoresOnly) { unsigned numberOfProcessors = cpu._processors; unsigned totalNumberOfCpuCores = cpu._cores; IE_ASSERT(totalNumberOfCpuCores != 0); @@ -280,4 +280,4 @@ int getNumberOfCPUCores(bool bigCoresOnly) { return phys_cores; } -} // namespace InferenceEngine +} // namespace ov diff --git a/src/inference/src/os/win/win_system_conf.cpp b/src/inference/src/os/win/win_system_conf.cpp index e89666edf7a..e4d7df01667 100644 --- a/src/inference/src/os/win/win_system_conf.cpp +++ b/src/inference/src/os/win/win_system_conf.cpp @@ -3,7 +3,7 @@ // #ifndef NOMINMAX -# define NOMINMAX +# define NOMINMAX #endif #include @@ -11,11 +11,11 @@ #include #include -#include "ie_system_conf.h" +#include "openvino/runtime/system_conf.hpp" #include "streams_executor.hpp" #include "threading/ie_parallel_custom_arena.hpp" -namespace InferenceEngine { +namespace ov { struct CPU { int _processors = 0; @@ -168,7 
+168,7 @@ void parse_processor_info_win(const char* base_ptr, } } -int getNumberOfCPUCores(bool bigCoresOnly) { +int get_number_of_cpu_cores(bool bigCoresOnly) { const int fallback_val = parallel_get_max_threads(); DWORD sz = 0; // querying the size of the resulting structure, passing the nullptr for the buffer @@ -178,7 +178,8 @@ int getNumberOfCPUCores(bool bigCoresOnly) { std::unique_ptr ptr(new uint8_t[sz]); if (!GetLogicalProcessorInformationEx(RelationProcessorCore, - reinterpret_cast(ptr.get()), &sz)) + reinterpret_cast(ptr.get()), + &sz)) return fallback_val; int phys_cores = 0; @@ -188,20 +189,21 @@ int getNumberOfCPUCores(bool bigCoresOnly) { phys_cores++; } while (offset < sz); - #if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO) +#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO) auto core_types = custom::info::core_types(); if (bigCoresOnly && core_types.size() > 1) /*Hybrid CPU*/ { - phys_cores = custom::info::default_concurrency(custom::task_arena::constraints{} - .set_core_type(core_types.back()) - .set_max_threads_per_core(1)); + phys_cores = custom::info::default_concurrency( + custom::task_arena::constraints{}.set_core_type(core_types.back()).set_max_threads_per_core(1)); } - #endif +#endif return phys_cores; } #if !(IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO) // OMP/SEQ threading on the Windows doesn't support NUMA -std::vector getAvailableNUMANodes() { return {-1}; } +std::vector get_available_numa_nodes() { + return {-1}; +} #endif -} // namespace InferenceEngine +} // namespace ov diff --git a/src/inference/src/streams_executor.hpp b/src/inference/src/streams_executor.hpp index 769c4ec73cd..4bea102dbce 100644 --- a/src/inference/src/streams_executor.hpp +++ b/src/inference/src/streams_executor.hpp @@ -11,7 +11,7 @@ #include #include -namespace InferenceEngine { +namespace ov { #ifdef __linux__ /** @@ -55,4 +55,4 @@ void parse_processor_info_win(const char* base_ptr, std::vector>& 
_cpu_mapping_table); #endif -} // namespace InferenceEngine \ No newline at end of file +} // namespace ov diff --git a/src/inference/src/ie_system_conf.cpp b/src/inference/src/system_conf.cpp similarity index 90% rename from src/inference/src/ie_system_conf.cpp rename to src/inference/src/system_conf.cpp index 761fdda4dd5..da212d4a629 100644 --- a/src/inference/src/ie_system_conf.cpp +++ b/src/inference/src/system_conf.cpp @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 // -#include "ie_system_conf.h" +#include "openvino/runtime/system_conf.hpp" #include #include @@ -15,7 +15,7 @@ #define XBYAK_UNDEF_JNL #include -namespace InferenceEngine { +namespace ov { #if defined(OPENVINO_ARCH_X86) || defined(OPENVINO_ARCH_X86_64) @@ -102,7 +102,7 @@ bool with_cpu_x86_avx512_core_amx() { #endif // OPENVINO_ARCH_X86 || OPENVINO_ARCH_X86_64 -bool checkOpenMpEnvVars(bool includeOMPNumThreads) { +bool check_open_mp_env_vars(bool include_omp_num_threads) { for (auto&& var : {"GOMP_CPU_AFFINITY", "GOMP_DEBUG" "GOMP_RTEMS_THREAD_POOLS", @@ -134,7 +134,7 @@ bool checkOpenMpEnvVars(bool includeOMPNumThreads) { "PHI_KMP_PLACE_THREADS" "PHI_OMP_NUM_THREADS"}) { if (getenv(var)) { - if (0 != strcmp(var, "OMP_NUM_THREADS") || includeOMPNumThreads) + if (0 != strcmp(var, "OMP_NUM_THREADS") || include_omp_num_threads) return true; } } @@ -144,19 +144,19 @@ bool checkOpenMpEnvVars(bool includeOMPNumThreads) { #if defined(__APPLE__) || defined(__EMSCRIPTEN__) // for Linux and Windows the getNumberOfCPUCores (that accounts only for physical cores) implementation is OS-specific // (see cpp files in corresponding folders), for __APPLE__ it is default : -int getNumberOfCPUCores(bool) { +int get_number_of_cpu_cores(bool) { return parallel_get_max_threads(); } # if !((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO)) -std::vector getAvailableNUMANodes() { +std::vector get_available_numa_nodes() { return {-1}; } # endif -int getNumberOfLogicalCPUCores(bool) { +int 
get_number_of_logical_cpu_cores(bool) { return parallel_get_max_threads(); } #else -int getNumberOfLogicalCPUCores(bool bigCoresOnly) { +int get_number_of_logical_cpu_cores(bool bigCoresOnly) { int logical_cores = parallel_get_max_threads(); # if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO) auto core_types = custom::info::core_types(); @@ -170,18 +170,18 @@ int getNumberOfLogicalCPUCores(bool bigCoresOnly) { #endif #if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO)) -std::vector getAvailableNUMANodes() { +std::vector get_available_numa_nodes() { return custom::info::numa_nodes(); } // this is impl only with the TBB -std::vector getAvailableCoresTypes() { +std::vector get_available_cores_types() { return custom::info::core_types(); } #else // as the core types support exists only with the TBB, the fallback is same for any other threading API -std::vector getAvailableCoresTypes() { +std::vector get_available_cores_types() { return {-1}; } #endif -} // namespace InferenceEngine +} // namespace ov diff --git a/src/inference/src/threading/ie_cpu_streams_executor.cpp b/src/inference/src/threading/ie_cpu_streams_executor.cpp index 2e786599a74..37f690ec473 100644 --- a/src/inference/src/threading/ie_cpu_streams_executor.cpp +++ b/src/inference/src/threading/ie_cpu_streams_executor.cpp @@ -194,7 +194,7 @@ struct CPUStreamsExecutor::Impl { } #elif IE_THREAD == IE_THREAD_SEQ if (ThreadBindingType::NUMA == _impl->_config._threadBindingType) { - PinCurrentThreadToSocket(_numaNodeId); + InferenceEngine::PinCurrentThreadToSocket(_numaNodeId); } else if (ThreadBindingType::CORES == _impl->_config._threadBindingType) { CpuSet processMask; int ncpus = 0; @@ -368,7 +368,7 @@ int CPUStreamsExecutor::GetNumaNodeId() { return stream->_numaNodeId; } -CPUStreamsExecutor::CPUStreamsExecutor(const IStreamsExecutor::Config& config) : _impl{new Impl{config}} {} +CPUStreamsExecutor::CPUStreamsExecutor(const Config& config) : _impl{new Impl{config}} {} 
CPUStreamsExecutor::~CPUStreamsExecutor() { { diff --git a/src/inference/src/threading/ie_executor_manager.cpp b/src/inference/src/threading/ie_executor_manager.cpp index 7ea0e16f3cd..82a1e126ae5 100644 --- a/src/inference/src/threading/ie_executor_manager.cpp +++ b/src/inference/src/threading/ie_executor_manager.cpp @@ -7,7 +7,10 @@ #include "ie_parallel.hpp" #include "openvino/runtime/properties.hpp" #include "openvino/runtime/threading/executor_manager.hpp" +#include "openvino/runtime/threading/istreams_executor.hpp" +#include "openvino/runtime/threading/itask_executor.hpp" #include "threading/ie_cpu_streams_executor.hpp" +#include "threading/ie_itask_executor.hpp" #if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO # if (TBB_INTERFACE_VERSION < 12000) # include @@ -25,7 +28,7 @@ namespace InferenceEngine { namespace { class ExecutorManagerImpl : public ExecutorManager { public: - ExecutorManagerImpl(const std::shared_ptr& manager); + ExecutorManagerImpl(const std::shared_ptr& manager); ITaskExecutor::Ptr getExecutor(const std::string& id) override; IStreamsExecutor::Ptr getIdleCPUStreamsExecutor(const IStreamsExecutor::Config& config) override; size_t getExecutorsNumber() const override; @@ -35,15 +38,55 @@ public: bool getTbbFlag() override; private: - std::shared_ptr m_manager; - std::shared_ptr get_ov_manager() const override { + std::shared_ptr m_manager; + std::shared_ptr get_ov_manager() const override { return m_manager; } }; +class TaskExecutorWrapper : public ITaskExecutor { + std::shared_ptr m_executor; + +public: + TaskExecutorWrapper(const std::shared_ptr& executor) : m_executor(executor) {} + void run(Task task) override { + m_executor->run(task); + } + + void runAndWait(const std::vector& tasks) override { + m_executor->run_and_wait(tasks); + } +}; + +class StreamsExecutorWrapper : public IStreamsExecutor { + std::shared_ptr m_executor; + +public: + StreamsExecutorWrapper(const std::shared_ptr& executor) : m_executor(executor) {} + 
void run(Task task) override { + m_executor->run(task); + } + + void runAndWait(const std::vector& tasks) override { + m_executor->run_and_wait(tasks); + } + int GetStreamId() override { + return m_executor->get_stream_id(); + } + + int GetNumaNodeId() override { + return m_executor->get_numa_node_id(); + } + + void Execute(Task task) override { + m_executor->execute(task); + } +}; + } // namespace -ExecutorManagerImpl::ExecutorManagerImpl(const std::shared_ptr& manager) : m_manager(manager) {} +ExecutorManagerImpl::ExecutorManagerImpl(const std::shared_ptr& manager) + : m_manager(manager) {} void ExecutorManagerImpl::setTbbFlag(bool flag) { m_manager->set_property({{ov::force_tbb_terminate.name(), flag}}); @@ -54,11 +97,11 @@ bool ExecutorManagerImpl::getTbbFlag() { } ITaskExecutor::Ptr ExecutorManagerImpl::getExecutor(const std::string& id) { - return m_manager->get_executor(id); + return std::make_shared(m_manager->get_executor(id)); } IStreamsExecutor::Ptr ExecutorManagerImpl::getIdleCPUStreamsExecutor(const IStreamsExecutor::Config& config) { - return m_manager->get_idle_cpu_streams_executor(config); + return std::make_shared(m_manager->get_idle_cpu_streams_executor(config)); } size_t ExecutorManagerImpl::getExecutorsNumber() const { @@ -74,7 +117,7 @@ void ExecutorManagerImpl::clear(const std::string& id) { } std::shared_ptr create_old_manager( - const std::shared_ptr& manager) { + const std::shared_ptr& manager) { return std::make_shared(manager); } @@ -94,7 +137,7 @@ public: std::lock_guard lock(_mutex); auto manager = _manager.lock(); if (!manager) { - _manager = manager = create_old_manager(ov::executor_manager()); + _manager = manager = create_old_manager(ov::threading::executor_manager()); } return manager; } diff --git a/src/inference/src/threading/ie_istreams_executor.cpp b/src/inference/src/threading/ie_istreams_executor.cpp index 87529594c45..e78cc8cb0fa 100644 --- a/src/inference/src/threading/ie_istreams_executor.cpp +++ 
b/src/inference/src/threading/ie_istreams_executor.cpp @@ -23,463 +23,31 @@ namespace InferenceEngine { IStreamsExecutor::~IStreamsExecutor() {} std::vector IStreamsExecutor::Config::SupportedKeys() const { - return { - CONFIG_KEY(CPU_THROUGHPUT_STREAMS), - CONFIG_KEY(CPU_BIND_THREAD), - CONFIG_KEY(CPU_THREADS_NUM), - CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM), - CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS), - CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS), - CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG), - CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL), - CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET), - CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD), - ov::num_streams.name(), - ov::inference_num_threads.name(), - ov::affinity.name(), - }; + return get_property(ov::supported_properties.name()).as>(); } int IStreamsExecutor::Config::GetDefaultNumStreams(const bool enable_hyper_thread) { - const int sockets = static_cast(getAvailableNUMANodes().size()); - // bare minimum of streams (that evenly divides available number of core) - const int num_cores = sockets == 1 ? (enable_hyper_thread ? parallel_get_max_threads() : getNumberOfCPUCores()) - : getNumberOfCPUCores(); - if (0 == num_cores % 4) - return std::max(4, num_cores / 4); - else if (0 == num_cores % 5) - return std::max(5, num_cores / 5); - else if (0 == num_cores % 3) - return std::max(3, num_cores / 3); - else // if user disables some cores say in BIOS, so we got weird #cores which is not easy to divide - return 1; + return get_default_num_streams(enable_hyper_thread); } int IStreamsExecutor::Config::GetHybridNumStreams(std::map& config, const int stream_mode) { - const int num_cores = parallel_get_max_threads(); - const int num_cores_phy = getNumberOfCPUCores(); - const int num_big_cores_phy = getNumberOfCPUCores(true); - const int num_small_cores = num_cores_phy - num_big_cores_phy; - const int num_big_cores = num_cores > num_cores_phy ? 
num_big_cores_phy * 2 : num_big_cores_phy; - int big_core_streams = 0; - int small_core_streams = 0; - int threads_per_stream_big = 0; - int threads_per_stream_small = 0; - - if (stream_mode == DEFAULT) { - // bare minimum of streams (that evenly divides available number of core) - if (0 == num_big_cores_phy % 4) { - threads_per_stream_big = 4; - } else if (0 == num_big_cores_phy % 5) { - threads_per_stream_big = 5; - } else if (0 == num_big_cores_phy % 3) { - threads_per_stream_big = 3; - } else { // if user disables some cores say in BIOS, so we got weird #cores which is not easy to divide - threads_per_stream_big = num_big_cores_phy; - } - - big_core_streams = num_big_cores / threads_per_stream_big; - threads_per_stream_small = threads_per_stream_big; - if (num_small_cores == 0) { - threads_per_stream_small = 0; - } else if (num_small_cores < threads_per_stream_small) { - small_core_streams = 1; - threads_per_stream_small = num_small_cores; - threads_per_stream_big = threads_per_stream_small; - // Balance the computation of physical core and logical core, the number of threads on the physical core and - // logical core should be equal - big_core_streams = num_big_cores_phy / threads_per_stream_big * 2; - } else { - small_core_streams = num_small_cores / threads_per_stream_small; - } - } else if (stream_mode == AGGRESSIVE) { - big_core_streams = num_big_cores; - small_core_streams = num_small_cores; - threads_per_stream_big = num_big_cores / big_core_streams; - threads_per_stream_small = num_small_cores == 0 ? 0 : num_small_cores / small_core_streams; - } else if (stream_mode == LESSAGGRESSIVE) { - big_core_streams = num_big_cores / 2; - small_core_streams = num_small_cores / 2; - threads_per_stream_big = num_big_cores / big_core_streams; - threads_per_stream_small = num_small_cores == 0 ? 
0 : num_small_cores / small_core_streams; - } else { - IE_THROW() << "Wrong stream mode to get num of streams: " << stream_mode; - } - config[CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)] = std::to_string(big_core_streams); - config[CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)] = std::to_string(small_core_streams); - config[CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)] = std::to_string(threads_per_stream_big); - config[CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)] = std::to_string(threads_per_stream_small); - // This is default setting for specific CPU which Pcore is in front and Ecore is in the back. - config[CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)] = std::to_string(num_small_cores == 0 ? 0 : num_big_cores); - return big_core_streams + small_core_streams; + return get_hybrid_num_streams(config, stream_mode); } void IStreamsExecutor::Config::SetConfig(const std::string& key, const std::string& value) { - if (key == CONFIG_KEY(CPU_BIND_THREAD)) { - if (value == CONFIG_VALUE(YES) || value == CONFIG_VALUE(NUMA)) { -#if (defined(__APPLE__) || defined(_WIN32)) - _threadBindingType = IStreamsExecutor::ThreadBindingType::NUMA; -#else - _threadBindingType = (value == CONFIG_VALUE(YES)) ? IStreamsExecutor::ThreadBindingType::CORES - : IStreamsExecutor::ThreadBindingType::NUMA; -#endif - } else if (value == CONFIG_VALUE(HYBRID_AWARE)) { - _threadBindingType = IStreamsExecutor::ThreadBindingType::HYBRID_AWARE; - } else if (value == CONFIG_VALUE(NO)) { - _threadBindingType = IStreamsExecutor::ThreadBindingType::NONE; - } else { - IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_BIND_THREAD) - << ". 
Expected only YES(binds to cores) / NO(no binding) / NUMA(binds to NUMA nodes) / " - "HYBRID_AWARE (let the runtime recognize and use the hybrid cores)"; - } - } else if (key == ov::affinity) { - ov::Affinity affinity; - std::stringstream{value} >> affinity; - switch (affinity) { - case ov::Affinity::NONE: - _threadBindingType = ThreadBindingType::NONE; - break; - case ov::Affinity::CORE: { -#if (defined(__APPLE__) || defined(_WIN32)) - _threadBindingType = ThreadBindingType::NUMA; -#else - _threadBindingType = ThreadBindingType::CORES; -#endif - } break; - case ov::Affinity::NUMA: - _threadBindingType = ThreadBindingType::NUMA; - break; - case ov::Affinity::HYBRID_AWARE: - _threadBindingType = ThreadBindingType::HYBRID_AWARE; - break; - default: - OPENVINO_UNREACHABLE("Unsupported affinity type"); - } - } else if (key == CONFIG_KEY(CPU_THROUGHPUT_STREAMS)) { - if (value == CONFIG_VALUE(CPU_THROUGHPUT_NUMA)) { - _streams = static_cast(getAvailableNUMANodes().size()); - } else if (value == CONFIG_VALUE(CPU_THROUGHPUT_AUTO)) { - // bare minimum of streams (that evenly divides available number of cores) - _streams = GetDefaultNumStreams(); - } else { - int val_i; - try { - val_i = std::stoi(value); - } catch (const std::exception&) { - IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THROUGHPUT_STREAMS) - << ". Expected only positive numbers (#streams) or " - << "PluginConfigParams::CPU_THROUGHPUT_NUMA/CPU_THROUGHPUT_AUTO"; - } - if (val_i < 0) { - IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THROUGHPUT_STREAMS) - << ". 
Expected only positive numbers (#streams)"; - } - _streams = val_i; - } - } else if (key == ov::num_streams) { - auto streams = ov::util::from_string(value, ov::streams::num); - if (streams == ov::streams::NUMA) { - _streams = static_cast(getAvailableNUMANodes().size()); - } else if (streams == ov::streams::AUTO) { - // bare minimum of streams (that evenly divides available number of cores) - _streams = GetDefaultNumStreams(); - } else if (streams.num >= 0) { - _streams = streams.num; - } else { - OPENVINO_UNREACHABLE("Wrong value for property key ", - ov::num_streams.name(), - ". Expected non negative numbers (#streams) or ", - "ov::streams::NUMA|ov::streams::AUTO, Got: ", - streams); - } - } else if (key == CONFIG_KEY(CPU_THREADS_NUM) || key == ov::inference_num_threads) { - int val_i; - try { - val_i = std::stoi(value); - } catch (const std::exception&) { - IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THREADS_NUM) - << ". Expected only positive numbers (#threads)"; - } - if (val_i < 0) { - IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THREADS_NUM) - << ". Expected only positive numbers (#threads)"; - } - _threads = val_i; - } else if (key == CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM)) { - int val_i; - try { - val_i = std::stoi(value); - } catch (const std::exception&) { - IE_THROW() << "Wrong value for property key " << CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM) - << ". Expected only non negative numbers (#threads)"; - } - if (val_i < 0) { - IE_THROW() << "Wrong value for property key " << CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM) - << ". Expected only non negative numbers (#threads)"; - } - _threadsPerStream = val_i; - } else if (key == CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)) { - int val_i; - try { - val_i = std::stoi(value); - } catch (const std::exception&) { - IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS) - << ". 
Expected only non negative numbers (#streams)"; - } - if (val_i < 0) { - IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS) - << ". Expected only non negative numbers (#streams)"; - } - _big_core_streams = val_i; - } else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)) { - int val_i; - try { - val_i = std::stoi(value); - } catch (const std::exception&) { - IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS) - << ". Expected only non negative numbers (#streams)"; - } - if (val_i < 0) { - IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS) - << ". Expected only non negative numbers (#streams)"; - } - _small_core_streams = val_i; - } else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)) { - int val_i; - try { - val_i = std::stoi(value); - } catch (const std::exception&) { - IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG) - << ". Expected only non negative numbers (#threads)"; - } - if (val_i < 0) { - IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG) - << ". Expected only non negative numbers (#threads)"; - } - _threads_per_stream_big = val_i; - } else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)) { - int val_i; - try { - val_i = std::stoi(value); - } catch (const std::exception&) { - IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL) - << ". Expected only non negative numbers (#threads)"; - } - if (val_i < 0) { - IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL) - << ". 
Expected only non negative numbers (#threads)"; - } - _threads_per_stream_small = val_i; - } else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)) { - int val_i; - try { - val_i = std::stoi(value); - } catch (const std::exception&) { - IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET) - << ". Expected only non negative numbers"; - } - if (val_i < 0) { - IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET) - << ". Expected only non negative numbers"; - } - _small_core_offset = val_i; - } else if (key == CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD)) { - if (value == CONFIG_VALUE(YES)) { - _enable_hyper_thread = true; - } else if (value == CONFIG_VALUE(NO)) { - _enable_hyper_thread = false; - } else { - OPENVINO_UNREACHABLE("Unsupported enable hyper thread type"); - } - } else { - IE_THROW() << "Wrong value for property key " << key; - } + set_property(key, value); } Parameter IStreamsExecutor::Config::GetConfig(const std::string& key) const { - if (key == ov::affinity) { - switch (_threadBindingType) { - case IStreamsExecutor::ThreadBindingType::NONE: - return ov::Affinity::NONE; - case IStreamsExecutor::ThreadBindingType::CORES: - return ov::Affinity::CORE; - case IStreamsExecutor::ThreadBindingType::NUMA: - return ov::Affinity::NUMA; - case IStreamsExecutor::ThreadBindingType::HYBRID_AWARE: - return ov::Affinity::HYBRID_AWARE; - } - } else if (key == CONFIG_KEY(CPU_BIND_THREAD)) { - switch (_threadBindingType) { - case IStreamsExecutor::ThreadBindingType::NONE: - return {CONFIG_VALUE(NO)}; - case IStreamsExecutor::ThreadBindingType::CORES: - return {CONFIG_VALUE(YES)}; - case IStreamsExecutor::ThreadBindingType::NUMA: - return {CONFIG_VALUE(NUMA)}; - case IStreamsExecutor::ThreadBindingType::HYBRID_AWARE: - return {CONFIG_VALUE(HYBRID_AWARE)}; - } - } else if (key == CONFIG_KEY(CPU_THROUGHPUT_STREAMS)) { - return {std::to_string(_streams)}; - } else if (key == ov::num_streams) { - return 
decltype(ov::num_streams)::value_type{_streams}; - } else if (key == CONFIG_KEY(CPU_THREADS_NUM)) { - return {std::to_string(_threads)}; - } else if (key == ov::inference_num_threads) { - return decltype(ov::inference_num_threads)::value_type{_threads}; - } else if (key == CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM)) { - return {std::to_string(_threadsPerStream)}; - } else if (key == CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)) { - return {std::to_string(_big_core_streams)}; - } else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)) { - return {std::to_string(_small_core_streams)}; - } else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)) { - return {std::to_string(_threads_per_stream_big)}; - } else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)) { - return {std::to_string(_threads_per_stream_small)}; - } else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)) { - return {std::to_string(_small_core_offset)}; - } else if (key == CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD)) { - return {_enable_hyper_thread ? CONFIG_VALUE(YES) : CONFIG_VALUE(NO)}; - } else { - IE_THROW() << "Wrong value for property key " << key; - } - return {}; + return get_property(key); } void IStreamsExecutor::Config::UpdateHybridCustomThreads(Config& config) { - const auto num_cores = parallel_get_max_threads(); - const auto num_cores_phys = getNumberOfCPUCores(); - const auto num_big_cores_phys = getNumberOfCPUCores(true); - const auto num_big_cores = num_cores > num_cores_phys ? num_big_cores_phys * 2 : num_big_cores_phys; - const auto num_small_cores_phys = num_cores_phys - num_big_cores_phys; - const auto threads = config._threads ? config._threads : num_cores; - const auto streams = config._streams > 0 ? 
config._streams : 1; - - config._small_core_offset = num_big_cores; - int threads_per_stream = std::max(1, threads / streams); - - if ((num_big_cores_phys / threads_per_stream >= streams) && (1 < threads_per_stream)) { - config._big_core_streams = streams; - config._threads_per_stream_big = threads_per_stream; - config._small_core_streams = 0; - config._threads_per_stream_small = 0; - } else if ((num_small_cores_phys / threads_per_stream >= streams) && (num_big_cores_phys < threads_per_stream)) { - config._big_core_streams = 0; - config._threads_per_stream_big = 0; - config._small_core_streams = streams; - config._threads_per_stream_small = threads_per_stream; - } else { - const int threads_per_stream_big = std::min(num_big_cores_phys, threads_per_stream); - const int threads_per_stream_small = std::min(num_small_cores_phys, threads_per_stream); - - threads_per_stream = std::min(threads_per_stream_big, threads_per_stream_small); - while (threads_per_stream > 1) { - const int base_big_streams = num_big_cores_phys / threads_per_stream; - const int base_small_streams = num_small_cores_phys > 0 ? num_small_cores_phys / threads_per_stream : 0; - if (base_big_streams + base_small_streams >= streams) { - config._big_core_streams = base_big_streams; - config._small_core_streams = streams - base_big_streams; - break; - } else if (base_big_streams * 2 + base_small_streams >= streams) { - config._big_core_streams = streams - base_small_streams; - config._small_core_streams = base_small_streams; - break; - } else { - threads_per_stream = threads_per_stream > 1 ? 
threads_per_stream - 1 : 1; - } - } - - if (threads_per_stream == 1) { - const int stream_loops = streams / num_cores; - const int remain_streams = streams - stream_loops * num_cores; - if (num_big_cores_phys >= remain_streams) { - config._big_core_streams = remain_streams + num_big_cores * stream_loops; - config._small_core_streams = num_small_cores_phys * stream_loops; - } else if (num_big_cores_phys + num_small_cores_phys >= remain_streams) { - config._big_core_streams = num_big_cores_phys + num_big_cores * stream_loops; - config._small_core_streams = remain_streams - num_big_cores_phys + num_small_cores_phys * stream_loops; - } else { - config._big_core_streams = remain_streams - num_small_cores_phys + num_big_cores * stream_loops; - config._small_core_streams = num_small_cores_phys * (stream_loops + 1); - } - } - - config._threads_per_stream_big = threads_per_stream; - config._threads_per_stream_small = threads_per_stream; - } + return update_hybrid_custom_threads(config); } IStreamsExecutor::Config IStreamsExecutor::Config::MakeDefaultMultiThreaded(const IStreamsExecutor::Config& initial, const bool fp_intesive) { - const auto envThreads = parallel_get_env_threads(); - const auto& numaNodes = getAvailableNUMANodes(); - const int numaNodesNum = static_cast(numaNodes.size()); - auto streamExecutorConfig = initial; - const bool bLatencyCase = streamExecutorConfig._streams <= numaNodesNum; - - // by default, do not use the hyper-threading (to minimize threads synch overheads) - int num_cores_default = getNumberOfCPUCores(); -#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO) - // additional latency-case logic for hybrid processors: - if (ThreadBindingType::HYBRID_AWARE == streamExecutorConfig._threadBindingType) { - const auto core_types = custom::info::core_types(); - const auto num_little_cores = - custom::info::default_concurrency(custom::task_arena::constraints{}.set_core_type(core_types.front())); - const auto num_big_cores_phys = 
getNumberOfCPUCores(true); - const int int8_threshold = 4; // ~relative efficiency of the VNNI-intensive code for Big vs Little cores; - const int fp32_threshold = 2; // ~relative efficiency of the AVX2 fp32 code for Big vs Little cores; - // by default the latency case uses (faster) Big cores only, depending on the compute ratio - const bool bLatencyCaseBigOnly = - num_big_cores_phys > (num_little_cores / (fp_intesive ? fp32_threshold : int8_threshold)); - // selecting the preferred core type - streamExecutorConfig._threadPreferredCoreType = - bLatencyCase ? (bLatencyCaseBigOnly ? IStreamsExecutor::Config::PreferredCoreType::BIG - : IStreamsExecutor::Config::PreferredCoreType::ANY) - : IStreamsExecutor::Config::PreferredCoreType::ROUND_ROBIN; - // additionally selecting the #cores to use in the "Big-only" case - if (bLatencyCaseBigOnly) { - const int hyper_threading_threshold = - 2; // min #cores, for which the hyper-threading becomes useful for the latency case - const auto num_big_cores = - custom::info::default_concurrency(custom::task_arena::constraints{}.set_core_type(core_types.back())); - num_cores_default = (num_big_cores_phys <= hyper_threading_threshold) ? 
num_big_cores : num_big_cores_phys; - } - // if nstreams or nthreads are set, need to calculate the Hybrid aware parameters here - if (!bLatencyCase && (streamExecutorConfig._big_core_streams == 0 || streamExecutorConfig._threads)) { - UpdateHybridCustomThreads(streamExecutorConfig); - } - OPENVINO_DEBUG << "[ p_e_core_info ] streams (threads): " << streamExecutorConfig._streams << "(" - << streamExecutorConfig._threads_per_stream_big * streamExecutorConfig._big_core_streams + - streamExecutorConfig._threads_per_stream_small * streamExecutorConfig._small_core_streams - << ") -- PCore: " << streamExecutorConfig._big_core_streams << "(" - << streamExecutorConfig._threads_per_stream_big - << ") ECore: " << streamExecutorConfig._small_core_streams << "(" - << streamExecutorConfig._threads_per_stream_small << ")"; - } -#endif - const auto hwCores = - !bLatencyCase && numaNodesNum == 1 - // throughput case on a single-NUMA node machine uses all available cores - ? (streamExecutorConfig._enable_hyper_thread ? parallel_get_max_threads() : num_cores_default) - // in the rest of cases: - // multi-node machine - // or - // latency case, single-node yet hybrid case that uses - // all core types - // or - // big-cores only, but the #cores is "enough" (pls see the logic above) - // it is usually beneficial not to use the hyper-threading (which is default) - : num_cores_default; - const auto threads = - streamExecutorConfig._threads ? streamExecutorConfig._threads : (envThreads ? envThreads : hwCores); - streamExecutorConfig._threadsPerStream = - streamExecutorConfig._streams ? std::max(1, threads / streamExecutorConfig._streams) : threads; - streamExecutorConfig._threads = - (!bLatencyCase && ThreadBindingType::HYBRID_AWARE == streamExecutorConfig._threadBindingType) - ? 
streamExecutorConfig._big_core_streams * streamExecutorConfig._threads_per_stream_big + - streamExecutorConfig._small_core_streams * streamExecutorConfig._threads_per_stream_small - : streamExecutorConfig._threadsPerStream * streamExecutorConfig._streams; - return streamExecutorConfig; + return make_default_multi_threaded(initial); } } // namespace InferenceEngine diff --git a/src/inference/src/threading/ie_itask_executor.cpp b/src/inference/src/threading/ie_itask_executor.cpp index f75279dfa44..8e6bf89f389 100644 --- a/src/inference/src/threading/ie_itask_executor.cpp +++ b/src/inference/src/threading/ie_itask_executor.cpp @@ -12,27 +12,7 @@ namespace InferenceEngine { void ITaskExecutor::runAndWait(const std::vector& tasks) { - std::vector> packagedTasks; - std::vector> futures; - for (std::size_t i = 0; i < tasks.size(); ++i) { - packagedTasks.emplace_back([&tasks, i] { - tasks[i](); - }); - futures.emplace_back(packagedTasks.back().get_future()); - } - for (std::size_t i = 0; i < tasks.size(); ++i) { - run([&packagedTasks, i] { - packagedTasks[i](); - }); - } - // std::future::get will rethrow exception from task. - // We should wait all tasks before any exception is thrown. 
- // So wait() and get() for each future moved to separate loops - for (auto&& future : futures) { - future.wait(); - } - for (auto&& future : futures) { - future.get(); - } + run_and_wait(tasks); } + } // namespace InferenceEngine diff --git a/src/inference/tests/unit/cpu_map_parser.cpp b/src/inference/tests/unit/cpu_map_parser.cpp index d2693c87ff9..20f8ace1862 100644 --- a/src/inference/tests/unit/cpu_map_parser.cpp +++ b/src/inference/tests/unit/cpu_map_parser.cpp @@ -10,7 +10,7 @@ #include "streams_executor.hpp" using namespace testing; -using namespace InferenceEngine; +using namespace ov; namespace { @@ -36,12 +36,12 @@ public: std::vector> test_proc_type_table; std::vector> test_cpu_mapping_table; - InferenceEngine::parse_processor_info_linux(test_data._processors, - test_data.system_info_table, - test_sockets, - test_cores, - test_proc_type_table, - test_cpu_mapping_table); + ov::parse_processor_info_linux(test_data._processors, + test_data.system_info_table, + test_sockets, + test_cores, + test_proc_type_table, + test_cpu_mapping_table); ASSERT_EQ(test_data._sockets, test_sockets); ASSERT_EQ(test_data._cores, test_cores); @@ -629,13 +629,13 @@ public: std::vector> test_proc_type_table; std::vector> test_cpu_mapping_table; - parse_processor_info_win(test_info_ptr, - len, - test_data._processors, - test_sockets, - test_cores, - test_proc_type_table, - test_cpu_mapping_table); + ov::parse_processor_info_win(test_info_ptr, + len, + test_data._processors, + test_sockets, + test_cores, + test_proc_type_table, + test_cpu_mapping_table); ASSERT_EQ(test_data._sockets, test_sockets); ASSERT_EQ(test_data._cores, test_cores); diff --git a/src/inference/tests/unit/ie_executor_manager_tests.cpp b/src/inference/tests/unit/ie_executor_manager_tests.cpp index 42035ac2a5f..a419777c4c1 100644 --- a/src/inference/tests/unit/ie_executor_manager_tests.cpp +++ b/src/inference/tests/unit/ie_executor_manager_tests.cpp @@ -4,36 +4,34 @@ #include -#include +#include 
"openvino/runtime/threading/executor_manager.hpp" using namespace ::testing; -using namespace std; -using namespace InferenceEngine; TEST(ExecutorManagerTests, canCreateSingleExecutorManager) { - auto executorManager1 = executorManager(); + auto executorManager1 = ov::threading::executor_manager(); - auto executorManager2 = executorManager(); + auto executorManager2 = ov::threading::executor_manager(); ASSERT_EQ(executorManager1, executorManager2); } TEST(ExecutorManagerTests, createDifferentExecutorsForDifferentDevices) { - auto executorMgr = executorManager(); - auto executor1 = executorMgr->getExecutor("CPU"); - auto executor2 = executorMgr->getExecutor("GPU"); + auto executorMgr = ov::threading::executor_manager(); + auto executor1 = executorMgr->get_executor("CPU"); + auto executor2 = executorMgr->get_executor("GPU"); ASSERT_NE(executor1, executor2); - ASSERT_EQ(2, executorMgr->getExecutorsNumber()); + ASSERT_EQ(2, executorMgr->get_executors_number()); } TEST(ExecutorManagerTests, returnTheSameExecutorForTheSameDevice) { - auto executorMgr = executorManager(); - auto executor1 = executorMgr->getExecutor("CPU"); - auto executor2 = executorMgr->getExecutor("GPU"); + auto executorMgr = ov::threading::executor_manager(); + auto executor1 = executorMgr->get_executor("CPU"); + auto executor2 = executorMgr->get_executor("GPU"); - auto executor = executorMgr->getExecutor("GPU"); + auto executor = executorMgr->get_executor("GPU"); ASSERT_EQ(executor, executor2); - ASSERT_EQ(2, executorMgr->getExecutorsNumber()); + ASSERT_EQ(2, executorMgr->get_executors_number()); } diff --git a/src/plugins/template/src/async_infer_request.cpp b/src/plugins/template/src/async_infer_request.cpp index 74d3cfae77a..f92f259ad7b 100644 --- a/src/plugins/template/src/async_infer_request.cpp +++ b/src/plugins/template/src/async_infer_request.cpp @@ -9,10 +9,11 @@ #include "template_itt.hpp" // ! 
[async_infer_request:ctor] -TemplatePlugin::AsyncInferRequest::AsyncInferRequest(const std::shared_ptr& request, - const InferenceEngine::ITaskExecutor::Ptr& task_executor, - const InferenceEngine::ITaskExecutor::Ptr& wait_executor, - const InferenceEngine::ITaskExecutor::Ptr& callback_executor) +TemplatePlugin::AsyncInferRequest::AsyncInferRequest( + const std::shared_ptr& request, + const std::shared_ptr& task_executor, + const std::shared_ptr& wait_executor, + const std::shared_ptr& callback_executor) : ov::IAsyncInferRequest(request, task_executor, callback_executor), m_wait_executor(wait_executor) { // In current implementation we have CPU only tasks and no needs in 2 executors diff --git a/src/plugins/template/src/async_infer_request.hpp b/src/plugins/template/src/async_infer_request.hpp index 6a049a373b4..a15d7ee388a 100644 --- a/src/plugins/template/src/async_infer_request.hpp +++ b/src/plugins/template/src/async_infer_request.hpp @@ -16,14 +16,14 @@ namespace TemplatePlugin { class AsyncInferRequest : public ov::IAsyncInferRequest { public: AsyncInferRequest(const std::shared_ptr& request, - const InferenceEngine::ITaskExecutor::Ptr& task_executor, - const InferenceEngine::ITaskExecutor::Ptr& wait_executor, - const InferenceEngine::ITaskExecutor::Ptr& callback_executor); + const std::shared_ptr& task_executor, + const std::shared_ptr& wait_executor, + const std::shared_ptr& callback_executor); ~AsyncInferRequest(); private: - InferenceEngine::ITaskExecutor::Ptr m_wait_executor; + std::shared_ptr m_wait_executor; }; // ! [async_infer_request:header] diff --git a/src/plugins/template/src/compiled_model.cpp b/src/plugins/template/src/compiled_model.cpp index 6ea87593020..d08a8352af4 100644 --- a/src/plugins/template/src/compiled_model.cpp +++ b/src/plugins/template/src/compiled_model.cpp @@ -79,7 +79,7 @@ void fill_output_info(const ov::Output& output, InferenceEngine::DataP // ! 
[executable_network:ctor_cnnnetwork] TemplatePlugin::CompiledModel::CompiledModel(const std::shared_ptr& model, const std::shared_ptr& plugin, - const InferenceEngine::ITaskExecutor::Ptr& task_executor, + const std::shared_ptr& task_executor, const Configuration& cfg) : ov::ICompiledModel(model, plugin, task_executor), // Disable default threads creation _cfg(cfg), diff --git a/src/plugins/template/src/compiled_model.hpp b/src/plugins/template/src/compiled_model.hpp index 82e4455cf18..a138f3f6b68 100644 --- a/src/plugins/template/src/compiled_model.hpp +++ b/src/plugins/template/src/compiled_model.hpp @@ -24,7 +24,7 @@ class CompiledModel : public ov::ICompiledModel { public: CompiledModel(const std::shared_ptr& model, const std::shared_ptr& plugin, - const InferenceEngine::ITaskExecutor::Ptr& task_executor, + const std::shared_ptr& task_executor, const Configuration& cfg); // Methods from a base class ov::ICompiledModel diff --git a/src/plugins/template/src/plugin.cpp b/src/plugins/template/src/plugin.cpp index eeda242842b..5c3aa091d4a 100644 --- a/src/plugins/template/src/plugin.cpp +++ b/src/plugins/template/src/plugin.cpp @@ -91,7 +91,7 @@ std::shared_ptr TemplatePlugin::Plugin::compile_model(const auto fullConfig = Configuration{properties, _cfg}; auto streamsExecutorConfig = - InferenceEngine::IStreamsExecutor::Config::MakeDefaultMultiThreaded(fullConfig._streamsExecutorConfig); + ov::threading::IStreamsExecutor::Config::make_default_multi_threaded(fullConfig._streamsExecutorConfig); streamsExecutorConfig._name = stream_executor_name; auto compiled_model = std::make_shared(model->clone(), diff --git a/src/plugins/template/src/plugin.hpp b/src/plugins/template/src/plugin.hpp index 1c45317522b..aa5b9077312 100644 --- a/src/plugins/template/src/plugin.hpp +++ b/src/plugins/template/src/plugin.hpp @@ -8,6 +8,7 @@ #include "compiled_model.hpp" #include "openvino/runtime/icompiled_model.hpp" #include "openvino/runtime/iplugin.hpp" +#include 
"openvino/runtime/threading/itask_executor.hpp" #include "template_config.hpp" //! [plugin:header] @@ -50,7 +51,7 @@ private: std::shared_ptr _backend; Configuration _cfg; - InferenceEngine::ITaskExecutor::Ptr _waitExecutor; + std::shared_ptr _waitExecutor; }; } // namespace TemplatePlugin diff --git a/src/plugins/template/src/template_config.cpp b/src/plugins/template/src/template_config.cpp index 582b1c6d058..16ae7092842 100644 --- a/src/plugins/template/src/template_config.cpp +++ b/src/plugins/template/src/template_config.cpp @@ -16,16 +16,17 @@ Configuration::Configuration() {} Configuration::Configuration(const ConfigMap& config, const Configuration& defaultCfg, bool throwOnUnsupported) { *this = defaultCfg; // If plugin needs to use InferenceEngine::StreamsExecutor it should be able to process its configuration - auto streamExecutorConfigKeys = _streamsExecutorConfig.SupportedKeys(); + auto streamExecutorConfigKeys = + _streamsExecutorConfig.get_property(ov::supported_properties.name()).as>(); for (auto&& c : config) { const auto& key = c.first; const auto& value = c.second; if (ov::template_plugin::throughput_streams == key) { - _streamsExecutorConfig.SetConfig(CONFIG_KEY(CPU_THROUGHPUT_STREAMS), value.as()); + _streamsExecutorConfig.set_property(CONFIG_KEY(CPU_THROUGHPUT_STREAMS), value); } else if (streamExecutorConfigKeys.end() != std::find(std::begin(streamExecutorConfigKeys), std::end(streamExecutorConfigKeys), key)) { - _streamsExecutorConfig.SetConfig(key, value.as()); + _streamsExecutorConfig.set_property(key, value); } else if (CONFIG_KEY(DEVICE_ID) == key) { deviceId = std::stoi(value.as()); if (deviceId > 0) { @@ -42,11 +43,12 @@ Configuration::Configuration(const ConfigMap& config, const Configuration& defau } } -InferenceEngine::Parameter Configuration::Get(const std::string& name) const { - auto streamExecutorConfigKeys = _streamsExecutorConfig.SupportedKeys(); +ov::Any Configuration::Get(const std::string& name) const { + auto 
streamExecutorConfigKeys = + _streamsExecutorConfig.get_property(ov::supported_properties.name()).as>(); if ((streamExecutorConfigKeys.end() != std::find(std::begin(streamExecutorConfigKeys), std::end(streamExecutorConfigKeys), name))) { - return _streamsExecutorConfig.GetConfig(name); + return _streamsExecutorConfig.get_property(name); } else if (name == CONFIG_KEY(DEVICE_ID)) { return {std::to_string(deviceId)}; } else if (name == CONFIG_KEY(PERF_COUNT)) { @@ -54,7 +56,7 @@ InferenceEngine::Parameter Configuration::Get(const std::string& name) const { } else if (name == ov::template_plugin::throughput_streams || name == CONFIG_KEY(CPU_THROUGHPUT_STREAMS)) { return {std::to_string(_streamsExecutorConfig._streams)}; } else if (name == CONFIG_KEY(CPU_BIND_THREAD)) { - return const_cast(_streamsExecutorConfig).GetConfig(name); + return _streamsExecutorConfig.get_property(name); } else if (name == CONFIG_KEY(CPU_THREADS_NUM)) { return {std::to_string(_streamsExecutorConfig._threads)}; } else if (name == CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM)) { diff --git a/src/plugins/template/src/template_config.hpp b/src/plugins/template/src/template_config.hpp index 74b578546e7..20363312397 100644 --- a/src/plugins/template/src/template_config.hpp +++ b/src/plugins/template/src/template_config.hpp @@ -4,11 +4,11 @@ #pragma once -#include #include -#include #include -#include + +#include "openvino/runtime/properties.hpp" +#include "openvino/runtime/threading/istreams_executor.hpp" namespace TemplatePlugin { @@ -26,13 +26,13 @@ struct Configuration { const Configuration& defaultCfg = {}, const bool throwOnUnsupported = true); - InferenceEngine::Parameter Get(const std::string& name) const; + ov::Any Get(const std::string& name) const; // Plugin configuration parameters int deviceId = 0; bool perfCount = true; - InferenceEngine::IStreamsExecutor::Config _streamsExecutorConfig; + ov::threading::IStreamsExecutor::Config _streamsExecutorConfig; ov::hint::PerformanceMode 
performance_mode = ov::hint::PerformanceMode::UNDEFINED; }; // ! [configuration:header]