diff --git a/src/inference/dev_api/cpp_interfaces/impl/ie_executable_network_thread_safe_default.hpp b/src/inference/dev_api/cpp_interfaces/impl/ie_executable_network_thread_safe_default.hpp index 00de2c26128..ec92b9d3c24 100644 --- a/src/inference/dev_api/cpp_interfaces/impl/ie_executable_network_thread_safe_default.hpp +++ b/src/inference/dev_api/cpp_interfaces/impl/ie_executable_network_thread_safe_default.hpp @@ -12,9 +12,10 @@ #include "cpp_interfaces/impl/ie_infer_async_request_thread_safe_default.hpp" #include "cpp_interfaces/interface/ie_iexecutable_network_internal.hpp" #include "cpp_interfaces/interface/ie_iplugin_internal.hpp" -#include "threading/ie_cpu_streams_executor.hpp" +#include "openvino/runtime/threading/cpu_streams_executor.hpp" -namespace InferenceEngine { +namespace ov { +namespace threading { /** * @brief This class provides optimal thread safe default implementation. @@ -77,4 +78,5 @@ protected: ITaskExecutor::Ptr _callbackExecutor = nullptr; //!< Holds a callback executor }; -} // namespace InferenceEngine +} // namespace threading +} // namespace ov diff --git a/src/inference/dev_api/cpp_interfaces/impl/ie_infer_async_request_thread_safe_default.hpp b/src/inference/dev_api/cpp_interfaces/impl/ie_infer_async_request_thread_safe_default.hpp index abeb6b79902..5378dcfe525 100644 --- a/src/inference/dev_api/cpp_interfaces/impl/ie_infer_async_request_thread_safe_default.hpp +++ b/src/inference/dev_api/cpp_interfaces/impl/ie_infer_async_request_thread_safe_default.hpp @@ -16,11 +16,14 @@ #include "cpp_interfaces/interface/ie_iinfer_request_internal.hpp" #include "ie_api.h" -#include "threading/ie_immediate_executor.hpp" -#include "threading/ie_istreams_executor.hpp" -#include "threading/ie_itask_executor.hpp" +#include "openvino/runtime/threading/immediate_executor.hpp" +#include "openvino/runtime/threading/istreams_executor.hpp" +#include "openvino/runtime/threading/itask_executor.hpp" -namespace InferenceEngine { +using namespace InferenceEngine; + +namespace ov { +namespace threading { IE_SUPPRESS_DEPRECATED_START /** @@ -59,11 +62,11 @@ class INFERENCE_ENGINE_1_0_DEPRECATED AsyncInferRequestThreadSafeDefault : publi Callback _callback; }; - struct ImmediateStreamsExecutor : public InferenceEngine::ITaskExecutor { + struct ImmediateStreamsExecutor : public ov::threading::ITaskExecutor { explicit ImmediateStreamsExecutor(const IStreamsExecutor::Ptr& streamsExecutor) : _streamsExecutor{streamsExecutor} {} - void run(InferenceEngine::Task task) override { - _streamsExecutor->Execute(std::move(task)); + void run(ov::threading::Task task) override { + _streamsExecutor->execute(std::move(task)); } IStreamsExecutor::Ptr _streamsExecutor; }; @@ -449,4 +452,5 @@ private: InferState _state = InferState::Idle; }; IE_SUPPRESS_DEPRECATED_END -} // namespace InferenceEngine +} // namespace threading +} // namespace ov diff --git a/src/inference/dev_api/cpp_interfaces/interface/ie_iplugin_internal.hpp b/src/inference/dev_api/cpp_interfaces/interface/ie_iplugin_internal.hpp index c25ea4f72be..859e56df154 100644 --- a/src/inference/dev_api/cpp_interfaces/interface/ie_iplugin_internal.hpp +++ b/src/inference/dev_api/cpp_interfaces/interface/ie_iplugin_internal.hpp @@ -23,6 +23,8 @@ #include "openvino/util/pp.hpp" #include "so_ptr.hpp" +using namespace ov::threading; + namespace InferenceEngine { class ExecutorManager; @@ -301,7 +303,7 @@ public: * @brief Gets reference to tasks execution manager * @return Reference to ExecutorManager interface */ - const std::shared_ptr& executorManager() const; + const std::shared_ptr& executorManager() const; /** * @brief Queries a plugin about supported layers in network @@ -368,11 +370,11 @@ protected: void SetExeNetworkInfo(const std::shared_ptr& exeNetwork, const std::shared_ptr& function); - std::string _pluginName; //!< A device name that plugins enables - std::map _config; //!< A map config keys -> values - std::weak_ptr _core; //!< A pointer to ICore interface - std::shared_ptr _executorManager; //!< A tasks execution manager - bool _isNewAPI; //!< A flag which shows used API + std::string _pluginName; //!< A device name that plugins enables + std::map _config; //!< A map config keys -> values + std::weak_ptr _core; //!< A pointer to ICore interface + std::shared_ptr _executorManager; //!< A tasks execution manager + bool _isNewAPI; //!< A flag which shows used API }; /** diff --git a/src/inference/dev_api/threading/ie_cpu_streams_executor.hpp b/src/inference/dev_api/threading/ie_cpu_streams_executor.hpp deleted file mode 100644 index b86145c70a2..00000000000 --- a/src/inference/dev_api/threading/ie_cpu_streams_executor.hpp +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -/** - * @file ie_cpu_streams_executor.hpp - * @brief A header file for Inference Engine CPU-Streams-based Executor implementation. - */ - -#pragma once - -#include -#include - -#include "threading/ie_istreams_executor.hpp" - -namespace InferenceEngine { -/** - * @class CPUStreamsExecutor - * @ingroup ie_dev_api_threading - * @brief CPU Streams executor implementation. The executor splits the CPU into groups of threads, - * that can be pinned to cores or NUMA nodes. - * It uses custom threads to pull tasks from single queue. - */ -class INFERENCE_ENGINE_API_CLASS(CPUStreamsExecutor) : public IStreamsExecutor { -public: - /** - * @brief A shared pointer to a CPUStreamsExecutor object - */ - using Ptr = std::shared_ptr; - - /** - * @brief Constructor - * @param config Stream executor parameters - */ - explicit CPUStreamsExecutor(const IStreamsExecutor::Config& config = {}); - - /** - * @brief A class destructor - */ - ~CPUStreamsExecutor() override; - - void run(Task task) override; - - void Execute(Task task) override; - - int GetStreamId() override; - - int GetNumaNodeId() override; - - int GetSocketId() override; - -private: - struct Impl; - std::unique_ptr _impl; -}; - -} // namespace InferenceEngine diff --git a/src/inference/dev_api/threading/ie_executor_manager.hpp b/src/inference/dev_api/threading/ie_executor_manager.hpp deleted file mode 100644 index ef789c82c48..00000000000 --- a/src/inference/dev_api/threading/ie_executor_manager.hpp +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -/** - * @file ie_executor_manager.hpp - * @brief A header file for Executor Manager - */ - -#pragma once - -#include -#include -#include -#include -#include - -#include "threading/ie_istreams_executor.hpp" -#include "threading/ie_itask_executor.hpp" - -namespace ov { -namespace threading { - -class ExecutorManager; - -} -} // namespace ov - -namespace InferenceEngine { - -class IPluginWrapper; - -/** - * @interface ExecutorManager - * @brief Interface for tasks execution manager. - * This is global point for getting task executor objects by string id. - * It's necessary in multiple asynchronous requests for having unique executors to avoid oversubscription. - * E.g. There 2 task executors for CPU device: one - in FPGA, another - in OneDNN. Parallel execution both of them leads - * to not optimal CPU usage. More efficient to run the corresponding tasks one by one via single executor. - * @ingroup ie_dev_api_threading - */ -class INFERENCE_ENGINE_API_CLASS(ExecutorManager) { -public: - /** - * A shared pointer to ExecutorManager interface - */ - using Ptr = std::shared_ptr; - - /** - * @brief Returns executor by unique identificator - * @param id An unique identificator of device (Usually string representation of TargetDevice) - * @return A shared pointer to existing or newly ITaskExecutor - */ - virtual ITaskExecutor::Ptr getExecutor(const std::string& id) = 0; - - /// @private - virtual IStreamsExecutor::Ptr getIdleCPUStreamsExecutor(const IStreamsExecutor::Config& config) = 0; - - /** - * @cond - */ - virtual size_t getExecutorsNumber() const = 0; - - virtual size_t getIdleCPUStreamsExecutorsNumber() const = 0; - - virtual void clear(const std::string& id = {}) = 0; - /** - * @endcond - */ - - virtual ~ExecutorManager() = default; - - /** - * @brief Returns a global instance of ExecutorManager - * @return The instance. - */ - INFERENCE_ENGINE_DEPRECATED("Use IInferencePlugin::executorManager() instead") - static ExecutorManager* getInstance(); - - /** - * @brief Set TBB terminate flag - * @param flag A boolean value: - * True to terminate tbb during destruction - * False to not terminate tbb during destruction - * @return void - */ - virtual void setTbbFlag(bool flag) = 0; - virtual bool getTbbFlag() = 0; - -private: - virtual std::shared_ptr get_ov_manager() const = 0; - friend class IPluginWrapper; -}; - -INFERENCE_ENGINE_API_CPP(ExecutorManager::Ptr) executorManager(); - -std::shared_ptr create_old_manager( - const std::shared_ptr& manager); - -} // namespace InferenceEngine diff --git a/src/inference/dev_api/threading/ie_immediate_executor.hpp b/src/inference/dev_api/threading/ie_immediate_executor.hpp deleted file mode 100644 index be40006574c..00000000000 --- a/src/inference/dev_api/threading/ie_immediate_executor.hpp +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -/** - * @file ie_immediate_executor.hpp - * @brief A header file for Inference Engine Immediate Executor implementation - */ - -#pragma once - -#include -#include - -#include "openvino/runtime/threading/immediate_executor.hpp" -#include "threading/ie_itask_executor.hpp" - -namespace InferenceEngine { - -/** - * @brief Task executor implementation that just run tasks in current thread during calling of run() method - * @ingroup ie_dev_api_threading - */ -class ImmediateExecutor : public ITaskExecutor { -public: - /** - * @brief A shared pointer to a ImmediateExecutor object - */ - using Ptr = std::shared_ptr; - - /** - * @brief Destroys the object. - */ - ~ImmediateExecutor() override = default; - - void run(Task task) override { - task(); - } -}; - -} // namespace InferenceEngine diff --git a/src/inference/dev_api/threading/ie_istreams_executor.hpp b/src/inference/dev_api/threading/ie_istreams_executor.hpp deleted file mode 100644 index 55593583960..00000000000 --- a/src/inference/dev_api/threading/ie_istreams_executor.hpp +++ /dev/null @@ -1,162 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -/** - * @file ie_istreams_executor.hpp - * @brief A header file for Inference Engine Streams-based Executor Interface - */ - -#pragma once - -#include -#include -#include - -#include "ie_parameter.hpp" -#include "openvino/runtime/threading/istreams_executor.hpp" -#include "threading/ie_itask_executor.hpp" - -namespace InferenceEngine { - -/** - * @interface IStreamsExecutor - * @ingroup ie_dev_api_threading - * @brief Interface for Streams Task Executor. This executor groups worker threads into so-called `streams`. - * @par CPU - * The executor executes all parallel tasks using threads from one stream. - * With proper pinning settings it should reduce cache misses for memory bound workloads. - * @par NUMA - * On NUMA hosts GetNumaNodeId() method can be used to define the NUMA node of current stream - */ -class INFERENCE_ENGINE_API_CLASS(IStreamsExecutor) : public ITaskExecutor, public ov::threading::IStreamsExecutor { -public: - /** - * A shared pointer to IStreamsExecutor interface - */ - using Ptr = std::shared_ptr; - - /** - * @brief Defines IStreamsExecutor configuration - */ - struct INFERENCE_ENGINE_API_CLASS(Config) : public ov::threading::IStreamsExecutor::Config { - /** - * @brief Supported Configuration keys - * @return vector of supported configuration keys - */ - std::vector SupportedKeys() const; - - /** - * @brief Parses configuration key/value pair - * @param key configuration key - * @param value configuration values - */ - void SetConfig(const std::string& key, const std::string& value); - - /** - * @brief Return configuration value - * @param key configuration key - * @return configuration value wrapped into Parameter - */ - Parameter GetConfig(const std::string& key) const; - - /** - * @brief Create appropriate multithreaded configuration - * filing unconfigured values from initial configuration using hardware properties - * @param initial Inital configuration - * @param fp_intesive additional hint for the the (Hybrid) core-types selection logic - * whether the executor should be configured for floating point intensive work (as opposite to int8 - * intensive) - * @return configured values - */ - static Config MakeDefaultMultiThreaded(const Config& initial, const bool fp_intesive = true); - static int GetDefaultNumStreams( - const bool enable_hyper_thread = true); // no network specifics considered (only CPU's caps); - static int GetHybridNumStreams(std::map& config, const int stream_mode); - static void UpdateHybridCustomThreads(Config& config); - static Config ReserveCpuThreads(const Config& initial); - - /** - * @brief A constructor with arguments - * - * @param[in] name The executor name - * @param[in] streams @copybrief Config::_streams - * @param[in] threadsPerStream @copybrief Config::_threadsPerStream - * @param[in] threadBindingType @copybrief Config::_threadBindingType - * @param[in] threadBindingStep @copybrief Config::_threadBindingStep - * @param[in] threadBindingOffset @copybrief Config::_threadBindingOffset - * @param[in] threads @copybrief Config::_threads - * @param[in] threadPreferBigCores @copybrief Config::_threadPreferBigCores - */ - Config(std::string name = "StreamsExecutor", - int streams = 1, - int threadsPerStream = 0, - ThreadBindingType threadBindingType = ThreadBindingType::NONE, - int threadBindingStep = 1, - int threadBindingOffset = 0, - int threads = 0, - PreferredCoreType threadPreferredCoreType = PreferredCoreType::ANY, - std::vector> streamsInfoTable = {}, - bool cpuReservation = false) - : ov::threading::IStreamsExecutor::Config(name, - streams, - threadsPerStream, - threadBindingType, - threadBindingStep, - threadBindingOffset, - threads, - threadPreferredCoreType, - streamsInfoTable, - cpuReservation) {} - - Config(const ov::threading::IStreamsExecutor::Config& config) - : ov::threading::IStreamsExecutor::Config(config) {} - }; - - /** - * @brief A virtual destructor - */ - ~IStreamsExecutor() override; - - /** - * @brief Return the index of current stream - * @return An index of current stream. Or throw exceptions if called not from stream thread - */ - virtual int GetStreamId() = 0; - - /** - * @brief Return the id of current NUMA Node - * @return `ID` of current NUMA Node, or throws exceptions if called not from stream thread - */ - virtual int GetNumaNodeId() = 0; - - /** - * @brief Return the id of current socket - * @return `ID` of current socket, or throws exceptions if called not from stream thread - */ - virtual int GetSocketId() = 0; - - /** - * @brief Execute the task in the current thread using streams executor configuration and constraints - * @param task A task to start - */ - virtual void Execute(Task task) = 0; - - int get_stream_id() override { - return GetStreamId(); - } - - int get_numa_node_id() override { - return GetNumaNodeId(); - } - - int get_socket_id() override { - return GetSocketId(); - } - - void execute(Task task) override { - Execute(task); - } -}; - -} // namespace InferenceEngine diff --git a/src/inference/dev_api/threading/ie_itask_executor.hpp b/src/inference/dev_api/threading/ie_itask_executor.hpp deleted file mode 100644 index 1fc2923fca9..00000000000 --- a/src/inference/dev_api/threading/ie_itask_executor.hpp +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -/** - * @file ie_itask_executor.hpp - * @brief A header file for Inference Engine Task Executor Interface - */ - -#pragma once - -#include -#include -#include - -#include "ie_api.h" -#include "openvino/runtime/threading/itask_executor.hpp" - -namespace InferenceEngine { - -/** - * @brief Inference Engine Task Executor can use any copyable callable without parameters and output as a task. - * It would be wrapped into std::function object - * @ingroup ie_dev_api_threading - */ -using Task = ov::threading::Task; - -/** -* @interface ITaskExecutor -* @ingroup ie_dev_api_threading -* @brief Interface for Task Executor. -* Inference Engine uses `InferenceEngine::ITaskExecutor` interface to run all asynchronous internal tasks. -* Different implementations of task executors can be used for different purposes: -* - To improve cache locality of memory bound CPU tasks some executors can limit task's affinity and maximum -concurrency. -* - The executor with one worker thread can be used to serialize access to acceleration device. -* - Immediate task executor can be used to satisfy `InferenceEngine::ITaskExecutor` interface restrictions but -run tasks in current thread. -* @note Implementation should guaranty thread safety of all methods -* It is `InferenceEngine::ITaskExecutor` user responsibility to wait for task execution completion. -* The `c++11` standard way to wait task completion is to use `std::packaged_task` or `std::promise` with -`std::future`. -* Here is an example of how to use `std::promise` to wait task completion and process task's exceptions: - * @snippet example_itask_executor.cpp itask_executor:define_pipeline - */ -class INFERENCE_ENGINE_API_CLASS(ITaskExecutor) : virtual public ov::threading::ITaskExecutor { -public: - /** - * A shared pointer to ITaskExecutor interface - */ - using Ptr = std::shared_ptr; - - /** - * @brief Destroys the object. - */ - virtual ~ITaskExecutor() = default; - - /** - * @brief Execute all of the tasks and waits for its completion. - * Default runAndWait() method implementation uses run() pure virtual method - * and higher level synchronization primitives from STL. - * The task is wrapped into std::packaged_task which returns std::future. - * std::packaged_task will call the task and signal to std::future that the task is finished - * or the exception is thrown from task - * Than std::future is used to wait for task execution completion and - * task exception extraction - * @note runAndWait() does not copy or capture tasks! - * @param tasks A vector of tasks to execute - */ - virtual void runAndWait(const std::vector& tasks); -}; - -} // namespace InferenceEngine diff --git a/src/inference/dev_api/threading/ie_tbb_streams_executor.hpp b/src/inference/dev_api/threading/ie_tbb_streams_executor.hpp deleted file mode 100644 index 2028990dd3e..00000000000 --- a/src/inference/dev_api/threading/ie_tbb_streams_executor.hpp +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -#pragma once - -#include -#include - -#include "ie_api.h" -#include "ie_parallel.hpp" -#include "threading/ie_istreams_executor.hpp" - -#if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO)) -namespace InferenceEngine { -/** - * @class TBBStreamsExecutor - * @brief CPU Streams executor implementation. Use TBB thread pool to run tasks - */ -class INFERENCE_ENGINE_API_CLASS(TBBStreamsExecutor) : public IStreamsExecutor { -public: - using Ptr = std::shared_ptr; - explicit TBBStreamsExecutor(const Config& config = {}); - ~TBBStreamsExecutor() override; - void run(Task task) override; - void Execute(Task task) override; - int GetStreamId() override; - int GetNumaNodeId() override; - -private: - struct Impl; - std::unique_ptr _impl; -}; -} // namespace InferenceEngine -#endif diff --git a/src/inference/dev_api/threading/ie_thread_local.hpp b/src/inference/dev_api/threading/ie_thread_local.hpp deleted file mode 100644 index 40c7775d4cc..00000000000 --- a/src/inference/dev_api/threading/ie_thread_local.hpp +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -#pragma once - -/** - * @brief A file containing thread local class implementation. - * @file ie_thread_local.hpp - */ - -#include "ie_parallel.hpp" - -#if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO -# include -#else -# include -# include -# include -# include -# include -# include -#endif - -namespace InferenceEngine { - -#if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO - -/** - * @brief A wrapper class to keep object to be thread local. - * @ingroup ie_dev_api_threading - * @tparam T A type of object to keep thread local. - */ -template -using ThreadLocal = tbb::enumerable_thread_specific; - -#else - -template -struct ThreadLocal { - using Map = std::unordered_map; - using Create = std::function; - Map _map; - mutable std::mutex _mutex; - Create _create; - - ThreadLocal() - : _create{[] { - return T{}; - }} {} - explicit ThreadLocal(const T& init) - : _create{[init] { - return init; - }} {} - ThreadLocal(ThreadLocal&& other) : _map{std::move(other._map)}, _create{std::move(other._create)} {} - ThreadLocal& operator=(ThreadLocal&& other) { - _map = std::move(other._map); - _create = std::move(other._create); - return *this; - } - ThreadLocal(const ThreadLocal&) = delete; - ThreadLocal& operator=(const ThreadLocal&&) = delete; - explicit ThreadLocal(const Create& create_) : _create{create_} {} - - T& local() { - auto threadId = std::this_thread::get_id(); - std::lock_guard lock{_mutex}; - auto itThreadLocal = _map.find(threadId); - if (itThreadLocal != _map.end()) { - return itThreadLocal->second; - } else { - return _map.emplace(threadId, _create()).first->second; - } - } - - auto size() const -> decltype(_map.size()) { - std::lock_guard lock{_mutex}; - return _map.size(); - } - - // WARNING: Thread Unsafe - template - struct Iterator { - It it; - bool operator!=(const Iterator& other) { - return it != other.it; - } - Iterator& operator++() { - ++it; - return *this; - } - auto operator*() -> decltype(it->second) { - return it->second; - } - auto operator-> () -> decltype(&(it->second)) { - return &(it->second); - } - auto operator*() const -> decltype(it->second) { - return it->second; - } - auto operator-> () const -> decltype(&(it->second)) { - return &(it->second); - } - }; - - auto begin() -> Iterator { - return {_map.begin()}; - } - auto end() -> Iterator { - return {_map.end()}; - } - auto begin() const -> Iterator const { - return {_map.begin()}; - } - auto end() const -> Iterator const { - return {_map.end()}; - } -}; - -#endif - -} // namespace InferenceEngine diff --git a/src/inference/dev_api/threading/ie_thread_safe_containers.hpp b/src/inference/dev_api/threading/ie_thread_safe_containers.hpp deleted file mode 100644 index b6ec0f2218f..00000000000 --- a/src/inference/dev_api/threading/ie_thread_safe_containers.hpp +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -/////////////////////////////////////////////////////////////////////////////////////////////////// -#pragma once - -#include -#include -#include -#include -#include - -#include "ie_parallel.hpp" -#if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO)) -# include -# include -#endif - -namespace InferenceEngine { - -template -class ThreadSafeQueueWithSize { -public: - void push(T value) { - std::lock_guard lock(_mutex); - _queue.push(std::move(value)); - } - bool try_pop(T& value) { - std::lock_guard lock(_mutex); - if (!_queue.empty()) { - value = std::move(_queue.front()); - _queue.pop(); - return true; - } else { - return false; - } - } - size_t size() { - std::lock_guard lock(_mutex); - return _queue.size(); - } - -protected: - std::queue _queue; - std::mutex _mutex; -}; -#if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO)) -template -using ThreadSafeQueue = tbb::concurrent_queue; -template -using ThreadSafeBoundedQueue = tbb::concurrent_bounded_queue; -template -class ThreadSafeBoundedPriorityQueue { -public: - ThreadSafeBoundedPriorityQueue() = default; - bool try_push(T&& value) { - if (_capacity) { - _pqueue.push(std::move(value)); - return true; - } - return false; - } - bool try_pop(T& value) { - return _capacity ? _pqueue.try_pop(value) : false; - } - void set_capacity(std::size_t newCapacity) { - _capacity = newCapacity; - } - -protected: - tbb::concurrent_priority_queue> _pqueue; - std::atomic_bool _capacity{false}; -}; -#else -template -using ThreadSafeQueue = ThreadSafeQueueWithSize; -template -class ThreadSafeBoundedQueue { -public: - ThreadSafeBoundedQueue() = default; - bool try_push(T value) { - std::lock_guard lock(_mutex); - if (_capacity) { - _queue.push(std::move(value)); - } - return _capacity; - } - bool try_pop(T& value) { - std::lock_guard lock(_mutex); - if (_capacity && !_queue.empty()) { - value = std::move(_queue.front()); - _queue.pop(); - return true; - } else { - return false; - } - } - void set_capacity(std::size_t newCapacity) { - std::lock_guard lock(_mutex); - _capacity = newCapacity; - } - -protected: - std::queue _queue; - std::mutex _mutex; - bool _capacity = false; -}; -template -class ThreadSafeBoundedPriorityQueue { -public: - ThreadSafeBoundedPriorityQueue() = default; - bool try_push(T value) { - std::lock_guard lock(_mutex); - if (_capacity) { - _queue.push(std::move(value)); - } - return _capacity; - } - bool try_pop(T& value) { - std::lock_guard lock(_mutex); - if (_capacity && !_queue.empty()) { - value = std::move(_queue.top()); - _queue.pop(); - return true; - } else { - return false; - } - } - void set_capacity(std::size_t newCapacity) { - std::lock_guard lock(_mutex); - _capacity = newCapacity; - } - -protected: - std::priority_queue, std::greater> _queue; - std::mutex _mutex; - bool _capacity = false; -}; -#endif -} // namespace InferenceEngine diff --git a/src/inference/src/cpp_interfaces/interface/ie_iplugin_internal.cpp b/src/inference/src/cpp_interfaces/interface/ie_iplugin_internal.cpp index 32c3a9b43d8..a4fe56fb58a 100644 --- a/src/inference/src/cpp_interfaces/interface/ie_iplugin_internal.cpp +++ b/src/inference/src/cpp_interfaces/interface/ie_iplugin_internal.cpp @@ -37,7 +37,7 @@ #include "openvino/core/runtime_attribute.hpp" #include "openvino/op/util/op_types.hpp" #include "openvino/pass/manager.hpp" -#include "threading/ie_executor_manager.hpp" +#include "openvino/runtime/threading/executor_manager.hpp" #include "transformations/utils/utils.hpp" namespace InferenceEngine { @@ -83,7 +83,7 @@ OutputsDataMap copyInfo(const OutputsDataMap& networkOutputs) { return _networkOutputs; } -IInferencePlugin::IInferencePlugin() : _executorManager(InferenceEngine::executorManager()), _isNewAPI(true) {} +IInferencePlugin::IInferencePlugin() : _executorManager(ov::threading::executor_manager()), _isNewAPI(true) {} void IInferencePlugin::VersionStore::copyFrom(const Version& v) { description = v.description; @@ -270,7 +270,7 @@ bool IInferencePlugin::IsNewAPI() const noexcept { return _isNewAPI; } -const std::shared_ptr& IInferencePlugin::executorManager() const { +const std::shared_ptr& IInferencePlugin::executorManager() const { return _executorManager; } diff --git a/src/inference/src/dev/converter_utils.cpp b/src/inference/src/dev/converter_utils.cpp index 2ea29d5c5bd..e5c4e4c3324 100644 --- a/src/inference/src/dev/converter_utils.cpp +++ b/src/inference/src/dev/converter_utils.cpp @@ -42,7 +42,6 @@ #include "openvino/runtime/threading/executor_manager.hpp" #include "openvino/runtime/variable_state.hpp" #include "remote_context_wrapper.hpp" -#include "threading/ie_executor_manager.hpp" #include "transformations/utils/utils.hpp" #ifdef PROXY_PLUGIN_ENABLED @@ -232,7 +231,7 @@ public: version.description = ver.description; SetVersion(version); _isNewAPI = plugin->is_new_api(); - _executorManager = InferenceEngine::create_old_manager(plugin->get_executor_manager()); + _executorManager = plugin->get_executor_manager(); } virtual ~IInferencePluginWrapper() = default; diff --git a/src/inference/src/dev/iplugin_wrapper.cpp b/src/inference/src/dev/iplugin_wrapper.cpp index 43a72fb38b3..5b25dcfb6aa 100644 --- a/src/inference/src/dev/iplugin_wrapper.cpp +++ b/src/inference/src/dev/iplugin_wrapper.cpp @@ -10,7 +10,7 @@ #include "dev/converter_utils.hpp" #include "ie_icore.hpp" #include "openvino/runtime/iremote_context.hpp" -#include "threading/ie_executor_manager.hpp" +#include "openvino/runtime/threading/executor_manager.hpp" namespace InferenceEngine { @@ -22,7 +22,7 @@ IPluginWrapper::IPluginWrapper(const std::shared_ptrGetName(); m_is_new_api = m_old_plugin->IsNewAPI(); m_core = m_old_plugin->GetCore(); - m_executor_manager = m_old_plugin->executorManager()->get_ov_manager(); + m_executor_manager = m_old_plugin->executorManager(); } const std::shared_ptr& IPluginWrapper::update_exec_network( diff --git a/src/inference/src/threading/ie_cpu_streams_executor.cpp b/src/inference/src/threading/ie_cpu_streams_executor.cpp deleted file mode 100644 index 41541064d4e..00000000000 --- a/src/inference/src/threading/ie_cpu_streams_executor.cpp +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -#include "threading/ie_cpu_streams_executor.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "ie_parallel_custom_arena.hpp" -#include "ie_system_conf.h" -#include "openvino/runtime/threading/cpu_streams_executor.hpp" -#include "openvino/runtime/threading/istreams_executor.hpp" -#include "threading/ie_executor_manager.hpp" -#include "threading/ie_istreams_executor.hpp" -#include "threading/ie_thread_affinity.hpp" -#include "threading/ie_thread_local.hpp" - -namespace InferenceEngine { -struct CPUStreamsExecutor::Impl : public ov::threading::CPUStreamsExecutor { - Impl(const InferenceEngine::IStreamsExecutor::Config& config) : ov::threading::CPUStreamsExecutor(config) {} -}; - -int CPUStreamsExecutor::GetStreamId() { - return _impl->get_stream_id(); -} - -int CPUStreamsExecutor::GetNumaNodeId() { - return _impl->get_numa_node_id(); -} - -int CPUStreamsExecutor::GetSocketId() { - return _impl->get_socket_id(); -} - -CPUStreamsExecutor::CPUStreamsExecutor(const IStreamsExecutor::Config& config) : _impl{new Impl(config)} {} - -CPUStreamsExecutor::~CPUStreamsExecutor() {} - -void CPUStreamsExecutor::Execute(Task task) { - _impl->execute(std::move(task)); -} - -void CPUStreamsExecutor::run(Task task) { - _impl->run(std::move(task)); -} - -} // namespace InferenceEngine diff --git a/src/inference/src/threading/ie_executor_manager.cpp b/src/inference/src/threading/ie_executor_manager.cpp deleted file mode 100644 index 3fcba5a1a7c..00000000000 --- a/src/inference/src/threading/ie_executor_manager.cpp +++ /dev/null @@ -1,162 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -#include "threading/ie_executor_manager.hpp" - -#include "ie_parallel.hpp" -#include "openvino/runtime/properties.hpp" -#include "openvino/runtime/threading/executor_manager.hpp" -#include "openvino/runtime/threading/istreams_executor.hpp" -#include "openvino/runtime/threading/itask_executor.hpp" -#include "threading/ie_cpu_streams_executor.hpp" -#include "threading/ie_itask_executor.hpp" -#if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO -# if (TBB_INTERFACE_VERSION < 12000) -# include -# else -# include -# endif -#endif - -#include -#include -#include -#include - -namespace InferenceEngine { -namespace { -class ExecutorManagerImpl : public ExecutorManager { -public: - ExecutorManagerImpl(const std::shared_ptr& manager); - ITaskExecutor::Ptr getExecutor(const std::string& id) override; - IStreamsExecutor::Ptr getIdleCPUStreamsExecutor(const IStreamsExecutor::Config& config) override; - size_t getExecutorsNumber() const override; - size_t getIdleCPUStreamsExecutorsNumber() const override; - void clear(const std::string& id = {}) override; - void setTbbFlag(bool flag) override; - bool getTbbFlag() override; - -private: - std::shared_ptr m_manager; - std::shared_ptr get_ov_manager() const override { - return m_manager; - } -}; - -class TaskExecutorWrapper : public ITaskExecutor { - std::shared_ptr m_executor; - -public: - TaskExecutorWrapper(const std::shared_ptr& executor) : m_executor(executor) {} - void run(Task task) override { - m_executor->run(task); - } - - void runAndWait(const std::vector& tasks) override { - m_executor->run_and_wait(tasks); - } -}; - -class StreamsExecutorWrapper : public IStreamsExecutor { - std::shared_ptr m_executor; - -public: - StreamsExecutorWrapper(const std::shared_ptr& executor) : m_executor(executor) {} - void run(Task task) override { - m_executor->run(task); - } - - void runAndWait(const std::vector& tasks) override { - m_executor->run_and_wait(tasks); - } - int GetStreamId() override { - return m_executor->get_stream_id(); - } - - int GetNumaNodeId() override { - return m_executor->get_numa_node_id(); - } - - int GetSocketId() override { - return m_executor->get_socket_id(); - } - - void Execute(Task task) override { - m_executor->execute(task); - } -}; - -} // namespace - -ExecutorManagerImpl::ExecutorManagerImpl(const std::shared_ptr& manager) - : m_manager(manager) {} - -void ExecutorManagerImpl::setTbbFlag(bool flag) { - m_manager->set_property({{ov::force_tbb_terminate.name(), flag}}); -} - -bool ExecutorManagerImpl::getTbbFlag() { - return m_manager->get_property(ov::force_tbb_terminate.name()).as(); -} - -ITaskExecutor::Ptr ExecutorManagerImpl::getExecutor(const std::string& id) { - return std::make_shared(m_manager->get_executor(id)); -} - -IStreamsExecutor::Ptr ExecutorManagerImpl::getIdleCPUStreamsExecutor(const IStreamsExecutor::Config& config) { - return std::make_shared(m_manager->get_idle_cpu_streams_executor(config)); -} - -size_t ExecutorManagerImpl::getExecutorsNumber() const { - return m_manager->get_executors_number(); -} - -size_t ExecutorManagerImpl::getIdleCPUStreamsExecutorsNumber() const { - return m_manager->get_idle_cpu_streams_executors_number(); -} - -void ExecutorManagerImpl::clear(const std::string& id) { - return m_manager->clear(id); -} - -std::shared_ptr create_old_manager( - const std::shared_ptr& manager) { - return std::make_shared(manager); -} - -namespace { - -class ExecutorManagerHolder { - std::mutex _mutex; - std::weak_ptr _manager; - -public: - ExecutorManagerHolder(const ExecutorManagerHolder&) = delete; - ExecutorManagerHolder& operator=(const ExecutorManagerHolder&) = delete; - - ExecutorManagerHolder() = default; - - ExecutorManager::Ptr get() { - std::lock_guard lock(_mutex); - auto manager = _manager.lock(); - if (!manager) { - _manager = manager = create_old_manager(ov::threading::executor_manager()); - } - return manager; - } -}; - -} // namespace - -ExecutorManager::Ptr executorManager() { - static ExecutorManagerHolder executorManagerHolder; - return executorManagerHolder.get(); -} - -ExecutorManager* ExecutorManager::getInstance() { - static auto ptr = executorManager().get(); - return ptr; -} - -} // namespace InferenceEngine diff --git a/src/inference/src/threading/ie_istreams_executor.cpp b/src/inference/src/threading/ie_istreams_executor.cpp deleted file mode 100644 index 3d3bdebe813..00000000000 --- a/src/inference/src/threading/ie_istreams_executor.cpp +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -#include "threading/ie_istreams_executor.hpp" - -#include -#include -#include -#include -#include - -#include "cpp_interfaces/interface/ie_internal_plugin_config.hpp" -#include "ie_parallel.hpp" -#include "ie_parallel_custom_arena.hpp" -#include "ie_parameter.hpp" -#include "ie_plugin_config.hpp" -#include "ie_system_conf.h" -#include "openvino/runtime/properties.hpp" -#include "openvino/util/common_util.hpp" - -namespace InferenceEngine { -IStreamsExecutor::~IStreamsExecutor() {} - -std::vector IStreamsExecutor::Config::SupportedKeys() const { - return get_property(ov::supported_properties.name()).as>(); -} -int IStreamsExecutor::Config::GetDefaultNumStreams(const bool enable_hyper_thread) { - return get_default_num_streams(enable_hyper_thread); -} - -int IStreamsExecutor::Config::GetHybridNumStreams(std::map& config, const int stream_mode) { - return get_hybrid_num_streams(config, stream_mode); -} - -void IStreamsExecutor::Config::SetConfig(const std::string& key, const std::string& value) { - set_property(key, value); -} - -Parameter IStreamsExecutor::Config::GetConfig(const std::string& key) const { - return get_property(key); -} - -void IStreamsExecutor::Config::UpdateHybridCustomThreads(Config& config) { - return update_hybrid_custom_threads(config); -} - -IStreamsExecutor::Config IStreamsExecutor::Config::MakeDefaultMultiThreaded(const IStreamsExecutor::Config& initial, - const bool fp_intesive) { - return make_default_multi_threaded(initial); -} - -IStreamsExecutor::Config IStreamsExecutor::Config::ReserveCpuThreads(const Config& initial) { - return reserve_cpu_threads(initial); -} - -} // namespace InferenceEngine diff --git a/src/inference/src/threading/ie_itask_executor.cpp b/src/inference/src/threading/ie_itask_executor.cpp deleted file mode 100644 index 8e6bf89f389..00000000000 --- a/src/inference/src/threading/ie_itask_executor.cpp +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -#include "threading/ie_itask_executor.hpp" - -#include -#include -#include -#include - -namespace InferenceEngine { - -void ITaskExecutor::runAndWait(const std::vector& tasks) { - run_and_wait(tasks); -} - -} // namespace InferenceEngine diff --git a/src/inference/src/threading/ie_parallel_custom_arena.hpp b/src/inference/src/threading/ie_parallel_custom_arena.hpp deleted file mode 100755 index ad6582cca14..00000000000 --- a/src/inference/src/threading/ie_parallel_custom_arena.hpp +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -/** - * @brief Contains declarations and custom threading interfaces based on TBB info and task_arena APIs. - * - * @file ie_parallel_custom.hpp - */ - -#pragma once - -#include "dev/threading/parallel_custom_arena.hpp" diff --git a/src/inference/src/threading/ie_tbb_streams_executor.cpp b/src/inference/src/threading/ie_tbb_streams_executor.cpp deleted file mode 100644 index c5304e222c3..00000000000 --- a/src/inference/src/threading/ie_tbb_streams_executor.cpp +++ /dev/null @@ -1,304 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -#include "threading/ie_tbb_streams_executor.hpp" - -#include -#include -#include -#include -#include -#include -#include - -#include "details/ie_exception.hpp" -#include "ie_parallel.hpp" -#include "ie_parallel_custom_arena.hpp" -#include "ie_system_conf.h" -#include "threading/ie_thread_affinity.hpp" - -#if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO)) -# include -# include -# ifndef TBB_PREVIEW_GLOBAL_CONTROL -# define TBB_PREVIEW_GLOBAL_CONTROL 1 -# endif -# include -# include -# include - -namespace InferenceEngine { -struct TBBStreamsExecutor::Impl { - struct Stream; - using TaskQueue = tbb::concurrent_queue; - using StreamQueue = tbb::concurrent_bounded_queue; - using LocalStreams = tbb::enumerable_thread_specific; - struct Shared : public std::enable_shared_from_this { - using Ptr = std::shared_ptr; - TaskQueue _taskQueue; - StreamQueue _streamQueue; - }; - struct Stream { - struct Observer : tbb::task_scheduler_observer { - Stream* _thisStream = nullptr; - LocalStreams* _localStream = nullptr; - CpuSet _mask; - int _ncpus = 0; - int _threadBindingStep = 0; - int _offset = 0; - - Observer(custom::task_arena& arena, - Stream* thisStream, - LocalStreams* localStream, - const bool pinToCores, - const int streamId, - const int threadsPerStream, - const int threadBindingStep, - const int threadBindingOffset) - : tbb::task_scheduler_observer{static_cast(arena)}, - _thisStream{thisStream}, - _localStream{localStream}, - _threadBindingStep{threadBindingStep}, - _offset{streamId * threadsPerStream + threadBindingOffset} { - if (pinToCores) { - std::tie(_mask, _ncpus) = GetProcessMask(); - } - } - void on_scheduler_entry(bool) override { - _localStream->local() = _thisStream; - if (nullptr != _mask) { - PinThreadToVacantCore(_offset + tbb::this_task_arena::current_thread_index(), - _threadBindingStep, - _ncpus, - _mask); - } - } - void on_scheduler_exit(bool) override { - _localStream->local() = nullptr; - if (nullptr != _mask) { - PinCurrentThreadByMask(_ncpus, _mask); - } - } - ~Observer() override = default; - }; - - explicit Stream(Impl* impl, const bool externStream = false) : _impl{impl} { - { - std::lock_guard lock{_impl->_streamIdMutex}; - if (_impl->_streamIdQueue.empty()) { - _streamId = _impl->_streamId++; - } else { - _streamId = _impl->_streamIdQueue.front(); - _impl->_streamIdQueue.pop(); - } - } - _numaNodeId = _impl->_config._streams - ? _impl->_usedNumaNodes.at((_streamId % _impl->_config._streams) / - ((_impl->_config._streams + _impl->_usedNumaNodes.size() - 1) / - _impl->_usedNumaNodes.size())) - : _impl->_usedNumaNodes.at(_streamId % _impl->_usedNumaNodes.size()); - auto concurrency = - (0 == _impl->_config._threadsPerStream) ? tbb::task_arena::automatic : _impl->_config._threadsPerStream; - auto masterThreads = externStream ? 1u : 0u; - if (ThreadBindingType::HYBRID_AWARE == _impl->_config._threadBindingType) { - if (Config::PreferredCoreType::ROUND_ROBIN != _impl->_config._threadPreferredCoreType) { - if (Config::PreferredCoreType::ANY == _impl->_config._threadPreferredCoreType) { - _arena.initialize(concurrency); - } else { - const auto selected_core_type = - Config::PreferredCoreType::BIG == _impl->_config._threadPreferredCoreType - ? custom::info::core_types().back() // running on Big cores only - : custom::info::core_types().front(); // running on Little cores only - _arena.initialize(custom::task_arena::constraints{} - .set_core_type(selected_core_type) - .set_max_concurrency(concurrency)); - } - } else { - // assigning the stream to the core type in the round-robin fashion - // wrapping around total_streams (i.e. how many streams all different core types can handle - // together) - const auto total_streams = _impl->_totalSreamsOnCoreTypes.back().second; - const auto streamId_wrapped = _streamId % total_streams; - const auto& selected_core_type = - std::find_if(_impl->_totalSreamsOnCoreTypes.cbegin(), - _impl->_totalSreamsOnCoreTypes.cend(), - [streamId_wrapped](const decltype(_impl->_totalSreamsOnCoreTypes)::value_type& p) { - return p.second > streamId_wrapped; - }) - ->first; - _arena.initialize(custom::task_arena::constraints{} - .set_core_type(selected_core_type) - .set_max_concurrency(concurrency)); - } - } else if (ThreadBindingType::NUMA == _impl->_config._threadBindingType) { - _arena.initialize(custom::task_arena::constraints{_numaNodeId, concurrency}); - } else { - _arena.initialize(concurrency, masterThreads); - } - _observer.reset(new Observer{_arena, - this, - &(_impl->_localStream), - (ThreadBindingType::CORES == _impl->_config._threadBindingType), - _streamId, - _impl->_config._threadsPerStream, - _impl->_config._threadBindingStep, - _impl->_config._threadBindingOffset}); - _observer->observe(true); - } - - ~Stream() { - static_cast(_arena).terminate(); - _observer->observe(false); - { - std::lock_guard lock{_impl->_streamIdMutex}; - _impl->_streamIdQueue.push(_streamId); - } - } - - Impl* _impl = nullptr; - int _streamId = 0; - int _numaNodeId = 0; - custom::task_arena _arena; - std::unique_ptr _observer; - }; - - using Streams = std::list; - using ExternStreams = tbb::enumerable_thread_specific; - - explicit Impl(const Config& config) - : _config{config}, - _shared{std::make_shared()}, - _localStream{nullptr}, - _externStreams{this, true} { - if (_config._streams * _config._threadsPerStream >= static_cast(std::thread::hardware_concurrency())) { - _maxTbbThreads.reset( - new tbb::global_control{tbb::global_control::max_allowed_parallelism, - static_cast(_config._streams * _config._threadsPerStream + 1)}); - } - auto numaNodes = getAvailableNUMANodes(); - if (_config._streams != 0) { - std::copy_n(std::begin(numaNodes), - std::min(static_cast(_config._streams), numaNodes.size()), - std::back_inserter(_usedNumaNodes)); - } else { - _usedNumaNodes = numaNodes; - } - if (ThreadBindingType::HYBRID_AWARE == config._threadBindingType) { - const auto core_types = custom::info::core_types(); - const int threadsPerStream = - (0 == config._threadsPerStream) ? std::thread::hardware_concurrency() : config._threadsPerStream; - int sum = 0; - // reversed order, so BIG cores are first - for (auto iter = core_types.rbegin(); iter < core_types.rend(); iter++) { - const auto& type = *iter; - // calculating the #streams per core type - const int num_streams_for_core_type = - std::max(1, - custom::info::default_concurrency(custom::task_arena::constraints{}.set_core_type(type)) / - threadsPerStream); - sum += num_streams_for_core_type; - // prefix sum, so the core type for a given stream id will be deduced just as a upper_bound - // (notice that the map keeps the elements in the descending order, so the big cores are populated - // first) - _totalSreamsOnCoreTypes.emplace_back(type, sum); - } - } - _shared->_streamQueue.set_capacity(_config._streams); - for (int streamId = 0; streamId < _config._streams; ++streamId) { - _streams.emplace_back(this); - _shared->_streamQueue.push(&(_streams.back())); - } - } - - ~Impl() { - for (int streamId = 0; streamId < _config._streams; ++streamId) { - Stream* stream = nullptr; - _shared->_streamQueue.pop(stream); - (void)stream; - } - } - - static void Schedule(Shared::Ptr& shared, Task task) { - Stream* stream = nullptr; - if (shared->_streamQueue.try_pop(stream)) { - struct TryPop { - void operator()() const { - try { - do { - Task task = std::move(_task); - task(); - } while (_shared->_taskQueue.try_pop(_task)); - } catch (...) { - } - if (_shared->_streamQueue.try_push(_stream)) { - if (_shared->_taskQueue.try_pop(_task)) { - Schedule(_shared, std::move(_task)); - } - } - } - Stream* _stream; - mutable Shared::Ptr _shared; - mutable Task _task; - }; - stream->_arena.enqueue(TryPop{stream, shared->shared_from_this(), std::move(task)}); - } else { - shared->_taskQueue.push(std::move(task)); - } - } - - Config _config; - std::unique_ptr _maxTbbThreads; - std::mutex _streamIdMutex; - int _streamId = 0; - std::queue _streamIdQueue; - std::vector _usedNumaNodes; - Shared::Ptr _shared; - LocalStreams _localStream; - ExternStreams _externStreams; - Streams _streams; - using StreamIdToCoreTypes = std::vector>; - StreamIdToCoreTypes _totalSreamsOnCoreTypes; -}; - -TBBStreamsExecutor::TBBStreamsExecutor(const Config& config) : _impl{new TBBStreamsExecutor::Impl{config}} {} - -TBBStreamsExecutor::~TBBStreamsExecutor() { - _impl.reset(); -} - -int TBBStreamsExecutor::GetStreamId() { - auto stream = _impl->_localStream.local(); - if (nullptr == stream) { - stream = &(_impl->_externStreams.local()); - } - return stream->_streamId; -} - -int TBBStreamsExecutor::GetNumaNodeId() { - auto stream = _impl->_localStream.local(); - if (nullptr == stream) { - stream = &(_impl->_externStreams.local()); - } - return stream->_numaNodeId; -} - -void TBBStreamsExecutor::run(Task task) { - if (_impl->_config._streams == 0) { - Execute(std::move(task)); - } else { - Impl::Schedule(_impl->_shared, std::move(task)); - } -} - -void TBBStreamsExecutor::Execute(Task task) { - auto stream = _impl->_localStream.local(); - if (nullptr == stream) { - _impl->_externStreams.local()._arena.execute(std::move(task)); - } else { - stream->_arena.execute(std::move(task)); - } -} - -} // namespace InferenceEngine -#endif // ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO)) diff --git a/src/inference/src/threading/ie_thread_affinity.cpp b/src/inference/src/threading/ie_thread_affinity.cpp deleted file mode 100644 index 578775ac42a..00000000000 --- a/src/inference/src/threading/ie_thread_affinity.cpp +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -#include "threading/ie_thread_affinity.hpp" - -#include "dev/threading/thread_affinity.hpp" - -namespace InferenceEngine { - -std::tuple GetProcessMask() { - return ov::threading::get_process_mask(); -} - -void ReleaseProcessMask(cpu_set_t* mask) { - ov::threading::release_process_mask(mask); -} - -bool PinThreadToVacantCore(int thrIdx, - int hyperthreads, - int ncores, - const CpuSet& procMask, - const std::vector& cpu_ids, - int cpuIdxOffset) { - return ov::threading::pin_thread_to_vacant_core(thrIdx, hyperthreads, ncores, procMask, cpu_ids, cpuIdxOffset); -} -bool PinCurrentThreadByMask(int ncores, const CpuSet& procMask) { - return ov::threading::pin_current_thread_by_mask(ncores, procMask); -} -bool PinCurrentThreadToSocket(int socket) { - return ov::threading::pin_current_thread_to_socket(socket); -} - -} // namespace InferenceEngine diff --git a/src/inference/src/threading/ie_thread_affinity.hpp b/src/inference/src/threading/ie_thread_affinity.hpp deleted file mode 100644 index 065a0cdfc25..00000000000 --- a/src/inference/src/threading/ie_thread_affinity.hpp +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -#pragma once - -#include -#include -#include - -#include "dev/threading/thread_affinity.hpp" - -#if !(defined(__APPLE__) || defined(__EMSCRIPTEN__) || defined(_WIN32)) -# include -#endif - -namespace InferenceEngine { -#if (defined(__APPLE__) || defined(__EMSCRIPTEN__) || defined(_WIN32)) -using cpu_set_t = ov::threading::cpu_set_t; -#endif // (defined(__APPLE__) || defined(__EMSCRIPTEN__) || defined(_WIN32)) - -/** - * @brief Release the cores affinity mask for the current process - * @ingroup ie_dev_api_threading - * - * @param mask The mask - */ -void ReleaseProcessMask(cpu_set_t* mask); - -using ReleaseProcessMaskDeleter = ov::threading::ReleaseProcessMaskDeleter; - -using CpuSet = ov::threading::CpuSet; - -/** - * @brief Get the cores affinity mask for the current process - * @ingroup ie_dev_api_threading - * @return A core affinity mask - */ -std::tuple GetProcessMask(); - -/** - * @brief Pins current thread to a set of cores determined by the mask - * @ingroup ie_dev_api_threading - * - * @param[in] thrIdx The thr index - * @param[in] hyperThreads The hyper threads - * @param[in] ncores The ncores - * @param[in] processMask The process mask - * @return `True` in case of success, `false` otherwise - */ -bool PinThreadToVacantCore(int thrIdx, - int hyperThreads, - int ncores, - const CpuSet& processMask, - const std::vector& cpu_ids = {}, - int cpuIdxOffset = 0); - -/** - * @brief Pins thread to a spare core in the round-robin scheme, while respecting the given process mask. - * The function can also handle the hyper-threading (by populating the physical cores first) - * @ingroup ie_dev_api_threading - * - * @param[in] ncores The ncores - * @param[in] processMask The process mask - * @return `True` in case of success, `false` otherwise - */ -bool PinCurrentThreadByMask(int ncores, const CpuSet& processMask); - -/** - * @brief Pins a current thread to a socket. - * @ingroup ie_dev_api_threading - * - * @param[in] socket The socket id - * @return `True` in case of success, `false` otherwise - */ -bool PinCurrentThreadToSocket(int socket); -} // namespace InferenceEngine diff --git a/src/inference/src/threading/itt.hpp b/src/inference/src/threading/itt.hpp deleted file mode 100644 index 3c0e64d5586..00000000000 --- a/src/inference/src/threading/itt.hpp +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright (C) 2018-2023 Intel Corporation -// SPDX-License-Identifier: Apache-2.0 -// - -/** - * @brief Defines openvino tbbbind domains for tracing - * @file itt.hpp - */ - -#pragma once - -#include "dev/threading/itt.hpp" diff --git a/src/inference/tests/unit/cpp_interfaces/ie_infer_async_request_thread_safe_default_test.cpp b/src/inference/tests/unit/cpp_interfaces/ie_infer_async_request_thread_safe_default_test.cpp index 8519204d5e7..ece567708d5 100644 --- a/src/inference/tests/unit/cpp_interfaces/ie_infer_async_request_thread_safe_default_test.cpp +++ b/src/inference/tests/unit/cpp_interfaces/ie_infer_async_request_thread_safe_default_test.cpp @@ -9,16 +9,15 @@ #include #include #include -#include +#include "openvino/runtime/threading/cpu_streams_executor.hpp" #include "unit_test_utils/mocks/cpp_interfaces/impl/mock_async_infer_request_default.hpp" #include "unit_test_utils/mocks/cpp_interfaces/interface/mock_iinfer_request_internal.hpp" #include "unit_test_utils/mocks/cpp_interfaces/mock_task_executor.hpp" using namespace ::testing; using namespace std; -using namespace InferenceEngine; -using namespace InferenceEngine::details; +using namespace ov::threading; struct DeferedExecutor : public ITaskExecutor { using Ptr = std::shared_ptr; @@ -183,7 +182,7 @@ TEST_F(InferRequestThreadSafeDefaultTests, callbackIsCalledIfAsyncRequestFailed) } TEST_F(InferRequestThreadSafeDefaultTests, canCatchExceptionIfAsyncRequestFailedAndNoCallback) { - auto taskExecutor = std::make_shared(); + auto taskExecutor = std::make_shared(ov::threading::IStreamsExecutor::Config{}); testRequest = make_shared(mockInferRequestInternal, taskExecutor, taskExecutor); EXPECT_CALL(*mockInferRequestInternal.get(), InferImpl()).WillOnce(Throw(std::exception())); testRequest->StartAsync(); diff --git a/src/plugins/intel_cpu/src/compiled_model.cpp b/src/plugins/intel_cpu/src/compiled_model.cpp index c6f354a0e82..8b33f6ca75f 100644 --- a/src/plugins/intel_cpu/src/compiled_model.cpp +++ b/src/plugins/intel_cpu/src/compiled_model.cpp @@ -15,11 +15,6 @@ #include "serialize.h" #include "openvino/runtime/threading/executor_manager.hpp" #include "transformations/transformation_pipeline.h" -#define FIX_62820 0 -#if FIX_62820 && ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO)) -# include -#endif - #include "openvino/runtime/properties.hpp" #include "openvino/util/common_util.hpp" #include "openvino/runtime/threading/cpu_streams_executor.hpp" @@ -71,20 +66,11 @@ CompiledModel::CompiledModel(const std::shared_ptr& model, : IStreamsExecutor::Config::make_default_multi_threaded(m_cfg.streamExecutorConfig, isFloatModel); streamsExecutorConfig._name = "CPUStreamsExecutor"; m_cfg.streamExecutorConfig._threads = streamsExecutorConfig._threads; -#if FIX_62820 && (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO) - m_task_executor = std::make_shared(streamsExecutorConfig); -#else m_task_executor = m_plugin->get_executor_manager()->get_idle_cpu_streams_executor(streamsExecutorConfig); -#endif } if (0 != cfg.streamExecutorConfig._streams) { -#if FIX_62820 && (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO) - // There is no additional threads but we still need serialize callback execution to preserve legacy behaviour - m_callback_executor = std::make_shared(); -#else m_callback_executor = m_plugin->get_executor_manager()->get_idle_cpu_streams_executor( IStreamsExecutor::Config{"CPUCallbackExecutor", 1, 0, IStreamsExecutor::ThreadBindingType::NONE}); -#endif } else { m_callback_executor = m_task_executor; } diff --git a/src/tests/test_utils/unit_test_utils/mocks/cpp_interfaces/impl/mock_async_infer_request_default.hpp b/src/tests/test_utils/unit_test_utils/mocks/cpp_interfaces/impl/mock_async_infer_request_default.hpp index 5d1088cb938..28a1324c5d4 100644 --- a/src/tests/test_utils/unit_test_utils/mocks/cpp_interfaces/impl/mock_async_infer_request_default.hpp +++ b/src/tests/test_utils/unit_test_utils/mocks/cpp_interfaces/impl/mock_async_infer_request_default.hpp @@ -11,6 +11,7 @@ #include "cpp_interfaces/impl/ie_infer_async_request_thread_safe_default.hpp" +using namespace ov::threading; using namespace InferenceEngine; class MockAsyncInferRequestDefault : public AsyncInferRequestThreadSafeDefault { diff --git a/src/tests/test_utils/unit_test_utils/mocks/cpp_interfaces/mock_task_executor.hpp b/src/tests/test_utils/unit_test_utils/mocks/cpp_interfaces/mock_task_executor.hpp index 1ea59a0314f..ce74aba12da 100644 --- a/src/tests/test_utils/unit_test_utils/mocks/cpp_interfaces/mock_task_executor.hpp +++ b/src/tests/test_utils/unit_test_utils/mocks/cpp_interfaces/mock_task_executor.hpp @@ -7,11 +7,12 @@ #include #include -#include -class MockTaskExecutor : public InferenceEngine::ITaskExecutor { +#include "openvino/runtime/threading/itask_executor.hpp" + +class MockTaskExecutor : public ov::threading::ITaskExecutor { public: typedef std::shared_ptr Ptr; - MOCK_METHOD1(run, void(InferenceEngine::Task)); + MOCK_METHOD1(run, void(ov::threading::Task)); };