Moved Task, Streams, CPUStreams Executors to new API (#15913)

* Moved Task, Streams, CPUStreams Executors to new API

* Fixed some build issues

* Fixed new build issues

* Try to fix tests

* Fixed inference unit tests

* Small build fix

* Added more system headers

* Try to fix naming style

* Fixed namespace

* Fixed android build
This commit is contained in:
Ilya Churaev
2023-02-24 15:20:32 +04:00
committed by GitHub
parent 15935069ff
commit a730ef18eb
42 changed files with 1719 additions and 728 deletions

View File

@@ -12,7 +12,7 @@
#include <exception>
#include <vector>
#include "ie_api.h"
#include "openvino/runtime/system_conf.hpp"
namespace InferenceEngine {
@@ -23,7 +23,9 @@ namespace InferenceEngine {
* @param[in] includeOMPNumThreads Indicates if the omp number threads is included
* @return `True` if any OpenMP environment variable is defined, `false` otherwise
*/
INFERENCE_ENGINE_API_CPP(bool) checkOpenMpEnvVars(bool includeOMPNumThreads = true);
inline bool checkOpenMpEnvVars(bool includeOMPNumThreads = true) {
    // Legacy shim: the real logic now lives in the OpenVINO 2.0 API.
    const bool any_omp_var_defined = ov::check_open_mp_env_vars(includeOMPNumThreads);
    return any_omp_var_defined;
}
/**
* @brief Returns available CPU NUMA nodes (on Linux, and Windows [only with TBB], single node is assumed on all
@@ -31,7 +33,9 @@ INFERENCE_ENGINE_API_CPP(bool) checkOpenMpEnvVars(bool includeOMPNumThreads = tr
* @ingroup ie_dev_api_system_conf
* @return NUMA nodes
*/
INFERENCE_ENGINE_API_CPP(std::vector<int>) getAvailableNUMANodes();
inline std::vector<int> getAvailableNUMANodes() {
    // Legacy shim that forwards to the ov:: replacement of this helper.
    std::vector<int> numa_nodes = ov::get_available_numa_nodes();
    return numa_nodes;
}
/**
* @brief Returns available CPU cores types (on Linux, and Windows) and ONLY with TBB, single core type is assumed
@@ -39,7 +43,9 @@ INFERENCE_ENGINE_API_CPP(std::vector<int>) getAvailableNUMANodes();
* @ingroup ie_dev_api_system_conf
* @return Vector of core types
*/
INFERENCE_ENGINE_API_CPP(std::vector<int>) getAvailableCoresTypes();
inline std::vector<int> getAvailableCoresTypes() {
    // Legacy shim that forwards to the ov:: replacement of this helper.
    std::vector<int> core_types = ov::get_available_cores_types();
    return core_types;
}
/**
* @brief Returns number of CPU physical cores on Linux/Windows (which is considered to be more performance
@@ -50,7 +56,9 @@ INFERENCE_ENGINE_API_CPP(std::vector<int>) getAvailableCoresTypes();
* @param[in] bigCoresOnly Additionally limits the number of reported cores to the 'Big' cores only.
* @return Number of physical CPU cores.
*/
INFERENCE_ENGINE_API_CPP(int) getNumberOfCPUCores(bool bigCoresOnly = false);
inline int getNumberOfCPUCores(bool bigCoresOnly = false) {
    // Legacy shim: delegates the physical-core query to the new OpenVINO API.
    const int physical_cores = ov::get_number_of_cpu_cores(bigCoresOnly);
    return physical_cores;
}
/**
* @brief Returns number of CPU logical cores on Linux/Windows (on other OSes it simply relies on the original
@@ -60,80 +68,81 @@ INFERENCE_ENGINE_API_CPP(int) getNumberOfCPUCores(bool bigCoresOnly = false);
* @param[in] bigCoresOnly Additionally limits the number of reported cores to the 'Big' cores only.
* @return Number of logical CPU cores.
*/
INFERENCE_ENGINE_API_CPP(int) getNumberOfLogicalCPUCores(bool bigCoresOnly = false);
inline int getNumberOfLogicalCPUCores(bool bigCoresOnly = false) {
    // Legacy shim: delegates the logical-core query to the new OpenVINO API.
    const int logical_cores = ov::get_number_of_logical_cpu_cores(bigCoresOnly);
    return logical_cores;
}
/**
* @brief Checks whether CPU supports SSE 4.2 capability
* @ingroup ie_dev_api_system_conf
* @return `True` is SSE 4.2 instructions are available, `false` otherwise
*/
INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_sse42();
using ov::with_cpu_x86_sse42;
/**
* @brief Checks whether CPU supports AVX capability
* @ingroup ie_dev_api_system_conf
* @return `True` is AVX instructions are available, `false` otherwise
*/
INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx();
using ov::with_cpu_x86_avx;
/**
* @brief Checks whether CPU supports AVX2 capability
* @ingroup ie_dev_api_system_conf
* @return `True` is AVX2 instructions are available, `false` otherwise
*/
INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx2();
using ov::with_cpu_x86_avx2;
/**
* @brief Checks whether CPU supports AVX 512 capability
* @ingroup ie_dev_api_system_conf
* @return `True` is AVX512F (foundation) instructions are available, `false` otherwise
*/
INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx512f();
using ov::with_cpu_x86_avx512f;
/**
* @brief Checks whether CPU supports AVX 512 capability
* @ingroup ie_dev_api_system_conf
* @return `True` is AVX512F, AVX512BW, AVX512DQ instructions are available, `false` otherwise
*/
INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx512_core();
using ov::with_cpu_x86_avx512_core;
/**
* @brief Checks whether CPU supports AVX 512 VNNI capability
* @ingroup ie_dev_api_system_conf
* @return `True` is AVX512F, AVX512BW, AVX512DQ, AVX512_VNNI instructions are available, `false` otherwise
*/
INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx512_core_vnni();
using ov::with_cpu_x86_avx512_core_vnni;
/**
* @brief Checks whether CPU supports BFloat16 capability
* @ingroup ie_dev_api_system_conf
* @return `True` is tAVX512_BF16 instructions are available, `false` otherwise
*/
INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_bfloat16();
using ov::with_cpu_x86_bfloat16;
/**
* @brief Checks whether CPU supports AMX int8 capability
* @ingroup ie_dev_api_system_conf
* @return `True` is tAMX_INT8 instructions are available, `false` otherwise
*/
INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx512_core_amx_int8();
using ov::with_cpu_x86_avx512_core_amx_int8;
/**
* @brief Checks whether CPU supports AMX bf16 capability
* @ingroup ie_dev_api_system_conf
* @return `True` is tAMX_BF16 instructions are available, `false` otherwise
*/
INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx512_core_amx_bf16();
using ov::with_cpu_x86_avx512_core_amx_bf16;
/**
* @brief Checks whether CPU supports AMX capability
* @ingroup ie_dev_api_system_conf
* @return `True` is tAMX_INT8 or tAMX_BF16 instructions are available, `false` otherwise
*/
INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx512_core_amx();
using ov::with_cpu_x86_avx512_core_amx;
/**
* @enum column_of_processor_type_table
* @brief This enum contains defination of each columns in processor type table which bases on cpu core types. Will
* extend to support other CPU core type like ARM.
*
@@ -150,16 +159,9 @@ INFERENCE_ENGINE_API_CPP(bool) with_cpu_x86_avx512_core_amx();
* ALL_PROC | MAIN_CORE_PROC | EFFICIENT_CORE_PROC | HYPER_THREADING_PROC
* 32 8 16 8 // Total number of one socket
*/
typedef enum {
ALL_PROC = 0, //!< All processors, regardless of backend cpu
MAIN_CORE_PROC = 1, //!< Processor based on physical core of Intel Performance-cores
EFFICIENT_CORE_PROC = 2, //!< Processor based on Intel Efficient-cores
HYPER_THREADING_PROC = 3, //!< Processor based on logical core of Intel Performance-cores
PROC_TYPE_TABLE_SIZE = 4 //!< Size of processor type table
} column_of_processor_type_table;
using ov::ColumnOfProcessorTypeTable;
/**
* @enum column_of_cpu_mapping_table
* @brief This enum contains defination of each columns in CPU mapping table which use processor id as index.
*
* GROUP_ID is generated according to the following rules.
@@ -181,14 +183,6 @@ typedef enum {
* 6 0 4 2 2 0
* 7 0 5 2 2 0
*/
typedef enum {
CPU_MAP_PROCESSOR_ID = 0, //!< column for processor id of the processor
CPU_MAP_SOCKET_ID = 1, //!< column for socket id of the processor
CPU_MAP_CORE_ID = 2, //!< column for hardware core id of the processor
CPU_MAP_CORE_TYPE = 3, //!< column for CPU core type corresponding to the processor
CPU_MAP_GROUP_ID = 4, //!< column for group id to the processor. Processors in one group have dependency.
CPU_MAP_USED_FLAG = 5, //!< column for resource management of the processor
CPU_MAP_TABLE_SIZE = 6 //!< Size of CPU mapping table
} column_of_cpu_mapping_table;
using ov::ColumnOfCPUMappingTable;
} // namespace InferenceEngine

View File

@@ -17,7 +17,7 @@
#include "openvino/runtime/iinfer_request.hpp"
#include "openvino/runtime/profiling_info.hpp"
#include "openvino/runtime/tensor.hpp"
#include "threading/ie_itask_executor.hpp"
#include "openvino/runtime/threading/itask_executor.hpp"
namespace ov {
@@ -37,8 +37,8 @@ namespace ov {
class OPENVINO_RUNTIME_API IAsyncInferRequest : public IInferRequest {
public:
IAsyncInferRequest(const std::shared_ptr<IInferRequest>& request,
const InferenceEngine::ITaskExecutor::Ptr& task_executor,
const InferenceEngine::ITaskExecutor::Ptr& callback_executor);
const std::shared_ptr<ov::threading::ITaskExecutor>& task_executor,
const std::shared_ptr<ov::threading::ITaskExecutor>& callback_executor);
~IAsyncInferRequest();
/**
@@ -153,7 +153,7 @@ public:
const std::vector<ov::Output<const ov::Node>>& get_outputs() const override;
protected:
using Stage = std::pair<InferenceEngine::ITaskExecutor::Ptr, InferenceEngine::Task>;
using Stage = std::pair<std::shared_ptr<ov::threading::ITaskExecutor>, ov::threading::Task>;
/**
* @brief Pipeline is vector of stages
*/
@@ -212,11 +212,11 @@ private:
void run_first_stage(const Pipeline::iterator itBeginStage,
const Pipeline::iterator itEndStage,
const InferenceEngine::ITaskExecutor::Ptr callbackExecutor = {});
const std::shared_ptr<ov::threading::ITaskExecutor> callbackExecutor = {});
InferenceEngine::Task make_next_stage_task(const Pipeline::iterator itStage,
const Pipeline::iterator itEndStage,
const InferenceEngine::ITaskExecutor::Ptr callbackExecutor);
ov::threading::Task make_next_stage_task(const Pipeline::iterator itStage,
const Pipeline::iterator itEndStage,
const std::shared_ptr<ov::threading::ITaskExecutor> callbackExecutor);
template <typename F>
void infer_impl(const F& f) {
@@ -264,10 +264,10 @@ private:
std::shared_ptr<IInferRequest> m_sync_request;
InferenceEngine::ITaskExecutor::Ptr m_request_executor; //!< Used to run inference CPU tasks.
InferenceEngine::ITaskExecutor::Ptr
std::shared_ptr<ov::threading::ITaskExecutor> m_request_executor; //!< Used to run inference CPU tasks.
std::shared_ptr<ov::threading::ITaskExecutor>
m_callback_executor; //!< Used to run post inference callback in asynchronous pipline
InferenceEngine::ITaskExecutor::Ptr
std::shared_ptr<ov::threading::ITaskExecutor>
m_sync_callback_executor; //!< Used to run post inference callback in synchronous pipline
mutable std::mutex m_mutex;
std::function<void(std::exception_ptr)> m_callback;

View File

@@ -17,8 +17,8 @@
#include "openvino/runtime/common.hpp"
#include "openvino/runtime/isync_infer_request.hpp"
#include "openvino/runtime/remote_context.hpp"
#include "threading/ie_cpu_streams_executor.hpp"
#include "threading/ie_itask_executor.hpp"
#include "openvino/runtime/threading/cpu_streams_executor.hpp"
#include "openvino/runtime/threading/itask_executor.hpp"
namespace InferenceEngine {
class ICompiledModelWrapper;
@@ -47,14 +47,13 @@ public:
*
* @param callback_executor Callback executor (CPUStreamsExecutor by default)
*/
ICompiledModel(const std::shared_ptr<const ov::Model>& model,
const std::shared_ptr<const ov::IPlugin>& plugin,
const InferenceEngine::ITaskExecutor::Ptr& task_executor =
std::make_shared<InferenceEngine::CPUStreamsExecutor>(InferenceEngine::IStreamsExecutor::Config{
"Default"}),
const InferenceEngine::ITaskExecutor::Ptr& callback_executor =
std::make_shared<InferenceEngine::CPUStreamsExecutor>(InferenceEngine::IStreamsExecutor::Config{
"Callback"}));
ICompiledModel(
const std::shared_ptr<const ov::Model>& model,
const std::shared_ptr<const ov::IPlugin>& plugin,
const std::shared_ptr<ov::threading::ITaskExecutor>& task_executor =
std::make_shared<ov::threading::CPUStreamsExecutor>(ov::threading::IStreamsExecutor::Config{"Default"}),
const std::shared_ptr<ov::threading::ITaskExecutor>& callback_executor =
std::make_shared<ov::threading::CPUStreamsExecutor>(ov::threading::IStreamsExecutor::Config{"Callback"}));
/**
* @brief Gets all outputs from compiled model
@@ -119,8 +118,8 @@ private:
std::vector<ov::Output<const ov::Node>> m_inputs;
std::vector<ov::Output<const ov::Node>> m_outputs;
InferenceEngine::ITaskExecutor::Ptr m_task_executor = nullptr; //!< Holds a task executor
InferenceEngine::ITaskExecutor::Ptr m_callback_executor = nullptr; //!< Holds a callback executor
std::shared_ptr<ov::threading::ITaskExecutor> m_task_executor = nullptr; //!< Holds a task executor
std::shared_ptr<ov::threading::ITaskExecutor> m_callback_executor = nullptr; //!< Holds a callback executor
friend ov::CoreImpl;
friend ov::IExecutableNetworkWrapper;
@@ -146,7 +145,7 @@ protected:
/**
* @brief Default implementation of create async inter request method
*
* @tparam AsyncInferRequestType Async infer request type. InferenceEngine::AsyncInferRequestThreadSafeDefault by
* @tparam AsyncInferRequestType Async infer request type. ov::IAsyncInferRequest by
* default
*
* @return Asynchronous infer request
@@ -163,8 +162,8 @@ protected:
* @return OpenVINO Plugin interface
*/
const std::shared_ptr<const ov::IPlugin>& get_plugin() const;
const InferenceEngine::ITaskExecutor::Ptr get_task_executor() const;
const InferenceEngine::ITaskExecutor::Ptr get_callback_executor() const;
const std::shared_ptr<ov::threading::ITaskExecutor> get_task_executor() const;
const std::shared_ptr<ov::threading::ITaskExecutor> get_callback_executor() const;
};
} // namespace ov

View File

@@ -188,7 +188,7 @@ public:
* @brief Gets reference to tasks execution manager
* @return Reference to ExecutorManager interface
*/
const std::shared_ptr<ov::ExecutorManager>& get_executor_manager() const;
const std::shared_ptr<ov::threading::ExecutorManager>& get_executor_manager() const;
~IPlugin() = default;
@@ -198,11 +198,11 @@ protected:
private:
friend ::InferenceEngine::IPluginWrapper;
std::string m_plugin_name; //!< A device name that plugins enables
std::weak_ptr<ov::ICore> m_core; //!< A pointer to ICore interface
std::shared_ptr<ov::ExecutorManager> m_executor_manager; //!< A tasks execution manager
ov::Version m_version; //!< Member contains plugin version
bool m_is_new_api; //!< A flag which shows used API
std::string m_plugin_name; //!< A device name that plugins enables
std::weak_ptr<ov::ICore> m_core; //!< A pointer to ICore interface
std::shared_ptr<ov::threading::ExecutorManager> m_executor_manager; //!< A tasks execution manager
ov::Version m_version; //!< Member contains plugin version
bool m_is_new_api; //!< A flag which shows used API
};
} // namespace ov

View File

@@ -0,0 +1,193 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
/**
* @brief Abstraction over platform specific implementations
* @file openvino/runtime/system_conf.hpp
*/
#pragma once
#include <vector>
#include "openvino/runtime/common.hpp"
namespace ov {
/**
* @brief Checks whether OpenMP environment variables are defined
* @ingroup ov_dev_api_system_conf
*
* @param[in] include_omp_num_threads Indicates if the omp number threads is included
* @return `True` if any OpenMP environment variable is defined, `false` otherwise
*/
OPENVINO_RUNTIME_API bool check_open_mp_env_vars(bool include_omp_num_threads = true);
/**
* @brief Returns available CPU NUMA nodes (on Linux, and Windows [only with TBB], single node is assumed on all
* other OSes)
* @ingroup ov_dev_api_system_conf
* @return NUMA nodes
*/
OPENVINO_RUNTIME_API std::vector<int> get_available_numa_nodes();
/**
* @brief Returns available CPU cores types (on Linux, and Windows) and ONLY with TBB, single core type is assumed
* otherwise
* @ingroup ov_dev_api_system_conf
* @return Vector of core types
*/
OPENVINO_RUNTIME_API std::vector<int> get_available_cores_types();
/**
* @brief Returns number of CPU physical cores on Linux/Windows (which is considered to be more performance
* friendly for servers) (on other OSes it simply relies on the original parallel API of choice, which usually uses the
* logical cores). call function with 'false' to get #phys cores of all types call function with 'true' to get #phys
* 'Big' cores number of 'Little' = 'all' - 'Big'
* @ingroup ov_dev_api_system_conf
* @param[in] big_cores_only Additionally limits the number of reported cores to the 'Big' cores only.
* @return Number of physical CPU cores.
*/
OPENVINO_RUNTIME_API int get_number_of_cpu_cores(bool big_cores_only = false);
/**
* @brief Returns number of CPU logical cores on Linux/Windows (on other OSes it simply relies on the original
* parallel API of choice, which uses the 'all' logical cores). call function with 'false' to get #logical cores of
* all types call function with 'true' to get #logical 'Big' cores number of 'Little' = 'all' - 'Big'
* @ingroup ov_dev_api_system_conf
* @param[in] big_cores_only Additionally limits the number of reported cores to the 'Big' cores only.
* @return Number of logical CPU cores.
*/
OPENVINO_RUNTIME_API int get_number_of_logical_cpu_cores(bool big_cores_only = false);
/**
* @brief Checks whether CPU supports SSE 4.2 capability
* @ingroup ov_dev_api_system_conf
* @return `True` if SSE 4.2 instructions are available, `false` otherwise
*/
OPENVINO_RUNTIME_API bool with_cpu_x86_sse42();
/**
* @brief Checks whether CPU supports AVX capability
* @ingroup ov_dev_api_system_conf
* @return `True` if AVX instructions are available, `false` otherwise
*/
OPENVINO_RUNTIME_API bool with_cpu_x86_avx();
/**
* @brief Checks whether CPU supports AVX2 capability
* @ingroup ov_dev_api_system_conf
* @return `True` if AVX2 instructions are available, `false` otherwise
*/
OPENVINO_RUNTIME_API bool with_cpu_x86_avx2();
/**
* @brief Checks whether CPU supports AVX 512 capability
* @ingroup ov_dev_api_system_conf
* @return `True` if AVX512F (foundation) instructions are available, `false` otherwise
*/
OPENVINO_RUNTIME_API bool with_cpu_x86_avx512f();
/**
* @brief Checks whether CPU supports AVX 512 capability
* @ingroup ov_dev_api_system_conf
* @return `True` if AVX512F, AVX512BW, AVX512DQ instructions are available, `false` otherwise
*/
OPENVINO_RUNTIME_API bool with_cpu_x86_avx512_core();
/**
* @brief Checks whether CPU supports AVX 512 VNNI capability
* @ingroup ov_dev_api_system_conf
* @return `True` if AVX512F, AVX512BW, AVX512DQ, AVX512_VNNI instructions are available, `false` otherwise
*/
OPENVINO_RUNTIME_API bool with_cpu_x86_avx512_core_vnni();
/**
* @brief Checks whether CPU supports BFloat16 capability
* @ingroup ov_dev_api_system_conf
* @return `True` if tAVX512_BF16 instructions are available, `false` otherwise
*/
OPENVINO_RUNTIME_API bool with_cpu_x86_bfloat16();
/**
* @brief Checks whether CPU supports AMX int8 capability
* @ingroup ov_dev_api_system_conf
* @return `True` if tAMX_INT8 instructions are available, `false` otherwise
*/
OPENVINO_RUNTIME_API bool with_cpu_x86_avx512_core_amx_int8();
/**
* @brief Checks whether CPU supports AMX bf16 capability
* @ingroup ov_dev_api_system_conf
* @return `True` if tAMX_BF16 instructions are available, `false` otherwise
*/
OPENVINO_RUNTIME_API bool with_cpu_x86_avx512_core_amx_bf16();
/**
* @brief Checks whether CPU supports AMX capability
* @ingroup ov_dev_api_system_conf
* @return `True` if tAMX_INT8 or tAMX_BF16 instructions are available, `false` otherwise
*/
OPENVINO_RUNTIME_API bool with_cpu_x86_avx512_core_amx();
/**
* @enum ColumnOfProcessorTypeTable
* @brief This enum contains the definition of each column in the processor type table, which is based on cpu core types. Will
* extend to support other CPU core type like ARM.
*
* The following are two example of processor type table.
* 1. Processor table of two socket CPUs XEON server
*
* ALL_PROC | MAIN_CORE_PROC | EFFICIENT_CORE_PROC | HYPER_THREADING_PROC
* 96 48 0 48 // Total number of two sockets
* 48 24 0 24 // Number of socket one
* 48 24 0 24 // Number of socket two
*
* 2. Processor table of one socket CPU desktop
*
* ALL_PROC | MAIN_CORE_PROC | EFFICIENT_CORE_PROC | HYPER_THREADING_PROC
* 32 8 16 8 // Total number of one socket
*/
enum ColumnOfProcessorTypeTable {
ALL_PROC = 0, //!< Column with the total number of processors, regardless of backend cpu
MAIN_CORE_PROC = 1, //!< Column counting processors based on physical cores of Intel Performance-cores
EFFICIENT_CORE_PROC = 2, //!< Column counting processors based on Intel Efficient-cores
HYPER_THREADING_PROC = 3, //!< Column counting processors based on logical cores of Intel Performance-cores
PROC_TYPE_TABLE_SIZE = 4 //!< Number of columns in one row of the processor type table
};
/**
* @enum ColumnOfCPUMappingTable
* @brief This enum contains the definition of each column in the CPU mapping table, which uses processor id as index.
*
* GROUP_ID is generated according to the following rules.
* 1. If one MAIN_CORE_PROC and one HYPER_THREADING_PROC are based on same Performance-cores, they are in one group.
* 2. If some EFFICIENT_CORE_PROC share one L2 cache, they are in one group.
* 3. There are no duplicate group IDs in the system
*
* The following is the example of CPU mapping table.
* 1. Four processors of two Pcore
* 2. Four processors of four Ecores shared L2 cache
*
* PROCESSOR_ID | SOCKET_ID | CORE_ID | CORE_TYPE | GROUP_ID | Used
* 0 0 0 3 0 0
* 1 0 0 1 0 0
* 2 0 1 3 1 0
* 3 0 1 1 1 0
* 4 0 2 2 2 0
* 5 0 3 2 2 0
* 6 0 4 2 2 0
* 7 0 5 2 2 0
*/
enum ColumnOfCPUMappingTable {
CPU_MAP_PROCESSOR_ID = 0, //!< column for processor id of the processor
CPU_MAP_SOCKET_ID = 1, //!< column for socket id of the processor
CPU_MAP_CORE_ID = 2, //!< column for hardware core id of the processor
CPU_MAP_CORE_TYPE = 3, //!< column for CPU core type (see ColumnOfProcessorTypeTable) of the processor
CPU_MAP_GROUP_ID = 4, //!< column for group id of the processor. Processors in one group have dependency.
CPU_MAP_USED_FLAG = 5, //!< column for the resource-management (used/free) flag of the processor
CPU_MAP_TABLE_SIZE = 6 //!< Number of columns in one row of the CPU mapping table
};
} // namespace ov

View File

@@ -0,0 +1,55 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
/**
* @file openvino/runtime/threading/cpu_streams_executor.hpp
* @brief A header file for OpenVINO CPU-Streams-based Executor implementation.
*/
#pragma once
#include <memory>
#include <openvino/runtime/common.hpp>
#include <string>
#include "openvino/runtime/threading/istreams_executor.hpp"
namespace ov {
namespace threading {
/**
* @class CPUStreamsExecutor
* @ingroup ov_dev_api_threading
* @brief CPU Streams executor implementation. The executor splits the CPU into groups of threads,
* that can be pinned to cores or NUMA nodes.
* It uses custom threads to pull tasks from single queue.
*/
class OPENVINO_RUNTIME_API CPUStreamsExecutor : public IStreamsExecutor {
public:
/**
* @brief Constructor
* @param config Stream executor parameters
*/
explicit CPUStreamsExecutor(const Config& config);
/**
* @brief A class destructor
*/
~CPUStreamsExecutor() override;
/**
* @brief Runs the task inside the executor context (tasks are pulled from a single queue by stream threads)
* @param task A task to run
*/
void run(Task task) override;
/**
* @brief Executes the task in the current thread using the streams executor configuration and constraints
* @param task A task to run
*/
void execute(Task task) override;
/**
* @brief Returns the index of the current stream
* @return Index of the current stream; throws if called not from a stream thread
*/
int get_stream_id() override;
/**
* @brief Returns the id of the current NUMA node
* @return Id of the current NUMA node; throws if called not from a stream thread
*/
int get_numa_node_id() override;
private:
struct Impl; // PIMPL: keeps the threading implementation details out of this public header
std::unique_ptr<Impl> _impl;
};
} // namespace threading
} // namespace ov

View File

@@ -10,11 +10,14 @@
#pragma once
#include "openvino/runtime/common.hpp"
#include "openvino/runtime/threading/istreams_executor.hpp"
#include "openvino/runtime/threading/itask_executor.hpp"
#include "threading/ie_istreams_executor.hpp"
#include "threading/ie_itask_executor.hpp"
namespace ov {
namespace threading {
/**
* @interface ExecutorManager
* @brief Interface for tasks execution manager.
@@ -31,7 +34,7 @@ public:
* @param id A unique identifier of device (Usually string representation of TargetDevice)
* @return A shared pointer to existing or newly ITaskExecutor
*/
virtual InferenceEngine::ITaskExecutor::Ptr get_executor(const std::string& id) = 0;
virtual std::shared_ptr<ov::threading::ITaskExecutor> get_executor(const std::string& id) = 0;
/**
* @brief Returns idle cpu streams executor
@@ -40,8 +43,8 @@ public:
*
* @return pointer to streams executor config
*/
virtual InferenceEngine::IStreamsExecutor::Ptr get_idle_cpu_streams_executor(
const InferenceEngine::IStreamsExecutor::Config& config) = 0;
virtual std::shared_ptr<ov::threading::IStreamsExecutor> get_idle_cpu_streams_executor(
const ov::threading::IStreamsExecutor::Config& config) = 0;
/**
* @brief Allows to configure executor manager
@@ -73,5 +76,5 @@ public:
};
OPENVINO_API std::shared_ptr<ExecutorManager> executor_manager();
} // namespace threading
} // namespace ov

View File

@@ -0,0 +1,168 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
/**
* @file ie_istreams_executor.hpp
* @brief A header file for Inference Engine Streams-based Executor Interface
*/
#pragma once
#include <memory>
#include <string>
#include <vector>
#include "openvino/runtime/common.hpp"
#include "openvino/runtime/threading/itask_executor.hpp"
namespace ov {
namespace threading {
/**
* @interface IStreamsExecutor
* @ingroup ov_dev_api_threading
* @brief Interface for Streams Task Executor. This executor groups worker threads into so-called `streams`.
* @par CPU
* The executor executes all parallel tasks using threads from one stream.
* With proper pinning settings it should reduce cache misses for memory bound workloads.
* @par NUMA
* On NUMA hosts GetNumaNodeId() method can be used to define the NUMA node of current stream
*/
class OPENVINO_RUNTIME_API IStreamsExecutor : virtual public ITaskExecutor {
public:
/**
* @brief Defines inference thread binding type
*/
enum ThreadBindingType : std::uint8_t {
NONE, //!< Don't bind the inference threads
CORES, //!< Bind inference threads to the CPU cores (round-robin)
// the following modes are implemented only for the TBB code-path:
NUMA, //!< Bind to the NUMA nodes (default mode for the non-hybrid CPUs on the Win/MacOS, where the 'CORES' is
//!< not implemented)
HYBRID_AWARE //!< Let the runtime bind the inference threads depending on the cores type (default mode for the
//!< hybrid CPUs)
};
/**
* @brief Defines IStreamsExecutor configuration
*/
struct OPENVINO_RUNTIME_API Config {
/**
* @brief Sets configuration
* @param properties map of properties
*/
void set_property(const ov::AnyMap& properties);
/**
* @brief Sets configuration
* @param key property name
* @param value property value
*/
void set_property(const std::string& key, const ov::Any& value);
/**
* @brief Return configuration value
* @param key configuration key
* @return configuration value wrapped into ov::Any
*/
ov::Any get_property(const std::string& key) const;
/**
* @brief Create appropriate multithreaded configuration
* filling unconfigured values from initial configuration using hardware properties
* @param initial Initial configuration
* @param fp_intesive additional hint for the (Hybrid) core-types selection logic
* whether the executor should be configured for floating point intensive work (as opposite to int8
* intensive)
* @return configured values
*/
static Config make_default_multi_threaded(const Config& initial, const bool fp_intesive = true);
// Default number of streams derived from CPU capabilities only.
static int get_default_num_streams(
const bool enable_hyper_thread = true); // no network specifics considered (only CPU's caps);
// NOTE(review): presumably computes the stream count for hybrid (P-core/E-core) CPUs from the
// given config map and stream mode (see StreamMode) — confirm against the implementation.
static int get_hybrid_num_streams(std::map<std::string, std::string>& config, const int stream_mode);
// NOTE(review): presumably updates the hybrid-core stream/thread fields of `config` for custom
// thread counts — confirm against the implementation.
static void update_hybrid_custom_threads(Config& config);
std::string _name; //!< Used by `ITT` to name executor threads
int _streams = 1; //!< Number of streams.
int _threadsPerStream = 0; //!< Number of threads per stream that executes `ie_parallel` calls
ThreadBindingType _threadBindingType = ThreadBindingType::NONE; //!< Thread binding to hardware resource type.
//!< No binding by default
int _threadBindingStep = 1; //!< In case of @ref CORES binding offset type
//!< thread binded to cores with defined step
int _threadBindingOffset = 0; //!< In case of @ref CORES binding offset type thread binded to cores
//!< starting from offset
int _threads = 0; //!< Number of threads distributed between streams.
//!< Reserved. Should not be used.
int _big_core_streams = 0; //!< Number of streams in Performance-core(big core)
int _small_core_streams = 0; //!< Number of streams in Efficient-core(small core)
int _threads_per_stream_big = 0; //!< Threads per stream in big cores
int _threads_per_stream_small = 0; //!< Threads per stream in small cores
int _small_core_offset = 0; //!< Calculate small core start offset when binding cpu cores
bool _enable_hyper_thread = true; //!< enable hyper thread
// Stream-count selection mode accepted by get_hybrid_num_streams.
enum StreamMode { DEFAULT, AGGRESSIVE, LESSAGGRESSIVE };
enum PreferredCoreType {
ANY,
LITTLE,
BIG,
ROUND_ROBIN // used w/multiple streams to populate the Big cores first, then the Little, then wrap around
// (for large #streams)
} _threadPreferredCoreType =
PreferredCoreType::ANY; //!< In case of @ref HYBRID_AWARE hints the TBB to affinitize
/**
* @brief A constructor with arguments
*
* @param[in] name The executor name
* @param[in] streams @copybrief Config::_streams
* @param[in] threadsPerStream @copybrief Config::_threadsPerStream
* @param[in] threadBindingType @copybrief Config::_threadBindingType
* @param[in] threadBindingStep @copybrief Config::_threadBindingStep
* @param[in] threadBindingOffset @copybrief Config::_threadBindingOffset
* @param[in] threads @copybrief Config::_threads
* @param[in] threadPreferredCoreType @copybrief Config::_threadPreferredCoreType
*/
Config(std::string name = "StreamsExecutor",
int streams = 1,
int threadsPerStream = 0,
ThreadBindingType threadBindingType = ThreadBindingType::NONE,
int threadBindingStep = 1,
int threadBindingOffset = 0,
int threads = 0,
PreferredCoreType threadPreferredCoreType = PreferredCoreType::ANY)
: _name{name},
_streams{streams},
_threadsPerStream{threadsPerStream},
_threadBindingType{threadBindingType},
_threadBindingStep{threadBindingStep},
_threadBindingOffset{threadBindingOffset},
_threads{threads},
_threadPreferredCoreType(threadPreferredCoreType) {}
};
/**
* @brief A virtual destructor
*/
~IStreamsExecutor() override;
/**
* @brief Return the index of current stream
* @return An index of current stream. Or throw exceptions if called not from stream thread
*/
virtual int get_stream_id() = 0;
/**
* @brief Return the id of current NUMA Node
* @return `ID` of current NUMA Node, or throws exceptions if called not from stream thread
*/
virtual int get_numa_node_id() = 0;
/**
* @brief Execute the task in the current thread using streams executor configuration and constraints
* @param task A task to start
*/
virtual void execute(Task task) = 0;
};
} // namespace threading
} // namespace ov

View File

@@ -0,0 +1,76 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
/**
* @file openvino/runtime/threading/task_executor.hpp
* @brief A header file for OpenVINO Task Executor Interface
*/
#pragma once
#include <functional>
#include <memory>
#include <vector>
#include "openvino/runtime/common.hpp"
namespace ov {
namespace threading {
/**
* @brief OpenVINO Task Executor can use any copyable callable without parameters and output as a task.
* It would be wrapped into std::function object
* @ingroup ov_dev_api_threading
*/
using Task = std::function<void()>;
/**
* @interface ITaskExecutor
* @ingroup ov_dev_api_threading
* @brief Interface for Task Executor.
* OpenVINO uses `ov::ITaskExecutor` interface to run all asynchronous internal tasks.
* Different implementations of task executors can be used for different purposes:
* - To improve cache locality of memory bound CPU tasks some executors can limit task's affinity and maximum
concurrency.
* - The executor with one worker thread can be used to serialize access to acceleration device.
* - Immediate task executor can be used to satisfy `ov::ITaskExecutor` interface restrictions but
run tasks in current thread.
* @note Implementation should guarantee thread safety of all methods
* @section Synchronization
* It is `ov::ITaskExecutor` user responsibility to wait for task execution completion.
* The `c++11` standard way to wait task completion is to use `std::packaged_task` or `std::promise` with
`std::future`.
* Here is an example of how to use `std::promise` to wait task completion and process task's exceptions:
* @snippet example_itask_executor.cpp itask_executor:define_pipeline
*/
class OPENVINO_RUNTIME_API ITaskExecutor {
public:
    /**
     * @brief Destroys the task executor.
     */
    virtual ~ITaskExecutor() = default;
    /**
     * @brief Executes an ov::Task inside the task executor context.
     * @param task A task to start
     */
    virtual void run(Task task) = 0;
    /**
     * @brief Executes all of the tasks and waits for their completion.
     * The default run_and_wait() method implementation uses the run() pure virtual method
     * and higher level synchronization primitives from the STL.
     * Each task is wrapped into std::packaged_task which returns std::future.
     * std::packaged_task will call the task and signal to std::future that the task is finished
     * or the exception is thrown from task.
     * Then std::future is used to wait for task execution completion and
     * task exception extraction.
     * @note run_and_wait() does not copy or capture tasks!
     * @param tasks A vector of tasks to execute
     */
    virtual void run_and_wait(const std::vector<Task>& tasks);
};
} // namespace threading
} // namespace ov

View File

@@ -33,7 +33,7 @@ public:
* @brief Constructor
* @param config Stream executor parameters
*/
explicit CPUStreamsExecutor(const Config& config = {});
explicit CPUStreamsExecutor(const InferenceEngine::IStreamsExecutor::Config& config = {});
/**
* @brief A class destructor

View File

@@ -19,10 +19,12 @@
#include "threading/ie_itask_executor.hpp"
namespace ov {
namespace threading {
class ExecutorManager;
}
} // namespace ov
namespace InferenceEngine {
@@ -86,13 +88,13 @@ public:
virtual bool getTbbFlag() = 0;
private:
virtual std::shared_ptr<ov::ExecutorManager> get_ov_manager() const = 0;
virtual std::shared_ptr<ov::threading::ExecutorManager> get_ov_manager() const = 0;
friend class IPluginWrapper;
};
INFERENCE_ENGINE_API_CPP(ExecutorManager::Ptr) executorManager();
std::shared_ptr<InferenceEngine::ExecutorManager> create_old_manager(
const std::shared_ptr<ov::ExecutorManager>& manager);
const std::shared_ptr<ov::threading::ExecutorManager>& manager);
} // namespace InferenceEngine

View File

@@ -14,6 +14,7 @@
#include <vector>
#include "ie_parameter.hpp"
#include "openvino/runtime/threading/istreams_executor.hpp"
#include "threading/ie_itask_executor.hpp"
namespace InferenceEngine {
@@ -28,30 +29,17 @@ namespace InferenceEngine {
* @par NUMA
* On NUMA hosts GetNumaNodeId() method can be used to define the NUMA node of current stream
*/
class INFERENCE_ENGINE_API_CLASS(IStreamsExecutor) : public ITaskExecutor {
class INFERENCE_ENGINE_API_CLASS(IStreamsExecutor) : public ITaskExecutor, public ov::threading::IStreamsExecutor {
public:
/**
* A shared pointer to IStreamsExecutor interface
*/
using Ptr = std::shared_ptr<IStreamsExecutor>;
/**
* @brief Defines inference thread binding type
*/
enum ThreadBindingType : std::uint8_t {
NONE, //!< Don't bind the inference threads
CORES, //!< Bind inference threads to the CPU cores (round-robin)
// the following modes are implemented only for the TBB code-path:
NUMA, //!< Bind to the NUMA nodes (default mode for the non-hybrid CPUs on the Win/MacOS, where the 'CORES' is
//!< not implemeneted)
HYBRID_AWARE //!< Let the runtime bind the inference threads depending on the cores type (default mode for the
//!< hybrid CPUs)
};
/**
* @brief Defines IStreamsExecutor configuration
*/
struct INFERENCE_ENGINE_API_CLASS(Config) {
struct INFERENCE_ENGINE_API_CLASS(Config) : public ov::threading::IStreamsExecutor::Config {
/**
* @brief Supported Configuration keys
* @return vector of supported configuration keys
@@ -87,33 +75,6 @@ public:
static int GetHybridNumStreams(std::map<std::string, std::string>& config, const int stream_mode);
static void UpdateHybridCustomThreads(Config& config);
std::string _name; //!< Used by `ITT` to name executor threads
int _streams = 1; //!< Number of streams.
int _threadsPerStream = 0; //!< Number of threads per stream that executes `ie_parallel` calls
ThreadBindingType _threadBindingType = ThreadBindingType::NONE; //!< Thread binding to hardware resource type.
//!< No binding by default
int _threadBindingStep = 1; //!< In case of @ref CORES binding offset type
//!< thread binded to cores with defined step
int _threadBindingOffset = 0; //!< In case of @ref CORES binding offset type thread binded to cores
//!< starting from offset
int _threads = 0; //!< Number of threads distributed between streams.
//!< Reserved. Should not be used.
int _big_core_streams = 0; //!< Number of streams in Performance-core(big core)
int _small_core_streams = 0; //!< Number of streams in Efficient-core(small core)
int _threads_per_stream_big = 0; //!< Threads per stream in big cores
int _threads_per_stream_small = 0; //!< Threads per stream in small cores
int _small_core_offset = 0; //!< Calculate small core start offset when binding cpu cores
bool _enable_hyper_thread = true; //!< enable hyper thread
enum StreamMode { DEFAULT, AGGRESSIVE, LESSAGGRESSIVE };
enum PreferredCoreType {
ANY,
LITTLE,
BIG,
ROUND_ROBIN // used w/multiple streams to populate the Big cores first, then the Little, then wrap around
// (for large #streams)
} _threadPreferredCoreType =
PreferredCoreType::ANY; //!< In case of @ref HYBRID_AWARE hints the TBB to affinitize
/**
* @brief A constructor with arguments
*
@@ -134,14 +95,17 @@ public:
int threadBindingOffset = 0,
int threads = 0,
PreferredCoreType threadPreferredCoreType = PreferredCoreType::ANY)
: _name{name},
_streams{streams},
_threadsPerStream{threadsPerStream},
_threadBindingType{threadBindingType},
_threadBindingStep{threadBindingStep},
_threadBindingOffset{threadBindingOffset},
_threads{threads},
_threadPreferredCoreType(threadPreferredCoreType) {}
: ov::threading::IStreamsExecutor::Config(name,
streams,
threadsPerStream,
threadBindingType,
threadBindingStep,
threadBindingOffset,
threads,
threadPreferredCoreType) {}
Config(const ov::threading::IStreamsExecutor::Config& config)
: ov::threading::IStreamsExecutor::Config(config) {}
};
/**
@@ -166,6 +130,18 @@ public:
* @param task A task to start
*/
virtual void Execute(Task task) = 0;
int get_stream_id() override {
return GetStreamId();
}
int get_numa_node_id() override {
return GetNumaNodeId();
}
void execute(Task task) override {
Execute(task);
}
};
} // namespace InferenceEngine

View File

@@ -14,6 +14,7 @@
#include <vector>
#include "ie_api.h"
#include "openvino/runtime/threading/itask_executor.hpp"
namespace InferenceEngine {
@@ -22,7 +23,7 @@ namespace InferenceEngine {
* It would be wrapped into std::function object
* @ingroup ie_dev_api_threading
*/
using Task = std::function<void()>;
using Task = ov::threading::Task;
/**
* @interface ITaskExecutor
@@ -36,14 +37,13 @@ concurrency.
* - Immediate task executor can be used to satisfy `InferenceEngine::ITaskExecutor` interface restrictions but
run tasks in current thread.
* @note Implementation should guaranty thread safety of all methods
* @section Synchronization
* It is `InferenceEngine::ITaskExecutor` user responsibility to wait for task execution completion.
* The `c++11` standard way to wait task completion is to use `std::packaged_task` or `std::promise` with
`std::future`.
* Here is an example of how to use `std::promise` to wait task completion and process task's exceptions:
* @snippet example_itask_executor.cpp itask_executor:define_pipeline
*/
class INFERENCE_ENGINE_API_CLASS(ITaskExecutor) {
class INFERENCE_ENGINE_API_CLASS(ITaskExecutor) : virtual public ov::threading::ITaskExecutor {
public:
/**
* A shared pointer to ITaskExecutor interface
@@ -55,12 +55,6 @@ public:
*/
virtual ~ITaskExecutor() = default;
/**
* @brief Execute InferenceEngine::Task inside task executor context
* @param task A task to start
*/
virtual void run(Task task) = 0;
/**
* @brief Execute all of the tasks and waits for its completion.
* Default runAndWait() method implementation uses run() pure virtual method

View File

@@ -58,7 +58,7 @@ void stripDeviceName(std::string& device, const std::string& substr) {
ov::CoreImpl::CoreImpl(bool _newAPI) : m_new_api(_newAPI) {
add_mutex(""); // Register global mutex
m_executor_manager = ov::executor_manager();
m_executor_manager = ov::threading::executor_manager();
for (const auto& it : ov::get_available_opsets()) {
opsetNames.insert(it.first);
}
@@ -633,7 +633,7 @@ void ov::CoreImpl::set_property(const std::string& device_name, const AnyMap& pr
ov::Any ov::CoreImpl::get_property_for_core(const std::string& name) const {
if (name == ov::force_tbb_terminate.name()) {
const auto flag = ov::executor_manager()->get_property(name).as<bool>();
const auto flag = ov::threading::executor_manager()->get_property(name).as<bool>();
return decltype(ov::force_tbb_terminate)::value_type(flag);
} else if (name == ov::cache_dir.name()) {
return ov::Any(coreConfig.get_cache_dir());
@@ -994,7 +994,7 @@ void ov::CoreImpl::CoreConfig::set_and_update(ov::AnyMap& config) {
it = config.find(ov::force_tbb_terminate.name());
if (it != config.end()) {
auto flag = it->second.as<std::string>() == CONFIG_VALUE(YES) ? true : false;
ov::executor_manager()->set_property({{it->first, flag}});
ov::threading::executor_manager()->set_property({{it->first, flag}});
config.erase(it);
}

View File

@@ -162,7 +162,7 @@ private:
}
};
std::shared_ptr<ov::ExecutorManager> m_executor_manager;
std::shared_ptr<ov::threading::ExecutorManager> m_executor_manager;
mutable std::unordered_set<std::string> opsetNames;
// TODO: make extensions to be optional with conditional compilation
mutable std::vector<InferenceEngine::IExtensionPtr> extensions;

View File

@@ -14,13 +14,13 @@
namespace {
struct ImmediateStreamsExecutor : public InferenceEngine::ITaskExecutor {
explicit ImmediateStreamsExecutor(const InferenceEngine::IStreamsExecutor::Ptr& streamsExecutor)
struct ImmediateStreamsExecutor : public ov::threading::ITaskExecutor {
explicit ImmediateStreamsExecutor(const std::shared_ptr<ov::threading::IStreamsExecutor>& streamsExecutor)
: _streamsExecutor{streamsExecutor} {}
void run(InferenceEngine::Task task) override {
_streamsExecutor->Execute(std::move(task));
_streamsExecutor->execute(std::move(task));
}
InferenceEngine::IStreamsExecutor::Ptr _streamsExecutor;
std::shared_ptr<ov::threading::IStreamsExecutor> _streamsExecutor;
};
} // namespace
@@ -30,8 +30,8 @@ ov::IAsyncInferRequest::~IAsyncInferRequest() {
}
ov::IAsyncInferRequest::IAsyncInferRequest(const std::shared_ptr<IInferRequest>& request,
const InferenceEngine::ITaskExecutor::Ptr& task_executor,
const InferenceEngine::ITaskExecutor::Ptr& callback_executor)
const std::shared_ptr<ov::threading::ITaskExecutor>& task_executor,
const std::shared_ptr<ov::threading::ITaskExecutor>& callback_executor)
: m_sync_request(request),
m_request_executor(task_executor),
m_callback_executor(callback_executor) {
@@ -117,7 +117,7 @@ void ov::IAsyncInferRequest::start_async_thread_unsafe() {
void ov::IAsyncInferRequest::run_first_stage(const Pipeline::iterator itBeginStage,
const Pipeline::iterator itEndStage,
const InferenceEngine::ITaskExecutor::Ptr callbackExecutor) {
const std::shared_ptr<ov::threading::ITaskExecutor> callbackExecutor) {
auto& firstStageExecutor = std::get<Stage_e::EXECUTOR>(*itBeginStage);
OPENVINO_ASSERT(nullptr != firstStageExecutor);
firstStageExecutor->run(make_next_stage_task(itBeginStage, itEndStage, std::move(callbackExecutor)));
@@ -126,9 +126,9 @@ void ov::IAsyncInferRequest::run_first_stage(const Pipeline::iterator itBeginSta
InferenceEngine::Task ov::IAsyncInferRequest::make_next_stage_task(
const Pipeline::iterator itStage,
const Pipeline::iterator itEndStage,
const InferenceEngine::ITaskExecutor::Ptr callbackExecutor) {
const std::shared_ptr<ov::threading::ITaskExecutor> callbackExecutor) {
return std::bind(
[this, itStage, itEndStage](InferenceEngine::ITaskExecutor::Ptr& callbackExecutor) mutable {
[this, itStage, itEndStage](std::shared_ptr<ov::threading::ITaskExecutor>& callbackExecutor) mutable {
std::exception_ptr currentException = nullptr;
auto& thisStage = *itStage;
auto itNextStage = itStage + 1;

View File

@@ -11,8 +11,8 @@
ov::ICompiledModel::ICompiledModel(const std::shared_ptr<const ov::Model>& model,
const std::shared_ptr<const ov::IPlugin>& plugin,
const InferenceEngine::ITaskExecutor::Ptr& task_executor,
const InferenceEngine::ITaskExecutor::Ptr& callback_executor)
const std::shared_ptr<ov::threading::ITaskExecutor>& task_executor,
const std::shared_ptr<ov::threading::ITaskExecutor>& callback_executor)
: m_plugin(plugin),
m_task_executor(task_executor),
m_callback_executor(callback_executor) {
@@ -86,10 +86,10 @@ std::shared_ptr<ov::IAsyncInferRequest> ov::ICompiledModel::create_infer_request
const std::shared_ptr<const ov::IPlugin>& ov::ICompiledModel::get_plugin() const {
return m_plugin;
}
const InferenceEngine::ITaskExecutor::Ptr ov::ICompiledModel::get_task_executor() const {
const std::shared_ptr<ov::threading::ITaskExecutor> ov::ICompiledModel::get_task_executor() const {
return m_task_executor;
}
const InferenceEngine::ITaskExecutor::Ptr ov::ICompiledModel::get_callback_executor() const {
const std::shared_ptr<ov::threading::ITaskExecutor> ov::ICompiledModel::get_callback_executor() const {
return m_callback_executor;
}

View File

@@ -4,9 +4,8 @@
#include "icompiled_model_wrapper.hpp"
#include <ie_plugin_config.hpp>
#include "dev/converter_utils.hpp"
#include "ie_plugin_config.hpp"
InferenceEngine::ICompiledModelWrapper::ICompiledModelWrapper(
const std::shared_ptr<InferenceEngine::IExecutableNetworkInternal>& model)

View File

@@ -4,7 +4,7 @@
#include "openvino/runtime/iplugin.hpp"
ov::IPlugin::IPlugin() : m_executor_manager(ov::executor_manager()), m_is_new_api(true) {}
ov::IPlugin::IPlugin() : m_executor_manager(ov::threading::executor_manager()), m_is_new_api(true) {}
void ov::IPlugin::set_version(const ov::Version& version) {
m_version = version;
@@ -42,7 +42,7 @@ bool ov::IPlugin::is_new_api() const {
return m_is_new_api;
}
const std::shared_ptr<ov::ExecutorManager>& ov::IPlugin::get_executor_manager() const {
const std::shared_ptr<ov::threading::ExecutorManager>& ov::IPlugin::get_executor_manager() const {
return m_executor_manager;
}

View File

@@ -7,6 +7,7 @@
#include <unordered_map>
#include "cpp_interfaces/plugin_itt.hpp"
#include "ie_blob.h"
#include "openvino/core/except.hpp"
#include "openvino/core/layout.hpp"
#include "openvino/core/parallel.hpp"

View File

@@ -0,0 +1,397 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
#include "openvino/runtime/threading/cpu_streams_executor.hpp"
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include "openvino/itt.hpp"
#include "openvino/runtime/system_conf.hpp"
#include "openvino/runtime/threading/executor_manager.hpp"
#include "threading/ie_parallel_custom_arena.hpp"
#include "threading/ie_thread_affinity.hpp"
#include "threading/ie_thread_local.hpp"
namespace ov {
namespace threading {
// Private implementation of CPUStreamsExecutor (pimpl idiom).
// Owns the worker threads (one per configured stream), the shared task queue,
// and the per-stream state used for thread-to-core/NUMA binding.
struct CPUStreamsExecutor::Impl {
    // Per-worker-thread ("stream") state: a reusable stream id, the NUMA node
    // the stream is mapped to and, for TBB builds, the task arena + scheduler
    // observer used to pin the stream's threads to CPU cores.
    struct Stream {
#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO
        // TBB scheduler observer: pins every thread entering the stream's arena
        // to a vacant CPU core and restores the process mask when it leaves.
        struct Observer : public custom::task_scheduler_observer {
            InferenceEngine::CpuSet _mask;  // process affinity mask used to select vacant cores
            int _ncpus = 0;                 // number of CPUs described by _mask
            int _threadBindingStep = 0;     // step between cores given to consecutive threads
            int _offset = 0;                // core index of this stream's first thread
            int _cpuIdxOffset = 0;          // additional core-id offset (hybrid-CPU handling)
            Observer(custom::task_arena& arena,
                     InferenceEngine::CpuSet mask,
                     int ncpus,
                     const int streamId,
                     const int threadsPerStream,
                     const int threadBindingStep,
                     const int threadBindingOffset,
                     const int cpuIdxOffset = 0)
                : custom::task_scheduler_observer(arena),
                  _mask{std::move(mask)},
                  _ncpus(ncpus),
                  _threadBindingStep(threadBindingStep),
                  _offset{streamId * threadsPerStream + threadBindingOffset},
                  _cpuIdxOffset(cpuIdxOffset) {}
            void on_scheduler_entry(bool) override {
                // Bind the entering TBB worker to a vacant core derived from the
                // stream offset plus the worker's index inside the arena.
                InferenceEngine::PinThreadToVacantCore(_offset + tbb::this_task_arena::current_thread_index(),
                                                       _threadBindingStep,
                                                       _ncpus,
                                                       _mask,
                                                       _cpuIdxOffset);
            }
            void on_scheduler_exit(bool) override {
                // Restore the original process mask when the worker leaves the arena.
                PinCurrentThreadByMask(_ncpus, _mask);
            }
            ~Observer() override = default;
        };
#endif
        // Acquires a stream id (reusing released ids when available), maps the
        // stream to a NUMA node and configures thread binding for the active
        // threading backend (TBB / OpenMP / sequential).
        explicit Stream(Impl* impl) : _impl(impl) {
            {
                std::lock_guard<std::mutex> lock{_impl->_streamIdMutex};
                if (_impl->_streamIdQueue.empty()) {
                    _streamId = _impl->_streamId++;
                } else {
                    _streamId = _impl->_streamIdQueue.front();
                    _impl->_streamIdQueue.pop();
                }
            }
            // Distribute the streams evenly across the NUMA nodes in use.
            _numaNodeId = _impl->_config._streams
                              ? _impl->_usedNumaNodes.at((_streamId % _impl->_config._streams) /
                                                         ((_impl->_config._streams + _impl->_usedNumaNodes.size() - 1) /
                                                          _impl->_usedNumaNodes.size()))
                              : _impl->_usedNumaNodes.at(_streamId % _impl->_usedNumaNodes.size());
#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO
            const auto concurrency = (0 == _impl->_config._threadsPerStream) ? custom::task_arena::automatic
                                                                             : _impl->_config._threadsPerStream;
            if (ThreadBindingType::HYBRID_AWARE == _impl->_config._threadBindingType) {
                if (Config::PreferredCoreType::ROUND_ROBIN != _impl->_config._threadPreferredCoreType) {
                    if (Config::PreferredCoreType::ANY == _impl->_config._threadPreferredCoreType) {
                        _taskArena.reset(new custom::task_arena{concurrency});
                    } else {
                        const auto selected_core_type =
                            Config::PreferredCoreType::BIG == _impl->_config._threadPreferredCoreType
                                ? custom::info::core_types().back()    // running on Big cores only
                                : custom::info::core_types().front();  // running on Little cores only
                        _taskArena.reset(new custom::task_arena{custom::task_arena::constraints{}
                                                                    .set_core_type(selected_core_type)
                                                                    .set_max_concurrency(concurrency)});
                    }
                } else {
                    // assigning the stream to the core type in the round-robin fashion
                    // wrapping around total_streams (i.e. how many streams all different core types can handle
                    // together). Binding priority: Big core, Logical big core, Small core
                    const auto total_streams = _impl->total_streams_on_core_types.back().second;
                    const auto big_core_streams = _impl->total_streams_on_core_types.front().second;
                    const auto hybrid_core = _impl->total_streams_on_core_types.size() > 1;
                    const auto phy_core_streams =
                        _impl->_config._big_core_streams == 0
                            ? 0
                            : _impl->num_big_core_phys / _impl->_config._threads_per_stream_big;
                    const auto streamId_wrapped = _streamId % total_streams;
                    // upper_bound over the prefix sums picks the core type for this stream id
                    const auto& selected_core_type =
                        std::find_if(
                            _impl->total_streams_on_core_types.cbegin(),
                            _impl->total_streams_on_core_types.cend(),
                            [streamId_wrapped](const decltype(_impl->total_streams_on_core_types)::value_type& p) {
                                return p.second > streamId_wrapped;
                            })
                            ->first;
                    const auto small_core = hybrid_core && selected_core_type == 0;
                    const auto logic_core = !small_core && streamId_wrapped >= phy_core_streams;
                    const auto small_core_skip = small_core && _impl->_config._threads_per_stream_small == 3 &&
                                                 _impl->_config._small_core_streams > 1;
                    const auto max_concurrency =
                        small_core ? _impl->_config._threads_per_stream_small : _impl->_config._threads_per_stream_big;
                    // Special handling of _threads_per_stream_small == 3
                    const auto small_core_id = small_core_skip ? 0 : streamId_wrapped - big_core_streams;
                    const auto stream_id =
                        hybrid_core
                            ? (small_core ? small_core_id
                                          : (logic_core ? streamId_wrapped - phy_core_streams : streamId_wrapped))
                            : streamId_wrapped;
                    const auto thread_binding_step = hybrid_core ? (small_core ? _impl->_config._threadBindingStep : 2)
                                                                 : _impl->_config._threadBindingStep;
                    // Special handling of _threads_per_stream_small == 3, need to skip 4 (Four cores share one L2 cache
                    // on the small core), stream_id = 0, cpu_idx_offset cumulative plus 4
                    const auto small_core_offset =
                        small_core_skip ? _impl->_config._small_core_offset + (streamId_wrapped - big_core_streams) * 4
                                        : _impl->_config._small_core_offset;
                    const auto cpu_idx_offset =
                        hybrid_core
                            // Prevent conflicts with system scheduling, so default cpu id on big core starts from 1
                            ? (small_core ? small_core_offset : (logic_core ? 0 : 1))
                            : 0;
                    _taskArena.reset(new custom::task_arena{custom::task_arena::constraints{}
                                                                .set_core_type(selected_core_type)
                                                                .set_max_concurrency(max_concurrency)});
                    InferenceEngine::CpuSet processMask;
                    int ncpus = 0;
                    std::tie(processMask, ncpus) = InferenceEngine::GetProcessMask();
                    if (nullptr != processMask) {
                        _observer.reset(new Observer{*_taskArena,
                                                     std::move(processMask),
                                                     ncpus,
                                                     stream_id,
                                                     max_concurrency,
                                                     thread_binding_step,
                                                     _impl->_config._threadBindingOffset,
                                                     cpu_idx_offset});
                        _observer->observe(true);
                    }
                }
            } else if (ThreadBindingType::NUMA == _impl->_config._threadBindingType) {
                // Constrain the arena to the stream's NUMA node.
                _taskArena.reset(new custom::task_arena{custom::task_arena::constraints{_numaNodeId, concurrency}});
            } else if ((0 != _impl->_config._threadsPerStream) ||
                       (ThreadBindingType::CORES == _impl->_config._threadBindingType)) {
                _taskArena.reset(new custom::task_arena{concurrency});
                if (ThreadBindingType::CORES == _impl->_config._threadBindingType) {
                    InferenceEngine::CpuSet processMask;
                    int ncpus = 0;
                    std::tie(processMask, ncpus) = InferenceEngine::GetProcessMask();
                    if (nullptr != processMask) {
                        _observer.reset(new Observer{*_taskArena,
                                                     std::move(processMask),
                                                     ncpus,
                                                     _streamId,
                                                     _impl->_config._threadsPerStream,
                                                     _impl->_config._threadBindingStep,
                                                     _impl->_config._threadBindingOffset});
                        _observer->observe(true);
                    }
                }
            }
#elif OV_THREAD == OV_THREAD_OMP
            omp_set_num_threads(_impl->_config._threadsPerStream);
            // Respect user-provided OpenMP affinity settings: only pin when none are set.
            if (!checkOpenMpEnvVars(false) && (ThreadBindingType::NONE != _impl->_config._threadBindingType)) {
                InferenceEngine::CpuSet processMask;
                int ncpus = 0;
                std::tie(processMask, ncpus) = InferenceEngine::GetProcessMask();
                if (nullptr != processMask) {
                    parallel_nt(_impl->_config._threadsPerStream, [&](int threadIndex, int threadsPerStream) {
                        int thrIdx = _streamId * _impl->_config._threadsPerStream + threadIndex +
                                     _impl->_config._threadBindingOffset;
                        InferenceEngine::PinThreadToVacantCore(thrIdx,
                                                               _impl->_config._threadBindingStep,
                                                               ncpus,
                                                               processMask);
                    });
                }
            }
#elif OV_THREAD == OV_THREAD_SEQ
            if (ThreadBindingType::NUMA == _impl->_config._threadBindingType) {
                InferenceEngine::PinCurrentThreadToSocket(_numaNodeId);
            } else if (ThreadBindingType::CORES == _impl->_config._threadBindingType) {
                InferenceEngine::CpuSet processMask;
                int ncpus = 0;
                std::tie(processMask, ncpus) = InferenceEngine::GetProcessMask();
                if (nullptr != processMask) {
                    InferenceEngine::PinThreadToVacantCore(_streamId + _impl->_config._threadBindingOffset,
                                                           _impl->_config._threadBindingStep,
                                                           ncpus,
                                                           processMask);
                }
            }
#endif
        }
        // Returns the stream id to the pool for reuse and detaches the TBB observer.
        ~Stream() {
            {
                std::lock_guard<std::mutex> lock{_impl->_streamIdMutex};
                _impl->_streamIdQueue.push(_streamId);
            }
#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO
            if (nullptr != _observer) {
                _observer->observe(false);
            }
#endif
        }

        Impl* _impl = nullptr;
        int _streamId = 0;
        int _numaNodeId = 0;
        bool _execute = false;        // true while Defer() drains this stream's local queue (re-entrancy guard)
        std::queue<Task> _taskQueue;  // stream-local queue used by Defer()
#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO
        std::unique_ptr<custom::task_arena> _taskArena;
        std::unique_ptr<Observer> _observer;
#endif
    };

    // Builds the NUMA-node mapping and, for TBB hybrid-aware binding, the
    // stream-id-to-core-type prefix sums; then spawns one worker thread per
    // configured stream, each popping tasks from the shared queue.
    explicit Impl(const Config& config)
        : _config{config},
          _streams([this] {
              // Lazily create a Stream for every thread that touches the executor.
              return std::make_shared<Impl::Stream>(this);
          }) {
        _exectorMgr = executor_manager();
        auto numaNodes = get_available_numa_nodes();
        if (_config._streams != 0) {
            // Use at most one NUMA node per stream.
            std::copy_n(std::begin(numaNodes),
                        std::min(static_cast<std::size_t>(_config._streams), numaNodes.size()),
                        std::back_inserter(_usedNumaNodes));
        } else {
            _usedNumaNodes = numaNodes;
        }
#if (OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO)
        if (ThreadBindingType::HYBRID_AWARE == config._threadBindingType) {
            const auto core_types = custom::info::core_types();
            const auto num_core_phys = get_number_of_cpu_cores();
            num_big_core_phys = get_number_of_cpu_cores(true);
            const auto num_small_core_phys = num_core_phys - num_big_core_phys;
            int sum = 0;
            // reversed order, so BIG cores are first
            for (auto iter = core_types.rbegin(); iter < core_types.rend(); iter++) {
                const auto& type = *iter;
                // calculating the #streams per core type
                const int num_streams_for_core_type =
                    type == 0 ? std::max(1,
                                         std::min(config._small_core_streams,
                                                  config._threads_per_stream_small == 0
                                                      ? 0
                                                      : num_small_core_phys / config._threads_per_stream_small))
                              : std::max(1,
                                         std::min(config._big_core_streams,
                                                  config._threads_per_stream_big == 0
                                                      ? 0
                                                      : num_big_core_phys / config._threads_per_stream_big * 2));
                sum += num_streams_for_core_type;
                // prefix sum, so the core type for a given stream id will be deduced just as a upper_bound
                // (notice that the map keeps the elements in the descending order, so the big cores are populated
                // first)
                total_streams_on_core_types.push_back({type, sum});
            }
        }
#endif
        for (auto streamId = 0; streamId < _config._streams; ++streamId) {
            _threads.emplace_back([this, streamId] {
                openvino::itt::threadName(_config._name + "_" + std::to_string(streamId));
                // Worker loop: wait for a task or the stop signal, then execute
                // the task inside this thread's Stream context.
                for (bool stopped = false; !stopped;) {
                    Task task;
                    {
                        std::unique_lock<std::mutex> lock(_mutex);
                        _queueCondVar.wait(lock, [&] {
                            return !_taskQueue.empty() || (stopped = _isStopped);
                        });
                        if (!_taskQueue.empty()) {
                            task = std::move(_taskQueue.front());
                            _taskQueue.pop();
                        }
                    }
                    if (task) {
                        Execute(task, *(_streams.local()));
                    }
                }
            });
        }
    }

    // Pushes a task onto the shared queue and wakes one worker thread.
    void Enqueue(Task task) {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _taskQueue.emplace(std::move(task));
        }
        _queueCondVar.notify_one();
    }

    // Runs the task inside the stream's TBB arena when one exists,
    // otherwise directly on the current thread.
    void Execute(const Task& task, Stream& stream) {
#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO
        auto& arena = stream._taskArena;
        if (nullptr != arena) {
            // NOTE(review): std::move on a const reference degrades to a copy here.
            arena->execute(std::move(task));
        } else {
            task();
        }
#else
        task();
#endif
    }

    // Executes the task synchronously in the caller's stream context.
    // Tasks deferred while one is already running are queued and drained by
    // the outermost call; _execute is the re-entrancy guard.
    void Defer(Task task) {
        auto& stream = *(_streams.local());
        stream._taskQueue.push(std::move(task));
        if (!stream._execute) {
            stream._execute = true;
            try {
                while (!stream._taskQueue.empty()) {
                    Execute(stream._taskQueue.front(), stream);
                    stream._taskQueue.pop();
                }
            } catch (...) {
                // NOTE(review): exceptions are swallowed; remaining deferred tasks
                // stay queued until the next Defer() call — confirm this is intended.
            }
            stream._execute = false;
        }
    }

    Config _config;
    std::mutex _streamIdMutex;       // guards _streamId and _streamIdQueue
    int _streamId = 0;               // next fresh stream id
    std::queue<int> _streamIdQueue;  // ids released by destroyed Streams, reused first
    std::vector<std::thread> _threads;  // worker threads, one per stream
    std::mutex _mutex;                  // guards _taskQueue and _isStopped
    std::condition_variable _queueCondVar;
    std::queue<Task> _taskQueue;  // shared queue serviced by the worker threads
    bool _isStopped = false;
    std::vector<int> _usedNumaNodes;  // NUMA nodes the streams are distributed over
    InferenceEngine::ThreadLocal<std::shared_ptr<Stream>> _streams;  // per-thread Stream state
#if (OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO)
    // stream id mapping to the core type
    // stored in the reversed order (so the big cores, with the highest core_type_id value, are populated first)
    // every entry is the core type and #streams that this AND ALL EARLIER entries can handle (prefix sum)
    // (so mapping is actually just an upper_bound: core type is deduced from the entry for which the id < #streams)
    using StreamIdToCoreTypes = std::vector<std::pair<custom::core_type_id, int>>;
    StreamIdToCoreTypes total_streams_on_core_types;
    int num_big_core_phys;
#endif
    std::shared_ptr<ExecutorManager> _exectorMgr;
};
int CPUStreamsExecutor::get_stream_id() {
    // Report the id of the stream bound to the calling thread.
    return _impl->_streams.local()->_streamId;
}
int CPUStreamsExecutor::get_numa_node_id() {
    // Report the NUMA node assigned to the calling thread's stream.
    return _impl->_streams.local()->_numaNodeId;
}
// Creates the executor; worker streams are spawned by the Impl from \p config.
CPUStreamsExecutor::CPUStreamsExecutor(const ov::threading::IStreamsExecutor::Config& config)
    : _impl(new Impl(config)) {}
CPUStreamsExecutor::~CPUStreamsExecutor() {
    // Raise the stop flag under the queue lock so sleeping workers observe it.
    {
        std::lock_guard<std::mutex> guard(_impl->_mutex);
        _impl->_isStopped = true;
    }
    _impl->_queueCondVar.notify_all();
    // Join every worker stream before the Impl is destroyed.
    for (auto& worker : _impl->_threads) {
        if (worker.joinable()) {
            worker.join();
        }
    }
}
void CPUStreamsExecutor::execute(Task task) {
    // Deferred path: run the task synchronously in the caller's stream context.
    auto& impl = *_impl;
    impl.Defer(std::move(task));
}
void CPUStreamsExecutor::run(Task task) {
    // With worker streams configured, hand the task to the shared queue;
    // otherwise execute it inline in the caller's stream context.
    if (_impl->_config._streams != 0) {
        _impl->Enqueue(std::move(task));
    } else {
        _impl->Defer(std::move(task));
    }
}
} // namespace threading
} // namespace ov

View File

@@ -6,6 +6,7 @@
#include "openvino/core/parallel.hpp"
#include "openvino/runtime/properties.hpp"
#include "openvino/runtime/threading/cpu_streams_executor.hpp"
#include "threading/ie_cpu_streams_executor.hpp"
#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO
# if (TBB_INTERFACE_VERSION < 12000)
@@ -21,13 +22,14 @@
#include <utility>
namespace ov {
namespace threading {
namespace {
class ExecutorManagerImpl : public ExecutorManager {
public:
~ExecutorManagerImpl();
InferenceEngine::ITaskExecutor::Ptr get_executor(const std::string& id) override;
InferenceEngine::IStreamsExecutor::Ptr get_idle_cpu_streams_executor(
const InferenceEngine::IStreamsExecutor::Config& config) override;
std::shared_ptr<ov::threading::ITaskExecutor> get_executor(const std::string& id) override;
std::shared_ptr<ov::threading::IStreamsExecutor> get_idle_cpu_streams_executor(
const ov::threading::IStreamsExecutor::Config& config) override;
size_t get_executors_number() const override;
size_t get_idle_cpu_streams_executors_number() const override;
void clear(const std::string& id = {}) override;
@@ -37,8 +39,8 @@ public:
private:
void reset_tbb();
std::unordered_map<std::string, InferenceEngine::ITaskExecutor::Ptr> executors;
std::vector<std::pair<InferenceEngine::IStreamsExecutor::Config, InferenceEngine::IStreamsExecutor::Ptr>>
std::unordered_map<std::string, std::shared_ptr<ov::threading::ITaskExecutor>> executors;
std::vector<std::pair<ov::threading::IStreamsExecutor::Config, std::shared_ptr<ov::threading::IStreamsExecutor>>>
cpuStreamsExecutors;
mutable std::mutex streamExecutorMutex;
mutable std::mutex taskExecutorMutex;
@@ -110,12 +112,11 @@ void ExecutorManagerImpl::reset_tbb() {
}
}
InferenceEngine::ITaskExecutor::Ptr ExecutorManagerImpl::get_executor(const std::string& id) {
std::shared_ptr<ov::threading::ITaskExecutor> ExecutorManagerImpl::get_executor(const std::string& id) {
std::lock_guard<std::mutex> guard(taskExecutorMutex);
auto foundEntry = executors.find(id);
if (foundEntry == executors.end()) {
auto newExec =
std::make_shared<InferenceEngine::CPUStreamsExecutor>(InferenceEngine::IStreamsExecutor::Config{id});
auto newExec = std::make_shared<ov::threading::CPUStreamsExecutor>(ov::threading::IStreamsExecutor::Config{id});
tbbThreadsCreated = true;
executors[id] = newExec;
return newExec;
@@ -123,8 +124,8 @@ InferenceEngine::ITaskExecutor::Ptr ExecutorManagerImpl::get_executor(const std:
return foundEntry->second;
}
InferenceEngine::IStreamsExecutor::Ptr ExecutorManagerImpl::get_idle_cpu_streams_executor(
const InferenceEngine::IStreamsExecutor::Config& config) {
std::shared_ptr<ov::threading::IStreamsExecutor> ExecutorManagerImpl::get_idle_cpu_streams_executor(
const ov::threading::IStreamsExecutor::Config& config) {
std::lock_guard<std::mutex> guard(streamExecutorMutex);
for (const auto& it : cpuStreamsExecutors) {
const auto& executor = it.second;
@@ -137,12 +138,11 @@ InferenceEngine::IStreamsExecutor::Ptr ExecutorManagerImpl::get_idle_cpu_streams
executorConfig._threadBindingType == config._threadBindingType &&
executorConfig._threadBindingStep == config._threadBindingStep &&
executorConfig._threadBindingOffset == config._threadBindingOffset)
if (executorConfig._threadBindingType !=
InferenceEngine::IStreamsExecutor::ThreadBindingType::HYBRID_AWARE ||
if (executorConfig._threadBindingType != ov::threading::IStreamsExecutor::ThreadBindingType::HYBRID_AWARE ||
executorConfig._threadPreferredCoreType == config._threadPreferredCoreType)
return executor;
}
auto newExec = std::make_shared<InferenceEngine::CPUStreamsExecutor>(config);
auto newExec = std::make_shared<ov::threading::CPUStreamsExecutor>(config);
tbbThreadsCreated = true;
cpuStreamsExecutors.emplace_back(std::make_pair(config, newExec));
return newExec;
@@ -166,13 +166,14 @@ void ExecutorManagerImpl::clear(const std::string& id) {
cpuStreamsExecutors.clear();
} else {
executors.erase(id);
cpuStreamsExecutors.erase(std::remove_if(cpuStreamsExecutors.begin(),
cpuStreamsExecutors.end(),
[&](const std::pair<InferenceEngine::IStreamsExecutor::Config,
InferenceEngine::IStreamsExecutor::Ptr>& it) {
return it.first._name == id;
}),
cpuStreamsExecutors.end());
cpuStreamsExecutors.erase(
std::remove_if(cpuStreamsExecutors.begin(),
cpuStreamsExecutors.end(),
[&](const std::pair<ov::threading::IStreamsExecutor::Config,
std::shared_ptr<ov::threading::IStreamsExecutor>>& it) {
return it.first._name == id;
}),
cpuStreamsExecutors.end());
}
}
@@ -188,7 +189,7 @@ public:
ExecutorManagerHolder() = default;
std::shared_ptr<ov::ExecutorManager> get() {
std::shared_ptr<ov::threading::ExecutorManager> get() {
std::lock_guard<std::mutex> lock(_mutex);
auto manager = _manager.lock();
if (!manager) {
@@ -205,4 +206,5 @@ std::shared_ptr<ExecutorManager> executor_manager() {
return executorManagerHolder.get();
}
} // namespace threading
} // namespace ov

View File

@@ -0,0 +1,496 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
#include "openvino/runtime/threading/istreams_executor.hpp"
#include <algorithm>
#include <string>
#include <thread>
#include <vector>
#include "cpp_interfaces/interface/ie_internal_plugin_config.hpp"
#include "ie_plugin_config.hpp"
#include "openvino/core/parallel.hpp"
#include "openvino/runtime/properties.hpp"
#include "openvino/runtime/system_conf.hpp"
#include "openvino/util/log.hpp"
#include "threading/ie_parallel_custom_arena.hpp"
namespace ov {
namespace threading {
// Out-of-line defaulted destructor: anchors the vtable of the interface in this
// translation unit while letting the compiler generate the (trivial) body.
IStreamsExecutor::~IStreamsExecutor() = default;
// Sets a single configuration property by delegating to the AnyMap overload.
void IStreamsExecutor::Config::set_property(const std::string& key, const ov::Any& value) {
    ov::AnyMap single_property;
    single_property.emplace(key, value);
    set_property(single_property);
}
// Applies a map of configuration properties to this streams-executor config.
// Accepts both legacy InferenceEngine keys (CONFIG_KEY / CONFIG_KEY_INTERNAL)
// and the new ov:: properties; throws on unknown keys or malformed values.
void IStreamsExecutor::Config::set_property(const ov::AnyMap& property) {
    for (const auto& it : property) {
        const auto& key = it.first;
        const auto value = it.second;
        if (key == CONFIG_KEY(CPU_BIND_THREAD)) {
            // Legacy thread-binding key: YES / NO / NUMA / HYBRID_AWARE.
            if (value.as<std::string>() == CONFIG_VALUE(YES) || value.as<std::string>() == CONFIG_VALUE(NUMA)) {
#if (defined(__APPLE__) || defined(_WIN32))
                // macOS / Windows cannot pin threads to individual cores, so
                // YES degrades to NUMA-level binding there.
                _threadBindingType = IStreamsExecutor::ThreadBindingType::NUMA;
#else
                _threadBindingType = (value.as<std::string>() == CONFIG_VALUE(YES))
                                         ? IStreamsExecutor::ThreadBindingType::CORES
                                         : IStreamsExecutor::ThreadBindingType::NUMA;
#endif
            } else if (value.as<std::string>() == CONFIG_VALUE(HYBRID_AWARE)) {
                _threadBindingType = IStreamsExecutor::ThreadBindingType::HYBRID_AWARE;
            } else if (value.as<std::string>() == CONFIG_VALUE(NO)) {
                _threadBindingType = IStreamsExecutor::ThreadBindingType::NONE;
            } else {
                IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_BIND_THREAD)
                           << ". Expected only YES(binds to cores) / NO(no binding) / NUMA(binds to NUMA nodes) / "
                              "HYBRID_AWARE (let the runtime recognize and use the hybrid cores)";
            }
        } else if (key == ov::affinity) {
            // New-API affinity property; parsed via the ov::Affinity stream operator.
            ov::Affinity affinity;
            std::stringstream{value.as<std::string>()} >> affinity;
            switch (affinity) {
            case ov::Affinity::NONE:
                _threadBindingType = ThreadBindingType::NONE;
                break;
            case ov::Affinity::CORE: {
#if (defined(__APPLE__) || defined(_WIN32))
                // Same platform limitation as above: per-core binding is not
                // available on macOS / Windows.
                _threadBindingType = ThreadBindingType::NUMA;
#else
                _threadBindingType = ThreadBindingType::CORES;
#endif
            } break;
            case ov::Affinity::NUMA:
                _threadBindingType = ThreadBindingType::NUMA;
                break;
            case ov::Affinity::HYBRID_AWARE:
                _threadBindingType = ThreadBindingType::HYBRID_AWARE;
                break;
            default:
                OPENVINO_UNREACHABLE("Unsupported affinity type");
            }
        } else if (key == CONFIG_KEY(CPU_THROUGHPUT_STREAMS)) {
            // Legacy stream-count key: special tokens NUMA / AUTO, or a number.
            if (value.as<std::string>() == CONFIG_VALUE(CPU_THROUGHPUT_NUMA)) {
                _streams = static_cast<int>(get_available_numa_nodes().size());
            } else if (value.as<std::string>() == CONFIG_VALUE(CPU_THROUGHPUT_AUTO)) {
                // bare minimum of streams (that evenly divides available number of cores)
                _streams = get_default_num_streams();
            } else {
                int val_i;
                try {
                    val_i = value.as<int>();
                } catch (const std::exception&) {
                    IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THROUGHPUT_STREAMS)
                               << ". Expected only positive numbers (#streams) or "
                               << "PluginConfigParams::CPU_THROUGHPUT_NUMA/CPU_THROUGHPUT_AUTO";
                }
                if (val_i < 0) {
                    IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THROUGHPUT_STREAMS)
                               << ". Expected only positive numbers (#streams)";
                }
                _streams = val_i;
            }
        } else if (key == ov::num_streams) {
            // New-API stream-count property (ov::streams::Num supports the
            // NUMA / AUTO sentinels as well as explicit non-negative counts).
            auto streams = value.as<ov::streams::Num>();
            if (streams == ov::streams::NUMA) {
                _streams = static_cast<int32_t>(get_available_numa_nodes().size());
            } else if (streams == ov::streams::AUTO) {
                // bare minimum of streams (that evenly divides available number of cores)
                _streams = get_default_num_streams();
            } else if (streams.num >= 0) {
                _streams = streams.num;
            } else {
                OPENVINO_UNREACHABLE("Wrong value for property key ",
                                     ov::num_streams.name(),
                                     ". Expected non negative numbers (#streams) or ",
                                     "ov::streams::NUMA|ov::streams::AUTO, Got: ",
                                     streams);
            }
        } else if (key == CONFIG_KEY(CPU_THREADS_NUM) || key == ov::inference_num_threads) {
            // Total threads budget (legacy and new key share the same field).
            int val_i;
            try {
                val_i = value.as<int>();
            } catch (const std::exception&) {
                IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THREADS_NUM)
                           << ". Expected only positive numbers (#threads)";
            }
            if (val_i < 0) {
                IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THREADS_NUM)
                           << ". Expected only positive numbers (#threads)";
            }
            _threads = val_i;
        } else if (key == CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM)) {
            // Internal key: number of threads each stream uses.
            int val_i;
            try {
                val_i = value.as<int>();
            } catch (const std::exception&) {
                IE_THROW() << "Wrong value for property key " << CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM)
                           << ". Expected only non negative numbers (#threads)";
            }
            if (val_i < 0) {
                IE_THROW() << "Wrong value for property key " << CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM)
                           << ". Expected only non negative numbers (#threads)";
            }
            _threadsPerStream = val_i;
        } else if (key == CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)) {
            // Internal hybrid-CPU key: streams scheduled onto big (P) cores.
            int val_i;
            try {
                val_i = value.as<int>();
            } catch (const std::exception&) {
                IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)
                           << ". Expected only non negative numbers (#streams)";
            }
            if (val_i < 0) {
                IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)
                           << ". Expected only non negative numbers (#streams)";
            }
            _big_core_streams = val_i;
        } else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)) {
            // Internal hybrid-CPU key: streams scheduled onto small (E) cores.
            int val_i;
            try {
                val_i = value.as<int>();
            } catch (const std::exception&) {
                IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)
                           << ". Expected only non negative numbers (#streams)";
            }
            if (val_i < 0) {
                IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)
                           << ". Expected only non negative numbers (#streams)";
            }
            _small_core_streams = val_i;
        } else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)) {
            // Internal hybrid-CPU key: threads per big-core stream.
            int val_i;
            try {
                val_i = value.as<int>();
            } catch (const std::exception&) {
                IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)
                           << ". Expected only non negative numbers (#threads)";
            }
            if (val_i < 0) {
                IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)
                           << ". Expected only non negative numbers (#threads)";
            }
            _threads_per_stream_big = val_i;
        } else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)) {
            // Internal hybrid-CPU key: threads per small-core stream.
            int val_i;
            try {
                val_i = value.as<int>();
            } catch (const std::exception&) {
                IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)
                           << ". Expected only non negative numbers (#threads)";
            }
            if (val_i < 0) {
                IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)
                           << ". Expected only non negative numbers (#threads)";
            }
            _threads_per_stream_small = val_i;
        } else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)) {
            // Internal hybrid-CPU key: logical index where small cores start.
            int val_i;
            try {
                val_i = value.as<int>();
            } catch (const std::exception&) {
                IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)
                           << ". Expected only non negative numbers";
            }
            if (val_i < 0) {
                IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)
                           << ". Expected only non negative numbers";
            }
            _small_core_offset = val_i;
        } else if (key == CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD)) {
            // Internal key: YES/NO toggle for using hyper-threaded siblings.
            if (value.as<std::string>() == CONFIG_VALUE(YES)) {
                _enable_hyper_thread = true;
            } else if (value.as<std::string>() == CONFIG_VALUE(NO)) {
                _enable_hyper_thread = false;
            } else {
                OPENVINO_UNREACHABLE("Unsupported enable hyper thread type");
            }
        } else {
            IE_THROW() << "Wrong value for property key " << key;
        }
    }
}
// Reads one configuration property back as an ov::Any. Legacy CONFIG_KEY keys
// are returned as strings; new ov:: properties are returned with their typed
// value. Unknown keys abort via OPENVINO_UNREACHABLE.
ov::Any IStreamsExecutor::Config::get_property(const std::string& key) const {
    if (key == ov::supported_properties) {
        // Full list of keys this config understands (mirrors set_property).
        std::vector<std::string> properties{
            CONFIG_KEY(CPU_THROUGHPUT_STREAMS),
            CONFIG_KEY(CPU_BIND_THREAD),
            CONFIG_KEY(CPU_THREADS_NUM),
            CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM),
            CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS),
            CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS),
            CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG),
            CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL),
            CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET),
            CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD),
            ov::num_streams.name(),
            ov::inference_num_threads.name(),
            ov::affinity.name(),
        };
        return properties;
    } else if (key == ov::affinity) {
        // Map the internal binding type back to the public ov::Affinity enum.
        switch (_threadBindingType) {
        case IStreamsExecutor::ThreadBindingType::NONE:
            return ov::Affinity::NONE;
        case IStreamsExecutor::ThreadBindingType::CORES:
            return ov::Affinity::CORE;
        case IStreamsExecutor::ThreadBindingType::NUMA:
            return ov::Affinity::NUMA;
        case IStreamsExecutor::ThreadBindingType::HYBRID_AWARE:
            return ov::Affinity::HYBRID_AWARE;
        }
        // NOTE(review): falls through to the trailing `return {}` if the enum
        // ever gains a value not handled above.
    } else if (key == CONFIG_KEY(CPU_BIND_THREAD)) {
        // Legacy string form of the same binding type.
        switch (_threadBindingType) {
        case IStreamsExecutor::ThreadBindingType::NONE:
            return {CONFIG_VALUE(NO)};
        case IStreamsExecutor::ThreadBindingType::CORES:
            return {CONFIG_VALUE(YES)};
        case IStreamsExecutor::ThreadBindingType::NUMA:
            return {CONFIG_VALUE(NUMA)};
        case IStreamsExecutor::ThreadBindingType::HYBRID_AWARE:
            return {CONFIG_VALUE(HYBRID_AWARE)};
        }
    } else if (key == CONFIG_KEY(CPU_THROUGHPUT_STREAMS)) {
        return {std::to_string(_streams)};
    } else if (key == ov::num_streams) {
        // Typed value for the new API (ov::streams::Num).
        return decltype(ov::num_streams)::value_type{_streams};
    } else if (key == CONFIG_KEY(CPU_THREADS_NUM)) {
        return {std::to_string(_threads)};
    } else if (key == ov::inference_num_threads) {
        return decltype(ov::inference_num_threads)::value_type{_threads};
    } else if (key == CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM)) {
        return {std::to_string(_threadsPerStream)};
    } else if (key == CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)) {
        return {std::to_string(_big_core_streams)};
    } else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)) {
        return {std::to_string(_small_core_streams)};
    } else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)) {
        return {std::to_string(_threads_per_stream_big)};
    } else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)) {
        return {std::to_string(_threads_per_stream_small)};
    } else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)) {
        return {std::to_string(_small_core_offset)};
    } else if (key == CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD)) {
        return {_enable_hyper_thread ? CONFIG_VALUE(YES) : CONFIG_VALUE(NO)};
    } else {
        OPENVINO_UNREACHABLE("Wrong value for property key ", key);
    }
    // Reached only when a switch above did not return (unhandled enum value).
    return {};
}
// Computes a reasonable default number of streams: the smallest stream size
// out of 4, 5 or 3 threads that evenly divides the available cores wins.
// On a single-socket machine with hyper-threading enabled, all hardware
// threads are counted; otherwise only physical cores are used.
int IStreamsExecutor::Config::get_default_num_streams(const bool enable_hyper_thread) {
    const bool single_socket = get_available_numa_nodes().size() == 1;
    const int num_cores =
        (single_socket && enable_hyper_thread) ? parallel_get_max_threads() : get_number_of_cpu_cores();
    // bare minimum of streams (that evenly divides available number of core)
    for (const int group_size : {4, 5, 3}) {
        if (num_cores % group_size == 0)
            return std::max(group_size, num_cores / group_size);
    }
    // if user disables some cores say in BIOS, so we got weird #cores which is not easy to divide
    return 1;
}
// Computes the hybrid-CPU (big/P + small/E core) stream partition for the
// requested stream mode, writes the per-core-type stream/thread counts into
// `config` (as the internal CONFIG_KEY_INTERNAL entries), and returns the
// total number of streams. `stream_mode` is expected to be one of
// DEFAULT / AGGRESSIVE / LESSAGGRESSIVE — presumably an enum declared in the
// header (not visible here); anything else throws.
int IStreamsExecutor::Config::get_hybrid_num_streams(std::map<std::string, std::string>& config,
                                                     const int stream_mode) {
    const int num_cores = parallel_get_max_threads();
    const int num_cores_phy = get_number_of_cpu_cores();
    const int num_big_cores_phy = get_number_of_cpu_cores(true);
    const int num_small_cores = num_cores_phy - num_big_cores_phy;
    // If logical threads exceed physical cores, big cores are hyper-threaded
    // and count double; small (E) cores have no SMT siblings.
    const int num_big_cores = num_cores > num_cores_phy ? num_big_cores_phy * 2 : num_big_cores_phy;
    int big_core_streams = 0;
    int small_core_streams = 0;
    int threads_per_stream_big = 0;
    int threads_per_stream_small = 0;

    if (stream_mode == DEFAULT) {
        // bare minimum of streams (that evenly divides available number of core)
        if (0 == num_big_cores_phy % 4) {
            threads_per_stream_big = 4;
        } else if (0 == num_big_cores_phy % 5) {
            threads_per_stream_big = 5;
        } else if (0 == num_big_cores_phy % 3) {
            threads_per_stream_big = 3;
        } else {  // if user disables some cores say in BIOS, so we got weird #cores which is not easy to divide
            threads_per_stream_big = num_big_cores_phy;
        }

        big_core_streams = num_big_cores / threads_per_stream_big;
        threads_per_stream_small = threads_per_stream_big;
        if (num_small_cores == 0) {
            // Big-core-only CPU: no E-core streams at all.
            threads_per_stream_small = 0;
        } else if (num_small_cores < threads_per_stream_small) {
            // Too few E cores for a full-size stream: shrink both stream sizes
            // to the E-core count so streams stay uniform.
            small_core_streams = 1;
            threads_per_stream_small = num_small_cores;
            threads_per_stream_big = threads_per_stream_small;
            // Balance the computation of physical core and logical core, the number of threads on the physical core and
            // logical core should be equal
            big_core_streams = num_big_cores_phy / threads_per_stream_big * 2;
        } else {
            small_core_streams = num_small_cores / threads_per_stream_small;
        }
    } else if (stream_mode == AGGRESSIVE) {
        // One stream per core (threads_per_stream collapses to 1).
        big_core_streams = num_big_cores;
        small_core_streams = num_small_cores;
        threads_per_stream_big = num_big_cores / big_core_streams;
        threads_per_stream_small = num_small_cores == 0 ? 0 : num_small_cores / small_core_streams;
    } else if (stream_mode == LESSAGGRESSIVE) {
        // One stream per pair of cores (2 threads per stream).
        big_core_streams = num_big_cores / 2;
        small_core_streams = num_small_cores / 2;
        threads_per_stream_big = num_big_cores / big_core_streams;
        threads_per_stream_small = num_small_cores == 0 ? 0 : num_small_cores / small_core_streams;
    } else {
        IE_THROW() << "Wrong stream mode to get num of streams: " << stream_mode;
    }
    config[CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)] = std::to_string(big_core_streams);
    config[CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)] = std::to_string(small_core_streams);
    config[CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)] = std::to_string(threads_per_stream_big);
    config[CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)] = std::to_string(threads_per_stream_small);
    // This is default setting for specific CPU which Pcore is in front and Ecore is in the back.
    config[CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)] = std::to_string(num_small_cores == 0 ? 0 : num_big_cores);
    return big_core_streams + small_core_streams;
}
// Distributes a user-requested streams/threads budget across big (P) and
// small (E) cores of a hybrid CPU, updating config._big_core_streams,
// _small_core_streams, _threads_per_stream_big/_small and _small_core_offset
// in place. Falls back to defaults when _threads or _streams are unset (0).
void IStreamsExecutor::Config::update_hybrid_custom_threads(Config& config) {
    const auto num_cores = parallel_get_max_threads();
    const auto num_cores_phys = get_number_of_cpu_cores();
    const auto num_big_cores_phys = get_number_of_cpu_cores(true);
    // Big cores count double when SMT siblings are exposed (logical > physical).
    const auto num_big_cores = num_cores > num_cores_phys ? num_big_cores_phys * 2 : num_big_cores_phys;
    const auto num_small_cores_phys = num_cores_phys - num_big_cores_phys;
    const auto threads = config._threads ? config._threads : num_cores;
    const auto streams = config._streams > 0 ? config._streams : 1;

    config._small_core_offset = num_big_cores;
    int threads_per_stream = std::max(1, threads / streams);

    // Case 1: all requested streams fit on the physical big cores alone.
    if ((num_big_cores_phys / threads_per_stream >= streams) && (1 < threads_per_stream)) {
        config._big_core_streams = streams;
        config._threads_per_stream_big = threads_per_stream;
        config._small_core_streams = 0;
        config._threads_per_stream_small = 0;
    // Case 2: streams are too wide for big cores but fit on small cores alone.
    } else if ((num_small_cores_phys / threads_per_stream >= streams) && (num_big_cores_phys < threads_per_stream)) {
        config._big_core_streams = 0;
        config._threads_per_stream_big = 0;
        config._small_core_streams = streams;
        config._threads_per_stream_small = threads_per_stream;
    // Case 3: must split streams across both core types; shrink the stream
    // width until the combined capacity covers the requested stream count.
    } else {
        const int threads_per_stream_big = std::min(num_big_cores_phys, threads_per_stream);
        const int threads_per_stream_small = std::min(num_small_cores_phys, threads_per_stream);

        threads_per_stream = std::min(threads_per_stream_big, threads_per_stream_small);
        while (threads_per_stream > 1) {
            const int base_big_streams = num_big_cores_phys / threads_per_stream;
            const int base_small_streams = num_small_cores_phys > 0 ? num_small_cores_phys / threads_per_stream : 0;
            if (base_big_streams + base_small_streams >= streams) {
                // Physical capacity alone is enough: overflow goes to small cores.
                config._big_core_streams = base_big_streams;
                config._small_core_streams = streams - base_big_streams;
                break;
            } else if (base_big_streams * 2 + base_small_streams >= streams) {
                // Need the big-core SMT siblings (2x) to reach the target.
                config._big_core_streams = streams - base_small_streams;
                config._small_core_streams = base_small_streams;
                break;
            } else {
                // Still short: narrow the streams and try again.
                threads_per_stream = threads_per_stream > 1 ? threads_per_stream - 1 : 1;
            }
        }

        // Single-thread streams: allow oversubscription by looping streams
        // over all cores, filling big cores before small ones.
        if (threads_per_stream == 1) {
            const int stream_loops = streams / num_cores;
            const int remain_streams = streams - stream_loops * num_cores;
            if (num_big_cores_phys >= remain_streams) {
                config._big_core_streams = remain_streams + num_big_cores * stream_loops;
                config._small_core_streams = num_small_cores_phys * stream_loops;
            } else if (num_big_cores_phys + num_small_cores_phys >= remain_streams) {
                config._big_core_streams = num_big_cores_phys + num_big_cores * stream_loops;
                config._small_core_streams = remain_streams - num_big_cores_phys + num_small_cores_phys * stream_loops;
            } else {
                config._big_core_streams = remain_streams - num_small_cores_phys + num_big_cores * stream_loops;
                config._small_core_streams = num_small_cores_phys * (stream_loops + 1);
            }
        }

        config._threads_per_stream_big = threads_per_stream;
        config._threads_per_stream_small = threads_per_stream;
    }
}
// Builds a fully-populated multi-threaded configuration from `initial`:
// resolves the thread budget (explicit > env > hardware), distributes threads
// per stream, and — on hybrid CPUs with TBB — applies latency-aware big/small
// core selection. `fp_intesive` (sic — typo kept from the declaration) marks
// fp32-heavy workloads, which changes the big-vs-little efficiency threshold.
IStreamsExecutor::Config IStreamsExecutor::Config::make_default_multi_threaded(const IStreamsExecutor::Config& initial,
                                                                               const bool fp_intesive) {
    const auto envThreads = parallel_get_env_threads();
    const auto& numaNodes = get_available_numa_nodes();
    const int numaNodesNum = static_cast<int>(numaNodes.size());
    auto streamExecutorConfig = initial;
    // Latency mode: no more streams than NUMA nodes.
    const bool bLatencyCase = streamExecutorConfig._streams <= numaNodesNum;

    // by default, do not use the hyper-threading (to minimize threads synch overheads)
    int num_cores_default = get_number_of_cpu_cores();
#if (OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO)
    // additional latency-case logic for hybrid processors:
    if (ThreadBindingType::HYBRID_AWARE == streamExecutorConfig._threadBindingType) {
        const auto core_types = custom::info::core_types();
        // core_types is sorted: front() = little/E cores, back() = big/P cores.
        const auto num_little_cores =
            custom::info::default_concurrency(custom::task_arena::constraints{}.set_core_type(core_types.front()));
        const auto num_big_cores_phys = get_number_of_cpu_cores(true);
        const int int8_threshold = 4;  // ~relative efficiency of the VNNI-intensive code for Big vs Little cores;
        const int fp32_threshold = 2;  // ~relative efficiency of the AVX2 fp32 code for Big vs Little cores;
        // by default the latency case uses (faster) Big cores only, depending on the compute ratio
        const bool bLatencyCaseBigOnly =
            num_big_cores_phys > (num_little_cores / (fp_intesive ? fp32_threshold : int8_threshold));
        // selecting the preferred core type
        streamExecutorConfig._threadPreferredCoreType =
            bLatencyCase ? (bLatencyCaseBigOnly ? IStreamsExecutor::Config::PreferredCoreType::BIG
                                                : IStreamsExecutor::Config::PreferredCoreType::ANY)
                         : IStreamsExecutor::Config::PreferredCoreType::ROUND_ROBIN;
        // additionally selecting the #cores to use in the "Big-only" case
        if (bLatencyCaseBigOnly) {
            const int hyper_threading_threshold =
                2;  // min #cores, for which the hyper-threading becomes useful for the latency case
            const auto num_big_cores =
                custom::info::default_concurrency(custom::task_arena::constraints{}.set_core_type(core_types.back()));
            num_cores_default = (num_big_cores_phys <= hyper_threading_threshold) ? num_big_cores : num_big_cores_phys;
        }
        // if nstreams or nthreads are set, need to calculate the Hybrid aware parameters here
        if (!bLatencyCase && (streamExecutorConfig._big_core_streams == 0 || streamExecutorConfig._threads)) {
            update_hybrid_custom_threads(streamExecutorConfig);
        }
        OPENVINO_DEBUG << "[ p_e_core_info ] streams (threads): " << streamExecutorConfig._streams << "("
                       << streamExecutorConfig._threads_per_stream_big * streamExecutorConfig._big_core_streams +
                              streamExecutorConfig._threads_per_stream_small * streamExecutorConfig._small_core_streams
                       << ") -- PCore: " << streamExecutorConfig._big_core_streams << "("
                       << streamExecutorConfig._threads_per_stream_big
                       << ") ECore: " << streamExecutorConfig._small_core_streams << "("
                       << streamExecutorConfig._threads_per_stream_small << ")";
    }
#endif
    const auto hwCores =
        !bLatencyCase && numaNodesNum == 1
            // throughput case on a single-NUMA node machine uses all available cores
            ? (streamExecutorConfig._enable_hyper_thread ? parallel_get_max_threads() : num_cores_default)
            // in the rest of cases:
            //    multi-node machine
            //    or
            //    latency case, single-node yet hybrid case that uses
            //      all core types
            //      or
            //      big-cores only, but the #cores is "enough" (pls see the logic above)
            // it is usually beneficial not to use the hyper-threading (which is default)
            : num_cores_default;
    // Thread budget priority: explicit config > environment > detected hardware.
    const auto threads =
        streamExecutorConfig._threads ? streamExecutorConfig._threads : (envThreads ? envThreads : hwCores);
    streamExecutorConfig._threadsPerStream =
        streamExecutorConfig._streams ? std::max(1, threads / streamExecutorConfig._streams) : threads;
    // For hybrid throughput runs, the total equals the P/E partition; otherwise
    // it is simply threads-per-stream times streams.
    streamExecutorConfig._threads =
        (!bLatencyCase && ThreadBindingType::HYBRID_AWARE == streamExecutorConfig._threadBindingType)
            ? streamExecutorConfig._big_core_streams * streamExecutorConfig._threads_per_stream_big +
                  streamExecutorConfig._small_core_streams * streamExecutorConfig._threads_per_stream_small
            : streamExecutorConfig._threadsPerStream * streamExecutorConfig._streams;
    return streamExecutorConfig;
}
} // namespace threading
} // namespace ov

View File

@@ -0,0 +1,41 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
#include "openvino/runtime/threading/itask_executor.hpp"
#include <future>
#include <memory>
#include <utility>
#include <vector>
namespace ov {
namespace threading {
void ITaskExecutor::run_and_wait(const std::vector<Task>& tasks) {
std::vector<std::packaged_task<void()>> packagedTasks;
std::vector<std::future<void>> futures;
for (std::size_t i = 0; i < tasks.size(); ++i) {
packagedTasks.emplace_back([&tasks, i] {
tasks[i]();
});
futures.emplace_back(packagedTasks.back().get_future());
}
for (std::size_t i = 0; i < tasks.size(); ++i) {
run([&packagedTasks, i] {
packagedTasks[i]();
});
}
// std::future::get will rethrow exception from task.
// We should wait all tasks before any exception is thrown.
// So wait() and get() for each future moved to separate loops
for (auto&& future : futures) {
future.wait();
}
for (auto&& future : futures) {
future.get();
}
}
} // namespace threading
} // namespace ov

View File

@@ -18,7 +18,7 @@
#include "streams_executor.hpp"
#include "threading/ie_parallel_custom_arena.hpp"
namespace InferenceEngine {
namespace ov {
struct CPU {
int _processors = 0;
@@ -243,13 +243,13 @@ void parse_processor_info_linux(const int _processors,
};
#if !((IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO))
std::vector<int> getAvailableNUMANodes() {
std::vector<int> get_available_numa_nodes() {
std::vector<int> nodes((0 == cpu._sockets) ? 1 : cpu._sockets);
std::iota(std::begin(nodes), std::end(nodes), 0);
return nodes;
}
#endif
int getNumberOfCPUCores(bool bigCoresOnly) {
int get_number_of_cpu_cores(bool bigCoresOnly) {
unsigned numberOfProcessors = cpu._processors;
unsigned totalNumberOfCpuCores = cpu._cores;
IE_ASSERT(totalNumberOfCpuCores != 0);
@@ -280,4 +280,4 @@ int getNumberOfCPUCores(bool bigCoresOnly) {
return phys_cores;
}
} // namespace InferenceEngine
} // namespace ov

View File

@@ -3,7 +3,7 @@
//
#ifndef NOMINMAX
# define NOMINMAX
# define NOMINMAX
#endif
#include <windows.h>
@@ -11,11 +11,11 @@
#include <memory>
#include <vector>
#include "ie_system_conf.h"
#include "openvino/runtime/system_conf.hpp"
#include "streams_executor.hpp"
#include "threading/ie_parallel_custom_arena.hpp"
namespace InferenceEngine {
namespace ov {
struct CPU {
int _processors = 0;
@@ -168,7 +168,7 @@ void parse_processor_info_win(const char* base_ptr,
}
}
int getNumberOfCPUCores(bool bigCoresOnly) {
int get_number_of_cpu_cores(bool bigCoresOnly) {
const int fallback_val = parallel_get_max_threads();
DWORD sz = 0;
// querying the size of the resulting structure, passing the nullptr for the buffer
@@ -178,7 +178,8 @@ int getNumberOfCPUCores(bool bigCoresOnly) {
std::unique_ptr<uint8_t[]> ptr(new uint8_t[sz]);
if (!GetLogicalProcessorInformationEx(RelationProcessorCore,
reinterpret_cast<PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX>(ptr.get()), &sz))
reinterpret_cast<PSYSTEM_LOGICAL_PROCESSOR_INFORMATION_EX>(ptr.get()),
&sz))
return fallback_val;
int phys_cores = 0;
@@ -188,20 +189,21 @@ int getNumberOfCPUCores(bool bigCoresOnly) {
phys_cores++;
} while (offset < sz);
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
auto core_types = custom::info::core_types();
if (bigCoresOnly && core_types.size() > 1) /*Hybrid CPU*/ {
phys_cores = custom::info::default_concurrency(custom::task_arena::constraints{}
.set_core_type(core_types.back())
.set_max_threads_per_core(1));
phys_cores = custom::info::default_concurrency(
custom::task_arena::constraints{}.set_core_type(core_types.back()).set_max_threads_per_core(1));
}
#endif
#endif
return phys_cores;
}
#if !(IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
// OMP/SEQ threading on the Windows doesn't support NUMA
std::vector<int> getAvailableNUMANodes() { return {-1}; }
std::vector<int> get_available_numa_nodes() {
return {-1};
}
#endif
} // namespace InferenceEngine
} // namespace ov

View File

@@ -11,7 +11,7 @@
#include <string>
#include <vector>
namespace InferenceEngine {
namespace ov {
#ifdef __linux__
/**
@@ -55,4 +55,4 @@ void parse_processor_info_win(const char* base_ptr,
std::vector<std::vector<int>>& _cpu_mapping_table);
#endif
} // namespace InferenceEngine
} // namespace ov

View File

@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
//
#include "ie_system_conf.h"
#include "openvino/runtime/system_conf.hpp"
#include <cstdlib>
#include <cstring>
@@ -15,7 +15,7 @@
#define XBYAK_UNDEF_JNL
#include <xbyak/xbyak_util.h>
namespace InferenceEngine {
namespace ov {
#if defined(OPENVINO_ARCH_X86) || defined(OPENVINO_ARCH_X86_64)
@@ -102,7 +102,7 @@ bool with_cpu_x86_avx512_core_amx() {
#endif // OPENVINO_ARCH_X86 || OPENVINO_ARCH_X86_64
bool checkOpenMpEnvVars(bool includeOMPNumThreads) {
bool check_open_mp_env_vars(bool include_omp_num_threads) {
for (auto&& var : {"GOMP_CPU_AFFINITY",
"GOMP_DEBUG"
"GOMP_RTEMS_THREAD_POOLS",
@@ -134,7 +134,7 @@ bool checkOpenMpEnvVars(bool includeOMPNumThreads) {
"PHI_KMP_PLACE_THREADS"
"PHI_OMP_NUM_THREADS"}) {
if (getenv(var)) {
if (0 != strcmp(var, "OMP_NUM_THREADS") || includeOMPNumThreads)
if (0 != strcmp(var, "OMP_NUM_THREADS") || include_omp_num_threads)
return true;
}
}
@@ -144,19 +144,19 @@ bool checkOpenMpEnvVars(bool includeOMPNumThreads) {
#if defined(__APPLE__) || defined(__EMSCRIPTEN__)
// for Linux and Windows the getNumberOfCPUCores (that accounts only for physical cores) implementation is OS-specific
// (see cpp files in corresponding folders), for __APPLE__ it is default :
int getNumberOfCPUCores(bool) {
int get_number_of_cpu_cores(bool) {
return parallel_get_max_threads();
}
# if !((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO))
std::vector<int> getAvailableNUMANodes() {
std::vector<int> get_available_numa_nodes() {
return {-1};
}
# endif
int getNumberOfLogicalCPUCores(bool) {
int get_number_of_logical_cpu_cores(bool) {
return parallel_get_max_threads();
}
#else
int getNumberOfLogicalCPUCores(bool bigCoresOnly) {
int get_number_of_logical_cpu_cores(bool bigCoresOnly) {
int logical_cores = parallel_get_max_threads();
# if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
auto core_types = custom::info::core_types();
@@ -170,18 +170,18 @@ int getNumberOfLogicalCPUCores(bool bigCoresOnly) {
#endif
#if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO))
std::vector<int> getAvailableNUMANodes() {
std::vector<int> get_available_numa_nodes() {
return custom::info::numa_nodes();
}
// this is impl only with the TBB
std::vector<int> getAvailableCoresTypes() {
std::vector<int> get_available_cores_types() {
return custom::info::core_types();
}
#else
// as the core types support exists only with the TBB, the fallback is same for any other threading API
std::vector<int> getAvailableCoresTypes() {
std::vector<int> get_available_cores_types() {
return {-1};
}
#endif
} // namespace InferenceEngine
} // namespace ov

View File

@@ -194,7 +194,7 @@ struct CPUStreamsExecutor::Impl {
}
#elif IE_THREAD == IE_THREAD_SEQ
if (ThreadBindingType::NUMA == _impl->_config._threadBindingType) {
PinCurrentThreadToSocket(_numaNodeId);
InferenceEngine::PinCurrentThreadToSocket(_numaNodeId);
} else if (ThreadBindingType::CORES == _impl->_config._threadBindingType) {
CpuSet processMask;
int ncpus = 0;
@@ -368,7 +368,7 @@ int CPUStreamsExecutor::GetNumaNodeId() {
return stream->_numaNodeId;
}
CPUStreamsExecutor::CPUStreamsExecutor(const IStreamsExecutor::Config& config) : _impl{new Impl{config}} {}
CPUStreamsExecutor::CPUStreamsExecutor(const Config& config) : _impl{new Impl{config}} {}
CPUStreamsExecutor::~CPUStreamsExecutor() {
{

View File

@@ -7,7 +7,10 @@
#include "ie_parallel.hpp"
#include "openvino/runtime/properties.hpp"
#include "openvino/runtime/threading/executor_manager.hpp"
#include "openvino/runtime/threading/istreams_executor.hpp"
#include "openvino/runtime/threading/itask_executor.hpp"
#include "threading/ie_cpu_streams_executor.hpp"
#include "threading/ie_itask_executor.hpp"
#if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO
# if (TBB_INTERFACE_VERSION < 12000)
# include <tbb/task_scheduler_init.h>
@@ -25,7 +28,7 @@ namespace InferenceEngine {
namespace {
class ExecutorManagerImpl : public ExecutorManager {
public:
ExecutorManagerImpl(const std::shared_ptr<ov::ExecutorManager>& manager);
ExecutorManagerImpl(const std::shared_ptr<ov::threading::ExecutorManager>& manager);
ITaskExecutor::Ptr getExecutor(const std::string& id) override;
IStreamsExecutor::Ptr getIdleCPUStreamsExecutor(const IStreamsExecutor::Config& config) override;
size_t getExecutorsNumber() const override;
@@ -35,15 +38,55 @@ public:
bool getTbbFlag() override;
private:
std::shared_ptr<ov::ExecutorManager> m_manager;
std::shared_ptr<ov::ExecutorManager> get_ov_manager() const override {
std::shared_ptr<ov::threading::ExecutorManager> m_manager;
std::shared_ptr<ov::threading::ExecutorManager> get_ov_manager() const override {
return m_manager;
}
};
class TaskExecutorWrapper : public ITaskExecutor {
std::shared_ptr<ov::threading::ITaskExecutor> m_executor;
public:
TaskExecutorWrapper(const std::shared_ptr<ov::threading::ITaskExecutor>& executor) : m_executor(executor) {}
void run(Task task) override {
m_executor->run(task);
}
void runAndWait(const std::vector<Task>& tasks) override {
m_executor->run_and_wait(tasks);
}
};
class StreamsExecutorWrapper : public IStreamsExecutor {
std::shared_ptr<ov::threading::IStreamsExecutor> m_executor;
public:
StreamsExecutorWrapper(const std::shared_ptr<ov::threading::IStreamsExecutor>& executor) : m_executor(executor) {}
void run(Task task) override {
m_executor->run(task);
}
void runAndWait(const std::vector<Task>& tasks) override {
m_executor->run_and_wait(tasks);
}
int GetStreamId() override {
return m_executor->get_stream_id();
}
int GetNumaNodeId() override {
return m_executor->get_numa_node_id();
}
void Execute(Task task) override {
m_executor->execute(task);
}
};
} // namespace
ExecutorManagerImpl::ExecutorManagerImpl(const std::shared_ptr<ov::ExecutorManager>& manager) : m_manager(manager) {}
ExecutorManagerImpl::ExecutorManagerImpl(const std::shared_ptr<ov::threading::ExecutorManager>& manager)
: m_manager(manager) {}
void ExecutorManagerImpl::setTbbFlag(bool flag) {
m_manager->set_property({{ov::force_tbb_terminate.name(), flag}});
@@ -54,11 +97,11 @@ bool ExecutorManagerImpl::getTbbFlag() {
}
ITaskExecutor::Ptr ExecutorManagerImpl::getExecutor(const std::string& id) {
return m_manager->get_executor(id);
return std::make_shared<TaskExecutorWrapper>(m_manager->get_executor(id));
}
IStreamsExecutor::Ptr ExecutorManagerImpl::getIdleCPUStreamsExecutor(const IStreamsExecutor::Config& config) {
return m_manager->get_idle_cpu_streams_executor(config);
return std::make_shared<StreamsExecutorWrapper>(m_manager->get_idle_cpu_streams_executor(config));
}
size_t ExecutorManagerImpl::getExecutorsNumber() const {
@@ -74,7 +117,7 @@ void ExecutorManagerImpl::clear(const std::string& id) {
}
std::shared_ptr<InferenceEngine::ExecutorManager> create_old_manager(
const std::shared_ptr<ov::ExecutorManager>& manager) {
const std::shared_ptr<ov::threading::ExecutorManager>& manager) {
return std::make_shared<ExecutorManagerImpl>(manager);
}
@@ -94,7 +137,7 @@ public:
std::lock_guard<std::mutex> lock(_mutex);
auto manager = _manager.lock();
if (!manager) {
_manager = manager = create_old_manager(ov::executor_manager());
_manager = manager = create_old_manager(ov::threading::executor_manager());
}
return manager;
}

View File

@@ -23,463 +23,31 @@ namespace InferenceEngine {
IStreamsExecutor::~IStreamsExecutor() {}
std::vector<std::string> IStreamsExecutor::Config::SupportedKeys() const {
return {
CONFIG_KEY(CPU_THROUGHPUT_STREAMS),
CONFIG_KEY(CPU_BIND_THREAD),
CONFIG_KEY(CPU_THREADS_NUM),
CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM),
CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS),
CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS),
CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG),
CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL),
CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET),
CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD),
ov::num_streams.name(),
ov::inference_num_threads.name(),
ov::affinity.name(),
};
return get_property(ov::supported_properties.name()).as<std::vector<std::string>>();
}
int IStreamsExecutor::Config::GetDefaultNumStreams(const bool enable_hyper_thread) {
const int sockets = static_cast<int>(getAvailableNUMANodes().size());
// bare minimum of streams (that evenly divides available number of core)
const int num_cores = sockets == 1 ? (enable_hyper_thread ? parallel_get_max_threads() : getNumberOfCPUCores())
: getNumberOfCPUCores();
if (0 == num_cores % 4)
return std::max(4, num_cores / 4);
else if (0 == num_cores % 5)
return std::max(5, num_cores / 5);
else if (0 == num_cores % 3)
return std::max(3, num_cores / 3);
else // if user disables some cores say in BIOS, so we got weird #cores which is not easy to divide
return 1;
return get_default_num_streams(enable_hyper_thread);
}
int IStreamsExecutor::Config::GetHybridNumStreams(std::map<std::string, std::string>& config, const int stream_mode) {
const int num_cores = parallel_get_max_threads();
const int num_cores_phy = getNumberOfCPUCores();
const int num_big_cores_phy = getNumberOfCPUCores(true);
const int num_small_cores = num_cores_phy - num_big_cores_phy;
const int num_big_cores = num_cores > num_cores_phy ? num_big_cores_phy * 2 : num_big_cores_phy;
int big_core_streams = 0;
int small_core_streams = 0;
int threads_per_stream_big = 0;
int threads_per_stream_small = 0;
if (stream_mode == DEFAULT) {
// bare minimum of streams (that evenly divides available number of core)
if (0 == num_big_cores_phy % 4) {
threads_per_stream_big = 4;
} else if (0 == num_big_cores_phy % 5) {
threads_per_stream_big = 5;
} else if (0 == num_big_cores_phy % 3) {
threads_per_stream_big = 3;
} else { // if user disables some cores say in BIOS, so we got weird #cores which is not easy to divide
threads_per_stream_big = num_big_cores_phy;
}
big_core_streams = num_big_cores / threads_per_stream_big;
threads_per_stream_small = threads_per_stream_big;
if (num_small_cores == 0) {
threads_per_stream_small = 0;
} else if (num_small_cores < threads_per_stream_small) {
small_core_streams = 1;
threads_per_stream_small = num_small_cores;
threads_per_stream_big = threads_per_stream_small;
// Balance the computation of physical core and logical core, the number of threads on the physical core and
// logical core should be equal
big_core_streams = num_big_cores_phy / threads_per_stream_big * 2;
} else {
small_core_streams = num_small_cores / threads_per_stream_small;
}
} else if (stream_mode == AGGRESSIVE) {
big_core_streams = num_big_cores;
small_core_streams = num_small_cores;
threads_per_stream_big = num_big_cores / big_core_streams;
threads_per_stream_small = num_small_cores == 0 ? 0 : num_small_cores / small_core_streams;
} else if (stream_mode == LESSAGGRESSIVE) {
big_core_streams = num_big_cores / 2;
small_core_streams = num_small_cores / 2;
threads_per_stream_big = num_big_cores / big_core_streams;
threads_per_stream_small = num_small_cores == 0 ? 0 : num_small_cores / small_core_streams;
} else {
IE_THROW() << "Wrong stream mode to get num of streams: " << stream_mode;
}
config[CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)] = std::to_string(big_core_streams);
config[CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)] = std::to_string(small_core_streams);
config[CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)] = std::to_string(threads_per_stream_big);
config[CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)] = std::to_string(threads_per_stream_small);
// This is default setting for specific CPU which Pcore is in front and Ecore is in the back.
config[CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)] = std::to_string(num_small_cores == 0 ? 0 : num_big_cores);
return big_core_streams + small_core_streams;
return get_hybrid_num_streams(config, stream_mode);
}
void IStreamsExecutor::Config::SetConfig(const std::string& key, const std::string& value) {
if (key == CONFIG_KEY(CPU_BIND_THREAD)) {
if (value == CONFIG_VALUE(YES) || value == CONFIG_VALUE(NUMA)) {
#if (defined(__APPLE__) || defined(_WIN32))
_threadBindingType = IStreamsExecutor::ThreadBindingType::NUMA;
#else
_threadBindingType = (value == CONFIG_VALUE(YES)) ? IStreamsExecutor::ThreadBindingType::CORES
: IStreamsExecutor::ThreadBindingType::NUMA;
#endif
} else if (value == CONFIG_VALUE(HYBRID_AWARE)) {
_threadBindingType = IStreamsExecutor::ThreadBindingType::HYBRID_AWARE;
} else if (value == CONFIG_VALUE(NO)) {
_threadBindingType = IStreamsExecutor::ThreadBindingType::NONE;
} else {
IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_BIND_THREAD)
<< ". Expected only YES(binds to cores) / NO(no binding) / NUMA(binds to NUMA nodes) / "
"HYBRID_AWARE (let the runtime recognize and use the hybrid cores)";
}
} else if (key == ov::affinity) {
ov::Affinity affinity;
std::stringstream{value} >> affinity;
switch (affinity) {
case ov::Affinity::NONE:
_threadBindingType = ThreadBindingType::NONE;
break;
case ov::Affinity::CORE: {
#if (defined(__APPLE__) || defined(_WIN32))
_threadBindingType = ThreadBindingType::NUMA;
#else
_threadBindingType = ThreadBindingType::CORES;
#endif
} break;
case ov::Affinity::NUMA:
_threadBindingType = ThreadBindingType::NUMA;
break;
case ov::Affinity::HYBRID_AWARE:
_threadBindingType = ThreadBindingType::HYBRID_AWARE;
break;
default:
OPENVINO_UNREACHABLE("Unsupported affinity type");
}
} else if (key == CONFIG_KEY(CPU_THROUGHPUT_STREAMS)) {
if (value == CONFIG_VALUE(CPU_THROUGHPUT_NUMA)) {
_streams = static_cast<int>(getAvailableNUMANodes().size());
} else if (value == CONFIG_VALUE(CPU_THROUGHPUT_AUTO)) {
// bare minimum of streams (that evenly divides available number of cores)
_streams = GetDefaultNumStreams();
} else {
int val_i;
try {
val_i = std::stoi(value);
} catch (const std::exception&) {
IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THROUGHPUT_STREAMS)
<< ". Expected only positive numbers (#streams) or "
<< "PluginConfigParams::CPU_THROUGHPUT_NUMA/CPU_THROUGHPUT_AUTO";
}
if (val_i < 0) {
IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THROUGHPUT_STREAMS)
<< ". Expected only positive numbers (#streams)";
}
_streams = val_i;
}
} else if (key == ov::num_streams) {
auto streams = ov::util::from_string(value, ov::streams::num);
if (streams == ov::streams::NUMA) {
_streams = static_cast<int32_t>(getAvailableNUMANodes().size());
} else if (streams == ov::streams::AUTO) {
// bare minimum of streams (that evenly divides available number of cores)
_streams = GetDefaultNumStreams();
} else if (streams.num >= 0) {
_streams = streams.num;
} else {
OPENVINO_UNREACHABLE("Wrong value for property key ",
ov::num_streams.name(),
". Expected non negative numbers (#streams) or ",
"ov::streams::NUMA|ov::streams::AUTO, Got: ",
streams);
}
} else if (key == CONFIG_KEY(CPU_THREADS_NUM) || key == ov::inference_num_threads) {
int val_i;
try {
val_i = std::stoi(value);
} catch (const std::exception&) {
IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THREADS_NUM)
<< ". Expected only positive numbers (#threads)";
}
if (val_i < 0) {
IE_THROW() << "Wrong value for property key " << CONFIG_KEY(CPU_THREADS_NUM)
<< ". Expected only positive numbers (#threads)";
}
_threads = val_i;
} else if (key == CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM)) {
int val_i;
try {
val_i = std::stoi(value);
} catch (const std::exception&) {
IE_THROW() << "Wrong value for property key " << CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM)
<< ". Expected only non negative numbers (#threads)";
}
if (val_i < 0) {
IE_THROW() << "Wrong value for property key " << CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM)
<< ". Expected only non negative numbers (#threads)";
}
_threadsPerStream = val_i;
} else if (key == CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)) {
int val_i;
try {
val_i = std::stoi(value);
} catch (const std::exception&) {
IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)
<< ". Expected only non negative numbers (#streams)";
}
if (val_i < 0) {
IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)
<< ". Expected only non negative numbers (#streams)";
}
_big_core_streams = val_i;
} else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)) {
int val_i;
try {
val_i = std::stoi(value);
} catch (const std::exception&) {
IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)
<< ". Expected only non negative numbers (#streams)";
}
if (val_i < 0) {
IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)
<< ". Expected only non negative numbers (#streams)";
}
_small_core_streams = val_i;
} else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)) {
int val_i;
try {
val_i = std::stoi(value);
} catch (const std::exception&) {
IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)
<< ". Expected only non negative numbers (#threads)";
}
if (val_i < 0) {
IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)
<< ". Expected only non negative numbers (#threads)";
}
_threads_per_stream_big = val_i;
} else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)) {
int val_i;
try {
val_i = std::stoi(value);
} catch (const std::exception&) {
IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)
<< ". Expected only non negative numbers (#threads)";
}
if (val_i < 0) {
IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)
<< ". Expected only non negative numbers (#threads)";
}
_threads_per_stream_small = val_i;
} else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)) {
int val_i;
try {
val_i = std::stoi(value);
} catch (const std::exception&) {
IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)
<< ". Expected only non negative numbers";
}
if (val_i < 0) {
IE_THROW() << "Wrong value for HYBRID_AWARE key " << CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)
<< ". Expected only non negative numbers";
}
_small_core_offset = val_i;
} else if (key == CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD)) {
if (value == CONFIG_VALUE(YES)) {
_enable_hyper_thread = true;
} else if (value == CONFIG_VALUE(NO)) {
_enable_hyper_thread = false;
} else {
OPENVINO_UNREACHABLE("Unsupported enable hyper thread type");
}
} else {
IE_THROW() << "Wrong value for property key " << key;
}
set_property(key, value);
}
Parameter IStreamsExecutor::Config::GetConfig(const std::string& key) const {
if (key == ov::affinity) {
switch (_threadBindingType) {
case IStreamsExecutor::ThreadBindingType::NONE:
return ov::Affinity::NONE;
case IStreamsExecutor::ThreadBindingType::CORES:
return ov::Affinity::CORE;
case IStreamsExecutor::ThreadBindingType::NUMA:
return ov::Affinity::NUMA;
case IStreamsExecutor::ThreadBindingType::HYBRID_AWARE:
return ov::Affinity::HYBRID_AWARE;
}
} else if (key == CONFIG_KEY(CPU_BIND_THREAD)) {
switch (_threadBindingType) {
case IStreamsExecutor::ThreadBindingType::NONE:
return {CONFIG_VALUE(NO)};
case IStreamsExecutor::ThreadBindingType::CORES:
return {CONFIG_VALUE(YES)};
case IStreamsExecutor::ThreadBindingType::NUMA:
return {CONFIG_VALUE(NUMA)};
case IStreamsExecutor::ThreadBindingType::HYBRID_AWARE:
return {CONFIG_VALUE(HYBRID_AWARE)};
}
} else if (key == CONFIG_KEY(CPU_THROUGHPUT_STREAMS)) {
return {std::to_string(_streams)};
} else if (key == ov::num_streams) {
return decltype(ov::num_streams)::value_type{_streams};
} else if (key == CONFIG_KEY(CPU_THREADS_NUM)) {
return {std::to_string(_threads)};
} else if (key == ov::inference_num_threads) {
return decltype(ov::inference_num_threads)::value_type{_threads};
} else if (key == CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM)) {
return {std::to_string(_threadsPerStream)};
} else if (key == CONFIG_KEY_INTERNAL(BIG_CORE_STREAMS)) {
return {std::to_string(_big_core_streams)};
} else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_STREAMS)) {
return {std::to_string(_small_core_streams)};
} else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG)) {
return {std::to_string(_threads_per_stream_big)};
} else if (key == CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL)) {
return {std::to_string(_threads_per_stream_small)};
} else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)) {
return {std::to_string(_small_core_offset)};
} else if (key == CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD)) {
return {_enable_hyper_thread ? CONFIG_VALUE(YES) : CONFIG_VALUE(NO)};
} else {
IE_THROW() << "Wrong value for property key " << key;
}
return {};
return get_property(key);
}
void IStreamsExecutor::Config::UpdateHybridCustomThreads(Config& config) {
const auto num_cores = parallel_get_max_threads();
const auto num_cores_phys = getNumberOfCPUCores();
const auto num_big_cores_phys = getNumberOfCPUCores(true);
const auto num_big_cores = num_cores > num_cores_phys ? num_big_cores_phys * 2 : num_big_cores_phys;
const auto num_small_cores_phys = num_cores_phys - num_big_cores_phys;
const auto threads = config._threads ? config._threads : num_cores;
const auto streams = config._streams > 0 ? config._streams : 1;
config._small_core_offset = num_big_cores;
int threads_per_stream = std::max(1, threads / streams);
if ((num_big_cores_phys / threads_per_stream >= streams) && (1 < threads_per_stream)) {
config._big_core_streams = streams;
config._threads_per_stream_big = threads_per_stream;
config._small_core_streams = 0;
config._threads_per_stream_small = 0;
} else if ((num_small_cores_phys / threads_per_stream >= streams) && (num_big_cores_phys < threads_per_stream)) {
config._big_core_streams = 0;
config._threads_per_stream_big = 0;
config._small_core_streams = streams;
config._threads_per_stream_small = threads_per_stream;
} else {
const int threads_per_stream_big = std::min(num_big_cores_phys, threads_per_stream);
const int threads_per_stream_small = std::min(num_small_cores_phys, threads_per_stream);
threads_per_stream = std::min(threads_per_stream_big, threads_per_stream_small);
while (threads_per_stream > 1) {
const int base_big_streams = num_big_cores_phys / threads_per_stream;
const int base_small_streams = num_small_cores_phys > 0 ? num_small_cores_phys / threads_per_stream : 0;
if (base_big_streams + base_small_streams >= streams) {
config._big_core_streams = base_big_streams;
config._small_core_streams = streams - base_big_streams;
break;
} else if (base_big_streams * 2 + base_small_streams >= streams) {
config._big_core_streams = streams - base_small_streams;
config._small_core_streams = base_small_streams;
break;
} else {
threads_per_stream = threads_per_stream > 1 ? threads_per_stream - 1 : 1;
}
}
if (threads_per_stream == 1) {
const int stream_loops = streams / num_cores;
const int remain_streams = streams - stream_loops * num_cores;
if (num_big_cores_phys >= remain_streams) {
config._big_core_streams = remain_streams + num_big_cores * stream_loops;
config._small_core_streams = num_small_cores_phys * stream_loops;
} else if (num_big_cores_phys + num_small_cores_phys >= remain_streams) {
config._big_core_streams = num_big_cores_phys + num_big_cores * stream_loops;
config._small_core_streams = remain_streams - num_big_cores_phys + num_small_cores_phys * stream_loops;
} else {
config._big_core_streams = remain_streams - num_small_cores_phys + num_big_cores * stream_loops;
config._small_core_streams = num_small_cores_phys * (stream_loops + 1);
}
}
config._threads_per_stream_big = threads_per_stream;
config._threads_per_stream_small = threads_per_stream;
}
return update_hybrid_custom_threads(config);
}
IStreamsExecutor::Config IStreamsExecutor::Config::MakeDefaultMultiThreaded(const IStreamsExecutor::Config& initial,
const bool fp_intesive) {
const auto envThreads = parallel_get_env_threads();
const auto& numaNodes = getAvailableNUMANodes();
const int numaNodesNum = static_cast<int>(numaNodes.size());
auto streamExecutorConfig = initial;
const bool bLatencyCase = streamExecutorConfig._streams <= numaNodesNum;
// by default, do not use the hyper-threading (to minimize threads synch overheads)
int num_cores_default = getNumberOfCPUCores();
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
// additional latency-case logic for hybrid processors:
if (ThreadBindingType::HYBRID_AWARE == streamExecutorConfig._threadBindingType) {
const auto core_types = custom::info::core_types();
const auto num_little_cores =
custom::info::default_concurrency(custom::task_arena::constraints{}.set_core_type(core_types.front()));
const auto num_big_cores_phys = getNumberOfCPUCores(true);
const int int8_threshold = 4; // ~relative efficiency of the VNNI-intensive code for Big vs Little cores;
const int fp32_threshold = 2; // ~relative efficiency of the AVX2 fp32 code for Big vs Little cores;
// by default the latency case uses (faster) Big cores only, depending on the compute ratio
const bool bLatencyCaseBigOnly =
num_big_cores_phys > (num_little_cores / (fp_intesive ? fp32_threshold : int8_threshold));
// selecting the preferred core type
streamExecutorConfig._threadPreferredCoreType =
bLatencyCase ? (bLatencyCaseBigOnly ? IStreamsExecutor::Config::PreferredCoreType::BIG
: IStreamsExecutor::Config::PreferredCoreType::ANY)
: IStreamsExecutor::Config::PreferredCoreType::ROUND_ROBIN;
// additionally selecting the #cores to use in the "Big-only" case
if (bLatencyCaseBigOnly) {
const int hyper_threading_threshold =
2; // min #cores, for which the hyper-threading becomes useful for the latency case
const auto num_big_cores =
custom::info::default_concurrency(custom::task_arena::constraints{}.set_core_type(core_types.back()));
num_cores_default = (num_big_cores_phys <= hyper_threading_threshold) ? num_big_cores : num_big_cores_phys;
}
// if nstreams or nthreads are set, need to calculate the Hybrid aware parameters here
if (!bLatencyCase && (streamExecutorConfig._big_core_streams == 0 || streamExecutorConfig._threads)) {
UpdateHybridCustomThreads(streamExecutorConfig);
}
OPENVINO_DEBUG << "[ p_e_core_info ] streams (threads): " << streamExecutorConfig._streams << "("
<< streamExecutorConfig._threads_per_stream_big * streamExecutorConfig._big_core_streams +
streamExecutorConfig._threads_per_stream_small * streamExecutorConfig._small_core_streams
<< ") -- PCore: " << streamExecutorConfig._big_core_streams << "("
<< streamExecutorConfig._threads_per_stream_big
<< ") ECore: " << streamExecutorConfig._small_core_streams << "("
<< streamExecutorConfig._threads_per_stream_small << ")";
}
#endif
const auto hwCores =
!bLatencyCase && numaNodesNum == 1
// throughput case on a single-NUMA node machine uses all available cores
? (streamExecutorConfig._enable_hyper_thread ? parallel_get_max_threads() : num_cores_default)
// in the rest of cases:
// multi-node machine
// or
// latency case, single-node yet hybrid case that uses
// all core types
// or
// big-cores only, but the #cores is "enough" (pls see the logic above)
// it is usually beneficial not to use the hyper-threading (which is default)
: num_cores_default;
const auto threads =
streamExecutorConfig._threads ? streamExecutorConfig._threads : (envThreads ? envThreads : hwCores);
streamExecutorConfig._threadsPerStream =
streamExecutorConfig._streams ? std::max(1, threads / streamExecutorConfig._streams) : threads;
streamExecutorConfig._threads =
(!bLatencyCase && ThreadBindingType::HYBRID_AWARE == streamExecutorConfig._threadBindingType)
? streamExecutorConfig._big_core_streams * streamExecutorConfig._threads_per_stream_big +
streamExecutorConfig._small_core_streams * streamExecutorConfig._threads_per_stream_small
: streamExecutorConfig._threadsPerStream * streamExecutorConfig._streams;
return streamExecutorConfig;
return make_default_multi_threaded(initial);
}
} // namespace InferenceEngine

View File

@@ -12,27 +12,7 @@
namespace InferenceEngine {
void ITaskExecutor::runAndWait(const std::vector<Task>& tasks) {
std::vector<std::packaged_task<void()>> packagedTasks;
std::vector<std::future<void>> futures;
for (std::size_t i = 0; i < tasks.size(); ++i) {
packagedTasks.emplace_back([&tasks, i] {
tasks[i]();
});
futures.emplace_back(packagedTasks.back().get_future());
}
for (std::size_t i = 0; i < tasks.size(); ++i) {
run([&packagedTasks, i] {
packagedTasks[i]();
});
}
// std::future::get will rethrow exception from task.
// We should wait all tasks before any exception is thrown.
// So wait() and get() for each future moved to separate loops
for (auto&& future : futures) {
future.wait();
}
for (auto&& future : futures) {
future.get();
}
run_and_wait(tasks);
}
} // namespace InferenceEngine

View File

@@ -10,7 +10,7 @@
#include "streams_executor.hpp"
using namespace testing;
using namespace InferenceEngine;
using namespace ov;
namespace {
@@ -36,12 +36,12 @@ public:
std::vector<std::vector<int>> test_proc_type_table;
std::vector<std::vector<int>> test_cpu_mapping_table;
InferenceEngine::parse_processor_info_linux(test_data._processors,
test_data.system_info_table,
test_sockets,
test_cores,
test_proc_type_table,
test_cpu_mapping_table);
ov::parse_processor_info_linux(test_data._processors,
test_data.system_info_table,
test_sockets,
test_cores,
test_proc_type_table,
test_cpu_mapping_table);
ASSERT_EQ(test_data._sockets, test_sockets);
ASSERT_EQ(test_data._cores, test_cores);
@@ -629,13 +629,13 @@ public:
std::vector<std::vector<int>> test_proc_type_table;
std::vector<std::vector<int>> test_cpu_mapping_table;
parse_processor_info_win(test_info_ptr,
len,
test_data._processors,
test_sockets,
test_cores,
test_proc_type_table,
test_cpu_mapping_table);
ov::parse_processor_info_win(test_info_ptr,
len,
test_data._processors,
test_sockets,
test_cores,
test_proc_type_table,
test_cpu_mapping_table);
ASSERT_EQ(test_data._sockets, test_sockets);
ASSERT_EQ(test_data._cores, test_cores);

View File

@@ -4,36 +4,34 @@
#include <gtest/gtest.h>
#include <threading/ie_executor_manager.hpp>
#include "openvino/runtime/threading/executor_manager.hpp"
using namespace ::testing;
using namespace std;
using namespace InferenceEngine;
TEST(ExecutorManagerTests, canCreateSingleExecutorManager) {
auto executorManager1 = executorManager();
auto executorManager1 = ov::threading::executor_manager();
auto executorManager2 = executorManager();
auto executorManager2 = ov::threading::executor_manager();
ASSERT_EQ(executorManager1, executorManager2);
}
TEST(ExecutorManagerTests, createDifferentExecutorsForDifferentDevices) {
auto executorMgr = executorManager();
auto executor1 = executorMgr->getExecutor("CPU");
auto executor2 = executorMgr->getExecutor("GPU");
auto executorMgr = ov::threading::executor_manager();
auto executor1 = executorMgr->get_executor("CPU");
auto executor2 = executorMgr->get_executor("GPU");
ASSERT_NE(executor1, executor2);
ASSERT_EQ(2, executorMgr->getExecutorsNumber());
ASSERT_EQ(2, executorMgr->get_executors_number());
}
TEST(ExecutorManagerTests, returnTheSameExecutorForTheSameDevice) {
auto executorMgr = executorManager();
auto executor1 = executorMgr->getExecutor("CPU");
auto executor2 = executorMgr->getExecutor("GPU");
auto executorMgr = ov::threading::executor_manager();
auto executor1 = executorMgr->get_executor("CPU");
auto executor2 = executorMgr->get_executor("GPU");
auto executor = executorMgr->getExecutor("GPU");
auto executor = executorMgr->get_executor("GPU");
ASSERT_EQ(executor, executor2);
ASSERT_EQ(2, executorMgr->getExecutorsNumber());
ASSERT_EQ(2, executorMgr->get_executors_number());
}

View File

@@ -9,10 +9,11 @@
#include "template_itt.hpp"
// ! [async_infer_request:ctor]
TemplatePlugin::AsyncInferRequest::AsyncInferRequest(const std::shared_ptr<TemplatePlugin::InferRequest>& request,
const InferenceEngine::ITaskExecutor::Ptr& task_executor,
const InferenceEngine::ITaskExecutor::Ptr& wait_executor,
const InferenceEngine::ITaskExecutor::Ptr& callback_executor)
TemplatePlugin::AsyncInferRequest::AsyncInferRequest(
const std::shared_ptr<TemplatePlugin::InferRequest>& request,
const std::shared_ptr<ov::threading::ITaskExecutor>& task_executor,
const std::shared_ptr<ov::threading::ITaskExecutor>& wait_executor,
const std::shared_ptr<ov::threading::ITaskExecutor>& callback_executor)
: ov::IAsyncInferRequest(request, task_executor, callback_executor),
m_wait_executor(wait_executor) {
// In current implementation we have CPU only tasks and no needs in 2 executors

View File

@@ -16,14 +16,14 @@ namespace TemplatePlugin {
class AsyncInferRequest : public ov::IAsyncInferRequest {
public:
AsyncInferRequest(const std::shared_ptr<InferRequest>& request,
const InferenceEngine::ITaskExecutor::Ptr& task_executor,
const InferenceEngine::ITaskExecutor::Ptr& wait_executor,
const InferenceEngine::ITaskExecutor::Ptr& callback_executor);
const std::shared_ptr<ov::threading::ITaskExecutor>& task_executor,
const std::shared_ptr<ov::threading::ITaskExecutor>& wait_executor,
const std::shared_ptr<ov::threading::ITaskExecutor>& callback_executor);
~AsyncInferRequest();
private:
InferenceEngine::ITaskExecutor::Ptr m_wait_executor;
std::shared_ptr<ov::threading::ITaskExecutor> m_wait_executor;
};
// ! [async_infer_request:header]

View File

@@ -79,7 +79,7 @@ void fill_output_info(const ov::Output<ov::Node>& output, InferenceEngine::DataP
// ! [executable_network:ctor_cnnnetwork]
TemplatePlugin::CompiledModel::CompiledModel(const std::shared_ptr<ov::Model>& model,
const std::shared_ptr<const ov::IPlugin>& plugin,
const InferenceEngine::ITaskExecutor::Ptr& task_executor,
const std::shared_ptr<ov::threading::ITaskExecutor>& task_executor,
const Configuration& cfg)
: ov::ICompiledModel(model, plugin, task_executor), // Disable default threads creation
_cfg(cfg),

View File

@@ -24,7 +24,7 @@ class CompiledModel : public ov::ICompiledModel {
public:
CompiledModel(const std::shared_ptr<ov::Model>& model,
const std::shared_ptr<const ov::IPlugin>& plugin,
const InferenceEngine::ITaskExecutor::Ptr& task_executor,
const std::shared_ptr<ov::threading::ITaskExecutor>& task_executor,
const Configuration& cfg);
// Methods from a base class ov::ICompiledModel

View File

@@ -91,7 +91,7 @@ std::shared_ptr<ov::ICompiledModel> TemplatePlugin::Plugin::compile_model(const
auto fullConfig = Configuration{properties, _cfg};
auto streamsExecutorConfig =
InferenceEngine::IStreamsExecutor::Config::MakeDefaultMultiThreaded(fullConfig._streamsExecutorConfig);
ov::threading::IStreamsExecutor::Config::make_default_multi_threaded(fullConfig._streamsExecutorConfig);
streamsExecutorConfig._name = stream_executor_name;
auto compiled_model =
std::make_shared<CompiledModel>(model->clone(),

View File

@@ -8,6 +8,7 @@
#include "compiled_model.hpp"
#include "openvino/runtime/icompiled_model.hpp"
#include "openvino/runtime/iplugin.hpp"
#include "openvino/runtime/threading/itask_executor.hpp"
#include "template_config.hpp"
//! [plugin:header]
@@ -50,7 +51,7 @@ private:
std::shared_ptr<ngraph::runtime::Backend> _backend;
Configuration _cfg;
InferenceEngine::ITaskExecutor::Ptr _waitExecutor;
std::shared_ptr<ov::threading::ITaskExecutor> _waitExecutor;
};
} // namespace TemplatePlugin

View File

@@ -16,16 +16,17 @@ Configuration::Configuration() {}
Configuration::Configuration(const ConfigMap& config, const Configuration& defaultCfg, bool throwOnUnsupported) {
*this = defaultCfg;
// If plugin needs to use InferenceEngine::StreamsExecutor it should be able to process its configuration
auto streamExecutorConfigKeys = _streamsExecutorConfig.SupportedKeys();
auto streamExecutorConfigKeys =
_streamsExecutorConfig.get_property(ov::supported_properties.name()).as<std::vector<std::string>>();
for (auto&& c : config) {
const auto& key = c.first;
const auto& value = c.second;
if (ov::template_plugin::throughput_streams == key) {
_streamsExecutorConfig.SetConfig(CONFIG_KEY(CPU_THROUGHPUT_STREAMS), value.as<std::string>());
_streamsExecutorConfig.set_property(CONFIG_KEY(CPU_THROUGHPUT_STREAMS), value);
} else if (streamExecutorConfigKeys.end() !=
std::find(std::begin(streamExecutorConfigKeys), std::end(streamExecutorConfigKeys), key)) {
_streamsExecutorConfig.SetConfig(key, value.as<std::string>());
_streamsExecutorConfig.set_property(key, value);
} else if (CONFIG_KEY(DEVICE_ID) == key) {
deviceId = std::stoi(value.as<std::string>());
if (deviceId > 0) {
@@ -42,11 +43,12 @@ Configuration::Configuration(const ConfigMap& config, const Configuration& defau
}
}
InferenceEngine::Parameter Configuration::Get(const std::string& name) const {
auto streamExecutorConfigKeys = _streamsExecutorConfig.SupportedKeys();
ov::Any Configuration::Get(const std::string& name) const {
auto streamExecutorConfigKeys =
_streamsExecutorConfig.get_property(ov::supported_properties.name()).as<std::vector<std::string>>();
if ((streamExecutorConfigKeys.end() !=
std::find(std::begin(streamExecutorConfigKeys), std::end(streamExecutorConfigKeys), name))) {
return _streamsExecutorConfig.GetConfig(name);
return _streamsExecutorConfig.get_property(name);
} else if (name == CONFIG_KEY(DEVICE_ID)) {
return {std::to_string(deviceId)};
} else if (name == CONFIG_KEY(PERF_COUNT)) {
@@ -54,7 +56,7 @@ InferenceEngine::Parameter Configuration::Get(const std::string& name) const {
} else if (name == ov::template_plugin::throughput_streams || name == CONFIG_KEY(CPU_THROUGHPUT_STREAMS)) {
return {std::to_string(_streamsExecutorConfig._streams)};
} else if (name == CONFIG_KEY(CPU_BIND_THREAD)) {
return const_cast<InferenceEngine::IStreamsExecutor::Config&>(_streamsExecutorConfig).GetConfig(name);
return _streamsExecutorConfig.get_property(name);
} else if (name == CONFIG_KEY(CPU_THREADS_NUM)) {
return {std::to_string(_streamsExecutorConfig._threads)};
} else if (name == CONFIG_KEY_INTERNAL(CPU_THREADS_PER_STREAM)) {

View File

@@ -4,11 +4,11 @@
#pragma once
#include <ie_parameter.hpp>
#include <map>
#include <openvino/runtime/properties.hpp>
#include <string>
#include <threading/ie_istreams_executor.hpp>
#include "openvino/runtime/properties.hpp"
#include "openvino/runtime/threading/istreams_executor.hpp"
namespace TemplatePlugin {
@@ -26,13 +26,13 @@ struct Configuration {
const Configuration& defaultCfg = {},
const bool throwOnUnsupported = true);
InferenceEngine::Parameter Get(const std::string& name) const;
ov::Any Get(const std::string& name) const;
// Plugin configuration parameters
int deviceId = 0;
bool perfCount = true;
InferenceEngine::IStreamsExecutor::Config _streamsExecutorConfig;
ov::threading::IStreamsExecutor::Config _streamsExecutorConfig;
ov::hint::PerformanceMode performance_mode = ov::hint::PerformanceMode::UNDEFINED;
};
// ! [configuration:header]