Remove legacy threading code (#21279)

* remove legacy threading code

* fix code style
Sun Xiaoxia 2023-11-27 22:45:34 +08:00 committed by GitHub
parent d722e42052
commit 0bdd658317
27 changed files with 39 additions and 1512 deletions

View File

@@ -12,9 +12,10 @@
 #include "cpp_interfaces/impl/ie_infer_async_request_thread_safe_default.hpp"
 #include "cpp_interfaces/interface/ie_iexecutable_network_internal.hpp"
 #include "cpp_interfaces/interface/ie_iplugin_internal.hpp"
-#include "threading/ie_cpu_streams_executor.hpp"
+#include "openvino/runtime/threading/cpu_streams_executor.hpp"

-namespace InferenceEngine {
+namespace ov {
+namespace threading {

 /**
  * @brief This class provides optimal thread safe default implementation.
@@ -77,4 +78,5 @@ protected:
     ITaskExecutor::Ptr _callbackExecutor = nullptr;  //!< Holds a callback executor
 };

-}  // namespace InferenceEngine
+}  // namespace threading
+}  // namespace ov

View File

@@ -16,11 +16,14 @@
 #include "cpp_interfaces/interface/ie_iinfer_request_internal.hpp"
 #include "ie_api.h"
-#include "threading/ie_immediate_executor.hpp"
-#include "threading/ie_istreams_executor.hpp"
-#include "threading/ie_itask_executor.hpp"
+#include "openvino/runtime/threading/immediate_executor.hpp"
+#include "openvino/runtime/threading/istreams_executor.hpp"
+#include "openvino/runtime/threading/itask_executor.hpp"

-namespace InferenceEngine {
+using namespace InferenceEngine;
+
+namespace ov {
+namespace threading {

 IE_SUPPRESS_DEPRECATED_START

 /**
@@ -59,11 +62,11 @@ class INFERENCE_ENGINE_1_0_DEPRECATED AsyncInferRequestThreadSafeDefault : publi
         Callback _callback;
     };

-    struct ImmediateStreamsExecutor : public InferenceEngine::ITaskExecutor {
+    struct ImmediateStreamsExecutor : public ov::threading::ITaskExecutor {
         explicit ImmediateStreamsExecutor(const IStreamsExecutor::Ptr& streamsExecutor)
             : _streamsExecutor{streamsExecutor} {}
-        void run(InferenceEngine::Task task) override {
-            _streamsExecutor->Execute(std::move(task));
+        void run(ov::threading::Task task) override {
+            _streamsExecutor->execute(std::move(task));
         }
         IStreamsExecutor::Ptr _streamsExecutor;
     };
@@ -449,4 +452,5 @@ private:
     InferState _state = InferState::Idle;
 };
 IE_SUPPRESS_DEPRECATED_END
-}  // namespace InferenceEngine
+}  // namespace threading
+}  // namespace ov

View File

@@ -23,6 +23,8 @@
 #include "openvino/util/pp.hpp"
 #include "so_ptr.hpp"

+using namespace ov::threading;
+
 namespace InferenceEngine {

 class ExecutorManager;
@@ -301,7 +303,7 @@
      * @brief Gets reference to tasks execution manager
      * @return Reference to ExecutorManager interface
      */
-    const std::shared_ptr<ExecutorManager>& executorManager() const;
+    const std::shared_ptr<ov::threading::ExecutorManager>& executorManager() const;

     /**
      * @brief Queries a plugin about supported layers in network
@@ -368,11 +370,11 @@ protected:
     void SetExeNetworkInfo(const std::shared_ptr<IExecutableNetworkInternal>& exeNetwork,
                            const std::shared_ptr<const ov::Model>& function);

     std::string _pluginName;                     //!< A device name that plugins enables
     std::map<std::string, std::string> _config;  //!< A map config keys -> values
     std::weak_ptr<InferenceEngine::ICore> _core;  //!< A pointer to ICore interface
-    std::shared_ptr<ExecutorManager> _executorManager;  //!< A tasks execution manager
+    std::shared_ptr<ov::threading::ExecutorManager> _executorManager;  //!< A tasks execution manager
     bool _isNewAPI;  //!< A flag which shows used API
 };

 /**

View File

@@ -1,58 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
/**
* @file ie_cpu_streams_executor.hpp
* @brief A header file for Inference Engine CPU-Streams-based Executor implementation.
*/
#pragma once
#include <memory>
#include <string>
#include "threading/ie_istreams_executor.hpp"
namespace InferenceEngine {
/**
* @class CPUStreamsExecutor
* @ingroup ie_dev_api_threading
* @brief CPU Streams executor implementation. The executor splits the CPU into groups of threads
* that can be pinned to cores or NUMA nodes.
* It uses custom threads to pull tasks from a single queue.
*/
class INFERENCE_ENGINE_API_CLASS(CPUStreamsExecutor) : public IStreamsExecutor {
public:
/**
* @brief A shared pointer to a CPUStreamsExecutor object
*/
using Ptr = std::shared_ptr<CPUStreamsExecutor>;
/**
* @brief Constructor
* @param config Stream executor parameters
*/
explicit CPUStreamsExecutor(const IStreamsExecutor::Config& config = {});
/**
* @brief A class destructor
*/
~CPUStreamsExecutor() override;
void run(Task task) override;
void Execute(Task task) override;
int GetStreamId() override;
int GetNumaNodeId() override;
int GetSocketId() override;
private:
struct Impl;
std::unique_ptr<Impl> _impl;
};
} // namespace InferenceEngine
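A minimal usage sketch of the replacement path, assuming only the ov::threading::CPUStreamsExecutor constructor and run() that appear elsewhere in this diff (the default-constructed config mirrors the updated unit test below):

#include <memory>

#include "openvino/runtime/threading/cpu_streams_executor.hpp"
#include "openvino/runtime/threading/istreams_executor.hpp"

void run_on_streams_executor() {
    // The new executor is configured with ov::threading::IStreamsExecutor::Config directly.
    auto executor = std::make_shared<ov::threading::CPUStreamsExecutor>(
        ov::threading::IStreamsExecutor::Config{});
    // snake_case run()/execute() replace the legacy run()/Execute() pair.
    executor->run([] { /* work scheduled onto a stream thread */ });
}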

View File

@@ -1,100 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
/**
* @file ie_executor_manager.hpp
* @brief A header file for Executor Manager
*/
#pragma once
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "threading/ie_istreams_executor.hpp"
#include "threading/ie_itask_executor.hpp"
namespace ov {
namespace threading {
class ExecutorManager;
}
} // namespace ov
namespace InferenceEngine {
class IPluginWrapper;
/**
* @interface ExecutorManager
* @brief Interface for tasks execution manager.
* This is the global point for getting task executor objects by string id.
* It is needed so that multiple asynchronous requests can share unique executors and avoid oversubscription.
* E.g. there are two task executors for the CPU device: one in FPGA, another in OneDNN. Running both in parallel
* leads to suboptimal CPU usage; it is more efficient to run the corresponding tasks one by one via a single executor.
* @ingroup ie_dev_api_threading
*/
class INFERENCE_ENGINE_API_CLASS(ExecutorManager) {
public:
/**
* A shared pointer to ExecutorManager interface
*/
using Ptr = std::shared_ptr<ExecutorManager>;
/**
* @brief Returns an executor by its unique identifier
* @param id A unique identifier of the device (usually a string representation of TargetDevice)
* @return A shared pointer to an existing or newly created ITaskExecutor
*/
virtual ITaskExecutor::Ptr getExecutor(const std::string& id) = 0;
/// @private
virtual IStreamsExecutor::Ptr getIdleCPUStreamsExecutor(const IStreamsExecutor::Config& config) = 0;
/**
* @cond
*/
virtual size_t getExecutorsNumber() const = 0;
virtual size_t getIdleCPUStreamsExecutorsNumber() const = 0;
virtual void clear(const std::string& id = {}) = 0;
/**
* @endcond
*/
virtual ~ExecutorManager() = default;
/**
* @brief Returns a global instance of ExecutorManager
* @return The instance.
*/
INFERENCE_ENGINE_DEPRECATED("Use IInferencePlugin::executorManager() instead")
static ExecutorManager* getInstance();
/**
* @brief Set TBB terminate flag
* @param flag `true` to terminate TBB during destruction, `false` to keep it running
*/
virtual void setTbbFlag(bool flag) = 0;
virtual bool getTbbFlag() = 0;
private:
virtual std::shared_ptr<ov::threading::ExecutorManager> get_ov_manager() const = 0;
friend class IPluginWrapper;
};
INFERENCE_ENGINE_API_CPP(ExecutorManager::Ptr) executorManager();
std::shared_ptr<InferenceEngine::ExecutorManager> create_old_manager(
const std::shared_ptr<ov::threading::ExecutorManager>& manager);
} // namespace InferenceEngine
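A hedged sketch of what callers use instead: the ov::threading::ExecutorManager methods this wrapper forwards to (get_executor, get_idle_cpu_streams_executor), with an illustrative device id:

#include "openvino/runtime/threading/executor_manager.hpp"
#include "openvino/runtime/threading/istreams_executor.hpp"

void use_global_executor_manager() {
    // ov::threading::executor_manager() replaces InferenceEngine::executorManager().
    auto manager = ov::threading::executor_manager();
    // Same id-based sharing as the legacy getExecutor(): one executor per device id.
    auto task_executor = manager->get_executor("CPU");  // "CPU" is an illustrative id
    task_executor->run([] { /* asynchronous work */ });
    // Idle CPU streams executors are pooled by config, as in getIdleCPUStreamsExecutor().
    auto streams_executor =
        manager->get_idle_cpu_streams_executor(ov::threading::IStreamsExecutor::Config{});
    streams_executor->execute([] { /* work constrained to a stream */ });
}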

View File

@@ -1,41 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
/**
* @file ie_immediate_executor.hpp
* @brief A header file for Inference Engine Immediate Executor implementation
*/
#pragma once
#include <memory>
#include <string>
#include "openvino/runtime/threading/immediate_executor.hpp"
#include "threading/ie_itask_executor.hpp"
namespace InferenceEngine {
/**
* @brief Task executor implementation that simply runs tasks in the current thread when run() is called
* @ingroup ie_dev_api_threading
*/
class ImmediateExecutor : public ITaskExecutor {
public:
/**
* @brief A shared pointer to a ImmediateExecutor object
*/
using Ptr = std::shared_ptr<ImmediateExecutor>;
/**
* @brief Destroys the object.
*/
~ImmediateExecutor() override = default;
void run(Task task) override {
task();
}
};
} // namespace InferenceEngine
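The semantics survive unchanged in ov::threading::ImmediateExecutor, which this header merely re-exported; a small sketch of the synchronous contract:

#include "openvino/runtime/threading/immediate_executor.hpp"

void run_inline() {
    ov::threading::ImmediateExecutor executor;
    int value = 0;
    // run() invokes the task synchronously in the calling thread,
    // so the side effect is visible immediately after the call.
    executor.run([&value] { value = 42; });
    // value == 42 here.
}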

View File

@@ -1,162 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
/**
* @file ie_istreams_executor.hpp
* @brief A header file for Inference Engine Streams-based Executor Interface
*/
#pragma once
#include <memory>
#include <string>
#include <vector>
#include "ie_parameter.hpp"
#include "openvino/runtime/threading/istreams_executor.hpp"
#include "threading/ie_itask_executor.hpp"
namespace InferenceEngine {
/**
* @interface IStreamsExecutor
* @ingroup ie_dev_api_threading
* @brief Interface for Streams Task Executor. This executor groups worker threads into so-called `streams`.
* @par CPU
* The executor executes all parallel tasks using threads from one stream.
* With proper pinning settings, it should reduce cache misses for memory-bound workloads.
* @par NUMA
* On NUMA hosts, the GetNumaNodeId() method can be used to determine the NUMA node of the current stream.
*/
class INFERENCE_ENGINE_API_CLASS(IStreamsExecutor) : public ITaskExecutor, public ov::threading::IStreamsExecutor {
public:
/**
* A shared pointer to IStreamsExecutor interface
*/
using Ptr = std::shared_ptr<IStreamsExecutor>;
/**
* @brief Defines IStreamsExecutor configuration
*/
struct INFERENCE_ENGINE_API_CLASS(Config) : public ov::threading::IStreamsExecutor::Config {
/**
* @brief Supported Configuration keys
* @return vector of supported configuration keys
*/
std::vector<std::string> SupportedKeys() const;
/**
* @brief Parses configuration key/value pair
* @param key configuration key
* @param value configuration value
*/
void SetConfig(const std::string& key, const std::string& value);
/**
* @brief Return configuration value
* @param key configuration key
* @return configuration value wrapped into Parameter
*/
Parameter GetConfig(const std::string& key) const;
/**
* @brief Creates an appropriate multithreaded configuration,
* filling unconfigured values from the initial configuration using hardware properties
* @param initial Initial configuration
* @param fp_intesive additional hint for the (Hybrid) core-types selection logic:
* whether the executor should be configured for floating-point-intensive work (as opposed to int8-
* intensive work)
* @return configured values
*/
static Config MakeDefaultMultiThreaded(const Config& initial, const bool fp_intesive = true);
static int GetDefaultNumStreams(
const bool enable_hyper_thread = true); // no network specifics considered (only CPU's caps);
static int GetHybridNumStreams(std::map<std::string, std::string>& config, const int stream_mode);
static void UpdateHybridCustomThreads(Config& config);
static Config ReserveCpuThreads(const Config& initial);
/**
* @brief A constructor with arguments
*
* @param[in] name The executor name
* @param[in] streams @copybrief Config::_streams
* @param[in] threadsPerStream @copybrief Config::_threadsPerStream
* @param[in] threadBindingType @copybrief Config::_threadBindingType
* @param[in] threadBindingStep @copybrief Config::_threadBindingStep
* @param[in] threadBindingOffset @copybrief Config::_threadBindingOffset
* @param[in] threads @copybrief Config::_threads
* @param[in] threadPreferBigCores @copybrief Config::_threadPreferBigCores
*/
Config(std::string name = "StreamsExecutor",
int streams = 1,
int threadsPerStream = 0,
ThreadBindingType threadBindingType = ThreadBindingType::NONE,
int threadBindingStep = 1,
int threadBindingOffset = 0,
int threads = 0,
PreferredCoreType threadPreferredCoreType = PreferredCoreType::ANY,
std::vector<std::vector<int>> streamsInfoTable = {},
bool cpuReservation = false)
: ov::threading::IStreamsExecutor::Config(name,
streams,
threadsPerStream,
threadBindingType,
threadBindingStep,
threadBindingOffset,
threads,
threadPreferredCoreType,
streamsInfoTable,
cpuReservation) {}
Config(const ov::threading::IStreamsExecutor::Config& config)
: ov::threading::IStreamsExecutor::Config(config) {}
};
/**
* @brief A virtual destructor
*/
~IStreamsExecutor() override;
/**
* @brief Returns the index of the current stream
* @return The index of the current stream; throws an exception if not called from a stream thread
*/
virtual int GetStreamId() = 0;
/**
* @brief Returns the ID of the current NUMA node
* @return The ID of the current NUMA node; throws an exception if not called from a stream thread
*/
virtual int GetNumaNodeId() = 0;
/**
* @brief Returns the ID of the current socket
* @return The ID of the current socket; throws an exception if not called from a stream thread
*/
virtual int GetSocketId() = 0;
/**
* @brief Execute the task in the current thread using streams executor configuration and constraints
* @param task A task to start
*/
virtual void Execute(Task task) = 0;
int get_stream_id() override {
return GetStreamId();
}
int get_numa_node_id() override {
return GetNumaNodeId();
}
int get_socket_id() override {
return GetSocketId();
}
void execute(Task task) override {
Execute(task);
}
};
} // namespace InferenceEngine
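The forwarding methods above define the whole migration for stream executors; a sketch of the CamelCase-to-snake_case mapping, assuming only the methods shown:

#include "openvino/runtime/threading/istreams_executor.hpp"

// GetStreamId()   -> get_stream_id()
// GetNumaNodeId() -> get_numa_node_id()
// GetSocketId()   -> get_socket_id()
// Execute(task)   -> execute(task)
void query_stream_context(ov::threading::IStreamsExecutor& executor) {
    executor.execute([&executor] {
        // Valid only on a stream thread, mirroring the legacy contract.
        int stream_id = executor.get_stream_id();
        int numa_node = executor.get_numa_node_id();
        (void)stream_id;
        (void)numa_node;
    });
}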

View File

@@ -1,73 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
/**
* @file ie_itask_executor.hpp
* @brief A header file for Inference Engine Task Executor Interface
*/
#pragma once
#include <functional>
#include <memory>
#include <vector>
#include "ie_api.h"
#include "openvino/runtime/threading/itask_executor.hpp"
namespace InferenceEngine {
/**
* @brief An Inference Engine Task Executor accepts any copyable callable with no parameters and no return value as a task.
* It is wrapped into an std::function object
* @ingroup ie_dev_api_threading
*/
using Task = ov::threading::Task;
/**
* @interface ITaskExecutor
* @ingroup ie_dev_api_threading
* @brief Interface for Task Executor.
* Inference Engine uses the `InferenceEngine::ITaskExecutor` interface to run all asynchronous internal tasks.
* Different implementations of task executors can be used for different purposes:
*  - To improve cache locality of memory-bound CPU tasks, some executors can limit a task's affinity and
*    maximum concurrency.
*  - An executor with one worker thread can be used to serialize access to an acceleration device.
*  - An immediate task executor can be used to satisfy the `InferenceEngine::ITaskExecutor` interface
*    restrictions while running tasks in the current thread.
* @note Implementations should guarantee thread safety of all methods.
* It is the `InferenceEngine::ITaskExecutor` user's responsibility to wait for task execution completion.
* The C++11 standard way to wait for task completion is to use `std::packaged_task` or `std::promise` with
* `std::future`.
* Here is an example of how to use `std::promise` to wait for task completion and handle a task's exceptions:
* @snippet example_itask_executor.cpp itask_executor:define_pipeline
*/
class INFERENCE_ENGINE_API_CLASS(ITaskExecutor) : virtual public ov::threading::ITaskExecutor {
public:
/**
* A shared pointer to ITaskExecutor interface
*/
using Ptr = std::shared_ptr<ITaskExecutor>;
/**
* @brief Destroys the object.
*/
virtual ~ITaskExecutor() = default;
/**
* @brief Executes all of the tasks and waits for their completion.
* The default runAndWait() implementation uses the pure virtual run() method
* and higher-level synchronization primitives from the STL.
* Each task is wrapped into an std::packaged_task, which returns an std::future.
* The std::packaged_task calls the task and signals to the std::future that the task finished
* or that an exception was thrown from it. The std::future is then used to wait for
* task execution completion and to extract the task's exception.
* @note runAndWait() does not copy or capture tasks!
* @param tasks A vector of tasks to execute
*/
virtual void runAndWait(const std::vector<Task>& tasks);
};
} // namespace InferenceEngine
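The referenced snippet file is not part of this diff; a hedged sketch of the std::promise pattern the note above describes, written against the ov::threading interface that replaces this one:

#include <future>

#include "openvino/runtime/threading/itask_executor.hpp"

// Runs one task and rethrows its exception in the waiting thread.
void run_and_wait_one(ov::threading::ITaskExecutor& executor) {
    std::promise<void> promise;
    std::future<void> done = promise.get_future();
    executor.run([&promise] {
        try {
            /* task body */
            promise.set_value();
        } catch (...) {
            // Transport the task's exception back to the waiter.
            promise.set_exception(std::current_exception());
        }
    });
    done.get();  // blocks until completion; rethrows any stored exception
}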

View File

@@ -1,35 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
#pragma once
#include <memory>
#include <string>
#include "ie_api.h"
#include "ie_parallel.hpp"
#include "threading/ie_istreams_executor.hpp"
#if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO))
namespace InferenceEngine {
/**
* @class TBBStreamsExecutor
* @brief CPU Streams executor implementation. Uses the TBB thread pool to run tasks.
*/
class INFERENCE_ENGINE_API_CLASS(TBBStreamsExecutor) : public IStreamsExecutor {
public:
using Ptr = std::shared_ptr<TBBStreamsExecutor>;
explicit TBBStreamsExecutor(const Config& config = {});
~TBBStreamsExecutor() override;
void run(Task task) override;
void Execute(Task task) override;
int GetStreamId() override;
int GetNumaNodeId() override;
private:
struct Impl;
std::unique_ptr<Impl> _impl;
};
} // namespace InferenceEngine
#endif

View File

@@ -1,122 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
#pragma once
/**
* @brief A file containing thread local class implementation.
* @file ie_thread_local.hpp
*/
#include "ie_parallel.hpp"
#if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO
# include <tbb/enumerable_thread_specific.h>
#else
# include <functional>
# include <memory>
# include <mutex>
# include <thread>
# include <unordered_map>
# include <utility>
#endif
namespace InferenceEngine {
#if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO
/**
* @brief A wrapper class that keeps an object thread-local.
* @ingroup ie_dev_api_threading
* @tparam T A type of object to keep thread local.
*/
template <typename T>
using ThreadLocal = tbb::enumerable_thread_specific<T>;
#else
template <typename T>
struct ThreadLocal {
using Map = std::unordered_map<std::thread::id, T>;
using Create = std::function<T()>;
Map _map;
mutable std::mutex _mutex;
Create _create;
ThreadLocal()
: _create{[] {
return T{};
}} {}
explicit ThreadLocal(const T& init)
: _create{[init] {
return init;
}} {}
ThreadLocal(ThreadLocal&& other) : _map{std::move(other._map)}, _create{std::move(other._create)} {}
ThreadLocal& operator=(ThreadLocal&& other) {
_map = std::move(other._map);
_create = std::move(other._create);
return *this;
}
ThreadLocal(const ThreadLocal&) = delete;
ThreadLocal& operator=(const ThreadLocal&&) = delete;
explicit ThreadLocal(const Create& create_) : _create{create_} {}
T& local() {
auto threadId = std::this_thread::get_id();
std::lock_guard<std::mutex> lock{_mutex};
auto itThreadLocal = _map.find(threadId);
if (itThreadLocal != _map.end()) {
return itThreadLocal->second;
} else {
return _map.emplace(threadId, _create()).first->second;
}
}
auto size() const -> decltype(_map.size()) {
std::lock_guard<std::mutex> lock{_mutex};
return _map.size();
}
// WARNING: Thread Unsafe
template <typename It>
struct Iterator {
It it;
bool operator!=(const Iterator& other) {
return it != other.it;
}
Iterator& operator++() {
++it;
return *this;
}
auto operator*() -> decltype(it->second) {
return it->second;
}
auto operator-> () -> decltype(&(it->second)) {
return &(it->second);
}
auto operator*() const -> decltype(it->second) {
return it->second;
}
auto operator-> () const -> decltype(&(it->second)) {
return &(it->second);
}
};
auto begin() -> Iterator<decltype(_map.begin())> {
return {_map.begin()};
}
auto end() -> Iterator<decltype(_map.end())> {
return {_map.end()};
}
auto begin() const -> Iterator<decltype(_map.begin())> const {
return {_map.begin()};
}
auto end() const -> Iterator<decltype(_map.end())> const {
return {_map.end()};
}
};
#endif
} // namespace InferenceEngine
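A usage sketch of the fallback ThreadLocal above (a mutex-protected map keyed by std::thread::id); the counter is illustrative:

#include <thread>

void count_per_thread() {
    InferenceEngine::ThreadLocal<int> counter{0};
    auto worker = [&counter] {
        // local() finds or creates this thread's slot under the mutex.
        counter.local() += 1;
    };
    std::thread t1{worker}, t2{worker};
    t1.join();
    t2.join();
    // size() now reports one entry per thread that called local() (2 here).
}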

View File

@@ -1,141 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
///////////////////////////////////////////////////////////////////////////////////////////////////
#pragma once
#include <atomic>
#include <cstddef>
#include <mutex>
#include <queue>
#include <type_traits>
#include "ie_parallel.hpp"
#if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO))
# include <tbb/concurrent_priority_queue.h>
# include <tbb/concurrent_queue.h>
#endif
namespace InferenceEngine {
template <typename T>
class ThreadSafeQueueWithSize {
public:
void push(T value) {
std::lock_guard<std::mutex> lock(_mutex);
_queue.push(std::move(value));
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lock(_mutex);
if (!_queue.empty()) {
value = std::move(_queue.front());
_queue.pop();
return true;
} else {
return false;
}
}
size_t size() {
std::lock_guard<std::mutex> lock(_mutex);
return _queue.size();
}
protected:
std::queue<T> _queue;
std::mutex _mutex;
};
#if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO))
template <typename T>
using ThreadSafeQueue = tbb::concurrent_queue<T>;
template <typename T>
using ThreadSafeBoundedQueue = tbb::concurrent_bounded_queue<T>;
template <typename T>
class ThreadSafeBoundedPriorityQueue {
public:
ThreadSafeBoundedPriorityQueue() = default;
bool try_push(T&& value) {
if (_capacity) {
_pqueue.push(std::move(value));
return true;
}
return false;
}
bool try_pop(T& value) {
return _capacity ? _pqueue.try_pop(value) : false;
}
void set_capacity(std::size_t newCapacity) {
_capacity = newCapacity;
}
protected:
tbb::concurrent_priority_queue<T, std::greater<T>> _pqueue;
std::atomic_bool _capacity{false};
};
#else
template <typename T>
using ThreadSafeQueue = ThreadSafeQueueWithSize<T>;
template <typename T>
class ThreadSafeBoundedQueue {
public:
ThreadSafeBoundedQueue() = default;
bool try_push(T value) {
std::lock_guard<std::mutex> lock(_mutex);
if (_capacity) {
_queue.push(std::move(value));
}
return _capacity;
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lock(_mutex);
if (_capacity && !_queue.empty()) {
value = std::move(_queue.front());
_queue.pop();
return true;
} else {
return false;
}
}
void set_capacity(std::size_t newCapacity) {
std::lock_guard<std::mutex> lock(_mutex);
_capacity = newCapacity;
}
protected:
std::queue<T> _queue;
std::mutex _mutex;
bool _capacity = false;
};
template <typename T>
class ThreadSafeBoundedPriorityQueue {
public:
ThreadSafeBoundedPriorityQueue() = default;
bool try_push(T value) {
std::lock_guard<std::mutex> lock(_mutex);
if (_capacity) {
_queue.push(std::move(value));
}
return _capacity;
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lock(_mutex);
if (_capacity && !_queue.empty()) {
value = std::move(_queue.top());
_queue.pop();
return true;
} else {
return false;
}
}
void set_capacity(std::size_t newCapacity) {
std::lock_guard<std::mutex> lock(_mutex);
_capacity = newCapacity;
}
protected:
std::priority_queue<T, std::vector<T>, std::greater<T>> _queue;
std::mutex _mutex;
bool _capacity = false;
};
#endif
} // namespace InferenceEngine
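A usage sketch for the bounded queues above; note that in the non-TBB fallback the capacity acts as an on/off gate rather than a size bound:

#include <string>

void demo_bounded_queue() {
    InferenceEngine::ThreadSafeBoundedQueue<std::string> queue;
    queue.set_capacity(1);  // zero capacity makes try_push()/try_pop() refuse work
    bool pushed = queue.try_push("task");  // true: capacity is non-zero
    std::string value;
    bool popped = queue.try_pop(value);  // true: "task" was queued
    (void)pushed;
    (void)popped;
}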

View File

@@ -37,7 +37,7 @@
 #include "openvino/core/runtime_attribute.hpp"
 #include "openvino/op/util/op_types.hpp"
 #include "openvino/pass/manager.hpp"
-#include "threading/ie_executor_manager.hpp"
+#include "openvino/runtime/threading/executor_manager.hpp"
 #include "transformations/utils/utils.hpp"

 namespace InferenceEngine {
@@ -83,7 +83,7 @@ OutputsDataMap copyInfo(const OutputsDataMap& networkOutputs) {
     return _networkOutputs;
 }

-IInferencePlugin::IInferencePlugin() : _executorManager(InferenceEngine::executorManager()), _isNewAPI(true) {}
+IInferencePlugin::IInferencePlugin() : _executorManager(ov::threading::executor_manager()), _isNewAPI(true) {}

 void IInferencePlugin::VersionStore::copyFrom(const Version& v) {
     description = v.description;
@@ -270,7 +270,7 @@ bool IInferencePlugin::IsNewAPI() const noexcept {
     return _isNewAPI;
 }

-const std::shared_ptr<ExecutorManager>& IInferencePlugin::executorManager() const {
+const std::shared_ptr<ov::threading::ExecutorManager>& IInferencePlugin::executorManager() const {
     return _executorManager;
 }

View File

@@ -42,7 +42,6 @@
 #include "openvino/runtime/threading/executor_manager.hpp"
 #include "openvino/runtime/variable_state.hpp"
 #include "remote_context_wrapper.hpp"
-#include "threading/ie_executor_manager.hpp"
 #include "transformations/utils/utils.hpp"

 #ifdef PROXY_PLUGIN_ENABLED
@@ -232,7 +231,7 @@ public:
         version.description = ver.description;
         SetVersion(version);
         _isNewAPI = plugin->is_new_api();
-        _executorManager = InferenceEngine::create_old_manager(plugin->get_executor_manager());
+        _executorManager = plugin->get_executor_manager();
     }

     virtual ~IInferencePluginWrapper() = default;
View File

@@ -10,7 +10,7 @@
 #include "dev/converter_utils.hpp"
 #include "ie_icore.hpp"
 #include "openvino/runtime/iremote_context.hpp"
-#include "threading/ie_executor_manager.hpp"
+#include "openvino/runtime/threading/executor_manager.hpp"

 namespace InferenceEngine {

@@ -22,7 +22,7 @@ IPluginWrapper::IPluginWrapper(const std::shared_ptr<InferenceEngine::IInference
     m_plugin_name = m_old_plugin->GetName();
     m_is_new_api = m_old_plugin->IsNewAPI();
     m_core = m_old_plugin->GetCore();
-    m_executor_manager = m_old_plugin->executorManager()->get_ov_manager();
+    m_executor_manager = m_old_plugin->executorManager();
 }

 const std::shared_ptr<InferenceEngine::IExecutableNetworkInternal>& IPluginWrapper::update_exec_network(

View File

@@ -1,58 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
#include "threading/ie_cpu_streams_executor.hpp"
#include <atomic>
#include <cassert>
#include <climits>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <openvino/itt.hpp>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "ie_parallel_custom_arena.hpp"
#include "ie_system_conf.h"
#include "openvino/runtime/threading/cpu_streams_executor.hpp"
#include "openvino/runtime/threading/istreams_executor.hpp"
#include "threading/ie_executor_manager.hpp"
#include "threading/ie_istreams_executor.hpp"
#include "threading/ie_thread_affinity.hpp"
#include "threading/ie_thread_local.hpp"
namespace InferenceEngine {
struct CPUStreamsExecutor::Impl : public ov::threading::CPUStreamsExecutor {
Impl(const InferenceEngine::IStreamsExecutor::Config& config) : ov::threading::CPUStreamsExecutor(config) {}
};
int CPUStreamsExecutor::GetStreamId() {
return _impl->get_stream_id();
}
int CPUStreamsExecutor::GetNumaNodeId() {
return _impl->get_numa_node_id();
}
int CPUStreamsExecutor::GetSocketId() {
return _impl->get_socket_id();
}
CPUStreamsExecutor::CPUStreamsExecutor(const IStreamsExecutor::Config& config) : _impl{new Impl(config)} {}
CPUStreamsExecutor::~CPUStreamsExecutor() {}
void CPUStreamsExecutor::Execute(Task task) {
_impl->execute(std::move(task));
}
void CPUStreamsExecutor::run(Task task) {
_impl->run(std::move(task));
}
} // namespace InferenceEngine

View File

@@ -1,162 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
#include "threading/ie_executor_manager.hpp"
#include "ie_parallel.hpp"
#include "openvino/runtime/properties.hpp"
#include "openvino/runtime/threading/executor_manager.hpp"
#include "openvino/runtime/threading/istreams_executor.hpp"
#include "openvino/runtime/threading/itask_executor.hpp"
#include "threading/ie_cpu_streams_executor.hpp"
#include "threading/ie_itask_executor.hpp"
#if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO
# if (TBB_INTERFACE_VERSION < 12000)
# include <tbb/task_scheduler_init.h>
# else
# include <oneapi/tbb/global_control.h>
# endif
#endif
#include <memory>
#include <mutex>
#include <string>
#include <utility>
namespace InferenceEngine {
namespace {
class ExecutorManagerImpl : public ExecutorManager {
public:
ExecutorManagerImpl(const std::shared_ptr<ov::threading::ExecutorManager>& manager);
ITaskExecutor::Ptr getExecutor(const std::string& id) override;
IStreamsExecutor::Ptr getIdleCPUStreamsExecutor(const IStreamsExecutor::Config& config) override;
size_t getExecutorsNumber() const override;
size_t getIdleCPUStreamsExecutorsNumber() const override;
void clear(const std::string& id = {}) override;
void setTbbFlag(bool flag) override;
bool getTbbFlag() override;
private:
std::shared_ptr<ov::threading::ExecutorManager> m_manager;
std::shared_ptr<ov::threading::ExecutorManager> get_ov_manager() const override {
return m_manager;
}
};
class TaskExecutorWrapper : public ITaskExecutor {
std::shared_ptr<ov::threading::ITaskExecutor> m_executor;
public:
TaskExecutorWrapper(const std::shared_ptr<ov::threading::ITaskExecutor>& executor) : m_executor(executor) {}
void run(Task task) override {
m_executor->run(task);
}
void runAndWait(const std::vector<Task>& tasks) override {
m_executor->run_and_wait(tasks);
}
};
class StreamsExecutorWrapper : public IStreamsExecutor {
std::shared_ptr<ov::threading::IStreamsExecutor> m_executor;
public:
StreamsExecutorWrapper(const std::shared_ptr<ov::threading::IStreamsExecutor>& executor) : m_executor(executor) {}
void run(Task task) override {
m_executor->run(task);
}
void runAndWait(const std::vector<Task>& tasks) override {
m_executor->run_and_wait(tasks);
}
int GetStreamId() override {
return m_executor->get_stream_id();
}
int GetNumaNodeId() override {
return m_executor->get_numa_node_id();
}
int GetSocketId() override {
return m_executor->get_socket_id();
}
void Execute(Task task) override {
m_executor->execute(task);
}
};
} // namespace
ExecutorManagerImpl::ExecutorManagerImpl(const std::shared_ptr<ov::threading::ExecutorManager>& manager)
: m_manager(manager) {}
void ExecutorManagerImpl::setTbbFlag(bool flag) {
m_manager->set_property({{ov::force_tbb_terminate.name(), flag}});
}
bool ExecutorManagerImpl::getTbbFlag() {
return m_manager->get_property(ov::force_tbb_terminate.name()).as<bool>();
}
ITaskExecutor::Ptr ExecutorManagerImpl::getExecutor(const std::string& id) {
return std::make_shared<TaskExecutorWrapper>(m_manager->get_executor(id));
}
IStreamsExecutor::Ptr ExecutorManagerImpl::getIdleCPUStreamsExecutor(const IStreamsExecutor::Config& config) {
return std::make_shared<StreamsExecutorWrapper>(m_manager->get_idle_cpu_streams_executor(config));
}
size_t ExecutorManagerImpl::getExecutorsNumber() const {
return m_manager->get_executors_number();
}
size_t ExecutorManagerImpl::getIdleCPUStreamsExecutorsNumber() const {
return m_manager->get_idle_cpu_streams_executors_number();
}
void ExecutorManagerImpl::clear(const std::string& id) {
return m_manager->clear(id);
}
std::shared_ptr<InferenceEngine::ExecutorManager> create_old_manager(
const std::shared_ptr<ov::threading::ExecutorManager>& manager) {
return std::make_shared<ExecutorManagerImpl>(manager);
}
namespace {
class ExecutorManagerHolder {
std::mutex _mutex;
std::weak_ptr<ExecutorManager> _manager;
public:
ExecutorManagerHolder(const ExecutorManagerHolder&) = delete;
ExecutorManagerHolder& operator=(const ExecutorManagerHolder&) = delete;
ExecutorManagerHolder() = default;
ExecutorManager::Ptr get() {
std::lock_guard<std::mutex> lock(_mutex);
auto manager = _manager.lock();
if (!manager) {
_manager = manager = create_old_manager(ov::threading::executor_manager());
}
return manager;
}
};
} // namespace
ExecutorManager::Ptr executorManager() {
static ExecutorManagerHolder executorManagerHolder;
return executorManagerHolder.get();
}
ExecutorManager* ExecutorManager::getInstance() {
static auto ptr = executorManager().get();
return ptr;
}
} // namespace InferenceEngine

View File

@@ -1,57 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
#include "threading/ie_istreams_executor.hpp"
#include <algorithm>
#include <openvino/util/log.hpp>
#include <string>
#include <thread>
#include <vector>
#include "cpp_interfaces/interface/ie_internal_plugin_config.hpp"
#include "ie_parallel.hpp"
#include "ie_parallel_custom_arena.hpp"
#include "ie_parameter.hpp"
#include "ie_plugin_config.hpp"
#include "ie_system_conf.h"
#include "openvino/runtime/properties.hpp"
#include "openvino/util/common_util.hpp"
namespace InferenceEngine {
IStreamsExecutor::~IStreamsExecutor() {}
std::vector<std::string> IStreamsExecutor::Config::SupportedKeys() const {
return get_property(ov::supported_properties.name()).as<std::vector<std::string>>();
}
int IStreamsExecutor::Config::GetDefaultNumStreams(const bool enable_hyper_thread) {
return get_default_num_streams(enable_hyper_thread);
}
int IStreamsExecutor::Config::GetHybridNumStreams(std::map<std::string, std::string>& config, const int stream_mode) {
return get_hybrid_num_streams(config, stream_mode);
}
void IStreamsExecutor::Config::SetConfig(const std::string& key, const std::string& value) {
set_property(key, value);
}
Parameter IStreamsExecutor::Config::GetConfig(const std::string& key) const {
return get_property(key);
}
void IStreamsExecutor::Config::UpdateHybridCustomThreads(Config& config) {
return update_hybrid_custom_threads(config);
}
IStreamsExecutor::Config IStreamsExecutor::Config::MakeDefaultMultiThreaded(const IStreamsExecutor::Config& initial,
const bool fp_intesive) {
return make_default_multi_threaded(initial);
}
IStreamsExecutor::Config IStreamsExecutor::Config::ReserveCpuThreads(const Config& initial) {
return reserve_cpu_threads(initial);
}
} // namespace InferenceEngine

View File

@@ -1,18 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
#include "threading/ie_itask_executor.hpp"
#include <future>
#include <memory>
#include <utility>
#include <vector>
namespace InferenceEngine {
void ITaskExecutor::runAndWait(const std::vector<Task>& tasks) {
run_and_wait(tasks);
}
} // namespace InferenceEngine

View File

@@ -1,13 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
/**
* @brief Contains declarations and custom threading interfaces based on TBB info and task_arena APIs.
*
* @file ie_parallel_custom.hpp
*/
#pragma once
#include "dev/threading/parallel_custom_arena.hpp"

View File

@@ -1,304 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
#include "threading/ie_tbb_streams_executor.hpp"
#include <atomic>
#include <list>
#include <memory>
#include <queue>
#include <thread>
#include <tuple>
#include <utility>
#include "details/ie_exception.hpp"
#include "ie_parallel.hpp"
#include "ie_parallel_custom_arena.hpp"
#include "ie_system_conf.h"
#include "threading/ie_thread_affinity.hpp"
#if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO))
# include <tbb/concurrent_queue.h>
# include <tbb/enumerable_thread_specific.h>
# ifndef TBB_PREVIEW_GLOBAL_CONTROL
# define TBB_PREVIEW_GLOBAL_CONTROL 1
# endif
# include <tbb/global_control.h>
# include <tbb/task_group.h>
# include <tbb/task_scheduler_observer.h>
namespace InferenceEngine {
struct TBBStreamsExecutor::Impl {
struct Stream;
using TaskQueue = tbb::concurrent_queue<Task>;
using StreamQueue = tbb::concurrent_bounded_queue<Stream*>;
using LocalStreams = tbb::enumerable_thread_specific<Stream*>;
struct Shared : public std::enable_shared_from_this<Shared> {
using Ptr = std::shared_ptr<Shared>;
TaskQueue _taskQueue;
StreamQueue _streamQueue;
};
struct Stream {
struct Observer : tbb::task_scheduler_observer {
Stream* _thisStream = nullptr;
LocalStreams* _localStream = nullptr;
CpuSet _mask;
int _ncpus = 0;
int _threadBindingStep = 0;
int _offset = 0;
Observer(custom::task_arena& arena,
Stream* thisStream,
LocalStreams* localStream,
const bool pinToCores,
const int streamId,
const int threadsPerStream,
const int threadBindingStep,
const int threadBindingOffset)
: tbb::task_scheduler_observer{static_cast<tbb::task_arena&>(arena)},
_thisStream{thisStream},
_localStream{localStream},
_threadBindingStep{threadBindingStep},
_offset{streamId * threadsPerStream + threadBindingOffset} {
if (pinToCores) {
std::tie(_mask, _ncpus) = GetProcessMask();
}
}
void on_scheduler_entry(bool) override {
_localStream->local() = _thisStream;
if (nullptr != _mask) {
PinThreadToVacantCore(_offset + tbb::this_task_arena::current_thread_index(),
_threadBindingStep,
_ncpus,
_mask);
}
}
void on_scheduler_exit(bool) override {
_localStream->local() = nullptr;
if (nullptr != _mask) {
PinCurrentThreadByMask(_ncpus, _mask);
}
}
~Observer() override = default;
};
explicit Stream(Impl* impl, const bool externStream = false) : _impl{impl} {
{
std::lock_guard<std::mutex> lock{_impl->_streamIdMutex};
if (_impl->_streamIdQueue.empty()) {
_streamId = _impl->_streamId++;
} else {
_streamId = _impl->_streamIdQueue.front();
_impl->_streamIdQueue.pop();
}
}
_numaNodeId = _impl->_config._streams
? _impl->_usedNumaNodes.at((_streamId % _impl->_config._streams) /
((_impl->_config._streams + _impl->_usedNumaNodes.size() - 1) /
_impl->_usedNumaNodes.size()))
: _impl->_usedNumaNodes.at(_streamId % _impl->_usedNumaNodes.size());
auto concurrency =
(0 == _impl->_config._threadsPerStream) ? tbb::task_arena::automatic : _impl->_config._threadsPerStream;
auto masterThreads = externStream ? 1u : 0u;
if (ThreadBindingType::HYBRID_AWARE == _impl->_config._threadBindingType) {
if (Config::PreferredCoreType::ROUND_ROBIN != _impl->_config._threadPreferredCoreType) {
if (Config::PreferredCoreType::ANY == _impl->_config._threadPreferredCoreType) {
_arena.initialize(concurrency);
} else {
const auto selected_core_type =
Config::PreferredCoreType::BIG == _impl->_config._threadPreferredCoreType
? custom::info::core_types().back() // running on Big cores only
: custom::info::core_types().front(); // running on Little cores only
_arena.initialize(custom::task_arena::constraints{}
.set_core_type(selected_core_type)
.set_max_concurrency(concurrency));
}
} else {
// assigning the stream to the core type in the round-robin fashion
// wrapping around total_streams (i.e. how many streams all different core types can handle
// together)
const auto total_streams = _impl->_totalSreamsOnCoreTypes.back().second;
const auto streamId_wrapped = _streamId % total_streams;
const auto& selected_core_type =
std::find_if(_impl->_totalSreamsOnCoreTypes.cbegin(),
_impl->_totalSreamsOnCoreTypes.cend(),
[streamId_wrapped](const decltype(_impl->_totalSreamsOnCoreTypes)::value_type& p) {
return p.second > streamId_wrapped;
})
->first;
_arena.initialize(custom::task_arena::constraints{}
.set_core_type(selected_core_type)
.set_max_concurrency(concurrency));
}
} else if (ThreadBindingType::NUMA == _impl->_config._threadBindingType) {
_arena.initialize(custom::task_arena::constraints{_numaNodeId, concurrency});
} else {
_arena.initialize(concurrency, masterThreads);
}
_observer.reset(new Observer{_arena,
this,
&(_impl->_localStream),
(ThreadBindingType::CORES == _impl->_config._threadBindingType),
_streamId,
_impl->_config._threadsPerStream,
_impl->_config._threadBindingStep,
_impl->_config._threadBindingOffset});
_observer->observe(true);
}
~Stream() {
static_cast<tbb::task_arena&>(_arena).terminate();
_observer->observe(false);
{
std::lock_guard<std::mutex> lock{_impl->_streamIdMutex};
_impl->_streamIdQueue.push(_streamId);
}
}
Impl* _impl = nullptr;
int _streamId = 0;
int _numaNodeId = 0;
custom::task_arena _arena;
std::unique_ptr<Observer> _observer;
};
using Streams = std::list<Stream>;
using ExternStreams = tbb::enumerable_thread_specific<Stream>;
explicit Impl(const Config& config)
: _config{config},
_shared{std::make_shared<Shared>()},
_localStream{nullptr},
_externStreams{this, true} {
if (_config._streams * _config._threadsPerStream >= static_cast<int>(std::thread::hardware_concurrency())) {
_maxTbbThreads.reset(
new tbb::global_control{tbb::global_control::max_allowed_parallelism,
static_cast<std::size_t>(_config._streams * _config._threadsPerStream + 1)});
}
auto numaNodes = getAvailableNUMANodes();
if (_config._streams != 0) {
std::copy_n(std::begin(numaNodes),
std::min(static_cast<std::size_t>(_config._streams), numaNodes.size()),
std::back_inserter(_usedNumaNodes));
} else {
_usedNumaNodes = numaNodes;
}
if (ThreadBindingType::HYBRID_AWARE == config._threadBindingType) {
const auto core_types = custom::info::core_types();
const int threadsPerStream =
(0 == config._threadsPerStream) ? std::thread::hardware_concurrency() : config._threadsPerStream;
int sum = 0;
// reversed order, so BIG cores are first
for (auto iter = core_types.rbegin(); iter < core_types.rend(); iter++) {
const auto& type = *iter;
// calculating the #streams per core type
const int num_streams_for_core_type =
std::max(1,
custom::info::default_concurrency(custom::task_arena::constraints{}.set_core_type(type)) /
threadsPerStream);
sum += num_streams_for_core_type;
// prefix sum, so the core type for a given stream id will be deduced just as an upper_bound
// (notice that the map keeps the elements in the descending order, so the big cores are populated
// first)
_totalSreamsOnCoreTypes.emplace_back(type, sum);
}
}
_shared->_streamQueue.set_capacity(_config._streams);
for (int streamId = 0; streamId < _config._streams; ++streamId) {
_streams.emplace_back(this);
_shared->_streamQueue.push(&(_streams.back()));
}
}
~Impl() {
for (int streamId = 0; streamId < _config._streams; ++streamId) {
Stream* stream = nullptr;
_shared->_streamQueue.pop(stream);
(void)stream;
}
}
static void Schedule(Shared::Ptr& shared, Task task) {
Stream* stream = nullptr;
if (shared->_streamQueue.try_pop(stream)) {
struct TryPop {
void operator()() const {
try {
do {
Task task = std::move(_task);
task();
} while (_shared->_taskQueue.try_pop(_task));
} catch (...) {
}
if (_shared->_streamQueue.try_push(_stream)) {
if (_shared->_taskQueue.try_pop(_task)) {
Schedule(_shared, std::move(_task));
}
}
}
Stream* _stream;
mutable Shared::Ptr _shared;
mutable Task _task;
};
stream->_arena.enqueue(TryPop{stream, shared->shared_from_this(), std::move(task)});
} else {
shared->_taskQueue.push(std::move(task));
}
}
Config _config;
std::unique_ptr<tbb::global_control> _maxTbbThreads;
std::mutex _streamIdMutex;
int _streamId = 0;
std::queue<int> _streamIdQueue;
std::vector<int> _usedNumaNodes;
Shared::Ptr _shared;
LocalStreams _localStream;
ExternStreams _externStreams;
Streams _streams;
using StreamIdToCoreTypes = std::vector<std::pair<custom::core_type_id, int>>;
StreamIdToCoreTypes _totalSreamsOnCoreTypes;
};
TBBStreamsExecutor::TBBStreamsExecutor(const Config& config) : _impl{new TBBStreamsExecutor::Impl{config}} {}
TBBStreamsExecutor::~TBBStreamsExecutor() {
_impl.reset();
}
int TBBStreamsExecutor::GetStreamId() {
auto stream = _impl->_localStream.local();
if (nullptr == stream) {
stream = &(_impl->_externStreams.local());
}
return stream->_streamId;
}
int TBBStreamsExecutor::GetNumaNodeId() {
auto stream = _impl->_localStream.local();
if (nullptr == stream) {
stream = &(_impl->_externStreams.local());
}
return stream->_numaNodeId;
}
void TBBStreamsExecutor::run(Task task) {
if (_impl->_config._streams == 0) {
Execute(std::move(task));
} else {
Impl::Schedule(_impl->_shared, std::move(task));
}
}
void TBBStreamsExecutor::Execute(Task task) {
auto stream = _impl->_localStream.local();
if (nullptr == stream) {
_impl->_externStreams.local()._arena.execute(std::move(task));
} else {
stream->_arena.execute(std::move(task));
}
}
} // namespace InferenceEngine
#endif // ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO))
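The round-robin comments above boil down to an upper-bound lookup over prefix sums; a distilled sketch with simplified stand-ins for custom::core_type_id:

#include <algorithm>
#include <utility>
#include <vector>

// (core_type, prefix sum of streams) pairs, BIG cores first, as built in Impl's
// constructor; assumes a non-empty table.
using CoreTypeStreams = std::vector<std::pair<int, int>>;

int select_core_type(const CoreTypeStreams& totals, int stream_id) {
    const int total_streams = totals.back().second;
    const int wrapped = stream_id % total_streams;  // wrap around all streams
    // First core type whose prefix sum exceeds the wrapped stream id.
    return std::find_if(totals.cbegin(),
                        totals.cend(),
                        [wrapped](const CoreTypeStreams::value_type& p) {
                            return p.second > wrapped;
                        })
        ->first;
}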

View File

@@ -1,34 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
#include "threading/ie_thread_affinity.hpp"
#include "dev/threading/thread_affinity.hpp"
namespace InferenceEngine {
std::tuple<CpuSet, int> GetProcessMask() {
return ov::threading::get_process_mask();
}
void ReleaseProcessMask(cpu_set_t* mask) {
ov::threading::release_process_mask(mask);
}
bool PinThreadToVacantCore(int thrIdx,
int hyperthreads,
int ncores,
const CpuSet& procMask,
const std::vector<int>& cpu_ids,
int cpuIdxOffset) {
return ov::threading::pin_thread_to_vacant_core(thrIdx, hyperthreads, ncores, procMask, cpu_ids, cpuIdxOffset);
}
bool PinCurrentThreadByMask(int ncores, const CpuSet& procMask) {
return ov::threading::pin_current_thread_by_mask(ncores, procMask);
}
bool PinCurrentThreadToSocket(int socket) {
return ov::threading::pin_current_thread_to_socket(socket);
}
} // namespace InferenceEngine

View File

@@ -1,77 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
#pragma once
#include <memory>
#include <tuple>
#include <vector>
#include "dev/threading/thread_affinity.hpp"
#if !(defined(__APPLE__) || defined(__EMSCRIPTEN__) || defined(_WIN32))
# include <sched.h>
#endif
namespace InferenceEngine {
#if (defined(__APPLE__) || defined(__EMSCRIPTEN__) || defined(_WIN32))
using cpu_set_t = ov::threading::cpu_set_t;
#endif // (defined(__APPLE__) || defined(__EMSCRIPTEN__) || defined(_WIN32))
/**
* @brief Release the cores affinity mask for the current process
* @ingroup ie_dev_api_threading
*
* @param mask The mask
*/
void ReleaseProcessMask(cpu_set_t* mask);
using ReleaseProcessMaskDeleter = ov::threading::ReleaseProcessMaskDeleter;
using CpuSet = ov::threading::CpuSet;
/**
* @brief Get the cores affinity mask for the current process
* @ingroup ie_dev_api_threading
* @return A core affinity mask
*/
std::tuple<CpuSet, int> GetProcessMask();
/**
* @brief Pins a thread to a spare core in a round-robin scheme, while respecting the given process mask.
* The function can also handle hyper-threading (by populating the physical cores first)
* @ingroup ie_dev_api_threading
*
* @param[in] thrIdx The thread index
* @param[in] hyperThreads The number of hyper threads
* @param[in] ncores The number of cores
* @param[in] processMask The process mask
* @return `True` in case of success, `false` otherwise
*/
bool PinThreadToVacantCore(int thrIdx,
int hyperThreads,
int ncores,
const CpuSet& processMask,
const std::vector<int>& cpu_ids = {},
int cpuIdxOffset = 0);
/**
* @brief Pins the current thread to a set of cores determined by the mask
* @ingroup ie_dev_api_threading
*
* @param[in] ncores The ncores
* @param[in] processMask The process mask
* @return `True` in case of success, `false` otherwise
*/
bool PinCurrentThreadByMask(int ncores, const CpuSet& processMask);
/**
* @brief Pins a current thread to a socket.
* @ingroup ie_dev_api_threading
*
* @param[in] socket The socket id
* @return `True` in case of success, `false` otherwise
*/
bool PinCurrentThreadToSocket(int socket);
} // namespace InferenceEngine
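These shims forward one-to-one to ov::threading (see the ie_thread_affinity.cpp removal earlier in this diff); a hedged direct-call sketch against the new names, assuming the internal header path included above:

#include <tuple>

#include "dev/threading/thread_affinity.hpp"

void pin_current_thread_within_process_mask() {
    ov::threading::CpuSet mask;
    int ncpus = 0;
    std::tie(mask, ncpus) = ov::threading::get_process_mask();
    if (mask) {
        // Pin within the process mask; returns false on failure.
        ov::threading::pin_current_thread_by_mask(ncpus, mask);
    }
}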

View File

@@ -1,12 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
/**
* @brief Defines openvino tbbbind domains for tracing
* @file itt.hpp
*/
#pragma once
#include "dev/threading/itt.hpp"

View File

@@ -9,16 +9,15 @@
 #include <cpp_interfaces/impl/ie_infer_async_request_thread_safe_default.hpp>
 #include <deque>
 #include <inference_engine.hpp>
-#include <threading/ie_cpu_streams_executor.hpp>

+#include "openvino/runtime/threading/cpu_streams_executor.hpp"
 #include "unit_test_utils/mocks/cpp_interfaces/impl/mock_async_infer_request_default.hpp"
 #include "unit_test_utils/mocks/cpp_interfaces/interface/mock_iinfer_request_internal.hpp"
 #include "unit_test_utils/mocks/cpp_interfaces/mock_task_executor.hpp"

 using namespace ::testing;
 using namespace std;
-using namespace InferenceEngine;
-using namespace InferenceEngine::details;
+using namespace ov::threading;

 struct DeferedExecutor : public ITaskExecutor {
     using Ptr = std::shared_ptr<DeferedExecutor>;
@@ -183,7 +182,7 @@ TEST_F(InferRequestThreadSafeDefaultTests, callbackIsCalledIfAsyncRequestFailed)
 }

 TEST_F(InferRequestThreadSafeDefaultTests, canCatchExceptionIfAsyncRequestFailedAndNoCallback) {
-    auto taskExecutor = std::make_shared<CPUStreamsExecutor>();
+    auto taskExecutor = std::make_shared<CPUStreamsExecutor>(ov::threading::IStreamsExecutor::Config{});
     testRequest = make_shared<AsyncInferRequestThreadSafeDefault>(mockInferRequestInternal, taskExecutor, taskExecutor);
     EXPECT_CALL(*mockInferRequestInternal.get(), InferImpl()).WillOnce(Throw(std::exception()));
     testRequest->StartAsync();

View File

@@ -15,11 +15,6 @@
 #include "serialize.h"
 #include "openvino/runtime/threading/executor_manager.hpp"
 #include "transformations/transformation_pipeline.h"
-#define FIX_62820 0
-#if FIX_62820 && ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO))
-#    include <threading/ie_tbb_streams_executor.hpp>
-#endif
-
 #include "openvino/runtime/properties.hpp"
 #include "openvino/util/common_util.hpp"
 #include "openvino/runtime/threading/cpu_streams_executor.hpp"
@@ -71,20 +66,11 @@ CompiledModel::CompiledModel(const std::shared_ptr<ov::Model>& model,
                   : IStreamsExecutor::Config::make_default_multi_threaded(m_cfg.streamExecutorConfig, isFloatModel);
         streamsExecutorConfig._name = "CPUStreamsExecutor";
         m_cfg.streamExecutorConfig._threads = streamsExecutorConfig._threads;
-#if FIX_62820 && (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
-        m_task_executor = std::make_shared<TBBStreamsExecutor>(streamsExecutorConfig);
-#else
         m_task_executor = m_plugin->get_executor_manager()->get_idle_cpu_streams_executor(streamsExecutorConfig);
-#endif
     }
     if (0 != cfg.streamExecutorConfig._streams) {
-#if FIX_62820 && (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
-        // There is no additional threads but we still need serialize callback execution to preserve legacy behaviour
-        m_callback_executor = std::make_shared<ImmediateSerialExecutor>();
-#else
         m_callback_executor = m_plugin->get_executor_manager()->get_idle_cpu_streams_executor(
             IStreamsExecutor::Config{"CPUCallbackExecutor", 1, 0, IStreamsExecutor::ThreadBindingType::NONE});
-#endif
     } else {
         m_callback_executor = m_task_executor;
     }

View File

@@ -11,6 +11,7 @@
 #include "cpp_interfaces/impl/ie_infer_async_request_thread_safe_default.hpp"

+using namespace ov::threading;
 using namespace InferenceEngine;

 class MockAsyncInferRequestDefault : public AsyncInferRequestThreadSafeDefault {

View File

@@ -7,11 +7,12 @@
 #include <gmock/gmock.h>

 #include <memory>
-#include <threading/ie_itask_executor.hpp>

-class MockTaskExecutor : public InferenceEngine::ITaskExecutor {
+#include "openvino/runtime/threading/itask_executor.hpp"
+
+class MockTaskExecutor : public ov::threading::ITaskExecutor {
 public:
     typedef std::shared_ptr<MockTaskExecutor> Ptr;
-    MOCK_METHOD1(run, void(InferenceEngine::Task));
+    MOCK_METHOD1(run, void(ov::threading::Task));
 };
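A hypothetical usage sketch for the updated mock: gmock verifies that a component hands work to the executor without actually running the task (the default action of the mocked run() is a no-op):

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <memory>

TEST(MockTaskExecutorExample, RunIsInvoked) {
    auto executor = std::make_shared<MockTaskExecutor>();
    EXPECT_CALL(*executor, run(::testing::_)).Times(1);
    executor->run([] {});  // satisfies the expectation; the task itself is not executed
}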