From 27ea9eab329cd7db547f5aeb24bfca7e549ef489 Mon Sep 17 00:00:00 2001 From: Ilya Churaev Date: Wed, 22 Feb 2023 20:19:35 +0400 Subject: [PATCH] Moved executor manager to new API (#15871) * Moved executor manager to new API * Fixed template plugin build * Remove setTBBFlag API * Fixed export * Added new files * Revert "Added new files" This reverts commit 981c3c863f7cbaa7d8180e11fb7f83e89c7cd4fd. * Fixed template plugin tests * Remove redundant wrapper * Remove wrappers for executor manager * Fixed build --- src/inference/CMakeLists.txt | 1 + .../dev_api/openvino/runtime/iplugin.hpp | 14 +- .../runtime/threading/executor_manager.hpp | 77 +++++++ .../dev_api/threading/ie_executor_manager.hpp | 15 ++ src/inference/src/dev/converter_utils.cpp | 4 +- src/inference/src/dev/core_impl.cpp | 7 +- src/inference/src/dev/core_impl.hpp | 4 +- src/inference/src/dev/iplugin.cpp | 4 +- src/inference/src/dev/iplugin_wrapper.cpp | 3 +- .../src/dev/threading/executor_manager.cpp | 208 ++++++++++++++++++ .../src/threading/ie_executor_manager.cpp | 127 ++--------- src/plugins/template/src/plugin.cpp | 6 +- 12 files changed, 345 insertions(+), 125 deletions(-) create mode 100644 src/inference/dev_api/openvino/runtime/threading/executor_manager.hpp create mode 100644 src/inference/src/dev/threading/executor_manager.cpp diff --git a/src/inference/CMakeLists.txt b/src/inference/CMakeLists.txt index 03ded660879..f3f436e57d5 100644 --- a/src/inference/CMakeLists.txt +++ b/src/inference/CMakeLists.txt @@ -14,6 +14,7 @@ file (GLOB LIBRARY_SRC ${CMAKE_CURRENT_SOURCE_DIR}/src/cpp/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/dev/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/dev/preprocessing/*.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/dev/threading/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/threading/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/cpp/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/cpp_interfaces/interface/*.cpp diff --git a/src/inference/dev_api/openvino/runtime/iplugin.hpp b/src/inference/dev_api/openvino/runtime/iplugin.hpp index 47f576b46bf..54660161b2e 100644 --- a/src/inference/dev_api/openvino/runtime/iplugin.hpp +++ b/src/inference/dev_api/openvino/runtime/iplugin.hpp @@ -19,7 +19,7 @@ #include "openvino/runtime/icompiled_model.hpp" #include "openvino/runtime/icore.hpp" #include "openvino/runtime/remote_context.hpp" -#include "threading/ie_executor_manager.hpp" +#include "openvino/runtime/threading/executor_manager.hpp" namespace InferenceEngine { @@ -188,7 +188,7 @@ public: * @brief Gets reference to tasks execution manager * @return Reference to ExecutorManager interface */ - const std::shared_ptr& get_executor_manager() const; + const std::shared_ptr& get_executor_manager() const; ~IPlugin() = default; @@ -198,11 +198,11 @@ protected: private: friend ::InferenceEngine::IPluginWrapper; - std::string m_plugin_name; //!< A device name that plugins enables - std::weak_ptr m_core; //!< A pointer to ICore interface - std::shared_ptr m_executor_manager; //!< A tasks execution manager - ov::Version m_version; //!< Member contains plugin version - bool m_is_new_api; //!< A flag which shows used API + std::string m_plugin_name; //!< A device name that plugins enables + std::weak_ptr m_core; //!< A pointer to ICore interface + std::shared_ptr m_executor_manager; //!< A tasks execution manager + ov::Version m_version; //!< Member contains plugin version + bool m_is_new_api; //!< A flag which shows used API }; } // namespace ov diff --git a/src/inference/dev_api/openvino/runtime/threading/executor_manager.hpp b/src/inference/dev_api/openvino/runtime/threading/executor_manager.hpp new file mode 100644 index 00000000000..d242d97c407 --- /dev/null +++ b/src/inference/dev_api/openvino/runtime/threading/executor_manager.hpp @@ -0,0 +1,77 @@ +// Copyright (C) 2018-2023 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 +// + +/** + * @brief OpenVINO Runtime Executor Manager + * @file openvino/runtime/threading/executor_manager.hpp + */ + +#pragma once + +#include "openvino/runtime/common.hpp" +#include "threading/ie_istreams_executor.hpp" +#include "threading/ie_itask_executor.hpp" + +namespace ov { + +/** + * @interface ExecutorManager + * @brief Interface for tasks execution manager. + * This is global point for getting task executor objects by string id. + * It's necessary in multiple asynchronous requests for having unique executors to avoid oversubscription. + * E.g. There 2 task executors for CPU device: one - in FPGA, another - in OneDNN. Parallel execution both of them leads + * to not optimal CPU usage. More efficient to run the corresponding tasks one by one via single executor. + * @ingroup ov_dev_api_threading + */ +class OPENVINO_RUNTIME_API ExecutorManager { +public: + /** + * @brief Returns executor by unique identificator + * @param id An unique identificator of device (Usually string representation of TargetDevice) + * @return A shared pointer to existing or newly ITaskExecutor + */ + virtual InferenceEngine::ITaskExecutor::Ptr get_executor(const std::string& id) = 0; + + /** + * @brief Returns idle cpu streams executor + * + * @param config Streams executor config + * + * @return pointer to streams executor config + */ + virtual InferenceEngine::IStreamsExecutor::Ptr get_idle_cpu_streams_executor( + const InferenceEngine::IStreamsExecutor::Config& config) = 0; + + /** + * @brief Allows to configure executor manager + * + * @param properties map with configuration + */ + virtual void set_property(const ov::AnyMap& properties) = 0; + /** + * @brief Returns configuration + * + * @param name property name + * + * @return Property value + */ + virtual ov::Any get_property(const std::string& name) const = 0; + + /** + * @cond + */ + virtual size_t get_executors_number() const = 0; + + virtual size_t get_idle_cpu_streams_executors_number() const = 0; + + virtual void clear(const std::string& id = {}) = 0; + /** + * @endcond + */ + virtual ~ExecutorManager() = default; +}; + +OPENVINO_API std::shared_ptr executor_manager(); + +} // namespace ov diff --git a/src/inference/dev_api/threading/ie_executor_manager.hpp b/src/inference/dev_api/threading/ie_executor_manager.hpp index 2504884d071..f746c17f815 100644 --- a/src/inference/dev_api/threading/ie_executor_manager.hpp +++ b/src/inference/dev_api/threading/ie_executor_manager.hpp @@ -18,8 +18,16 @@ #include "threading/ie_istreams_executor.hpp" #include "threading/ie_itask_executor.hpp" +namespace ov { + +class ExecutorManager; + +} + namespace InferenceEngine { +class IPluginWrapper; + /** * @interface ExecutorManager * @brief Interface for tasks execution manager. @@ -76,8 +84,15 @@ public: */ virtual void setTbbFlag(bool flag) = 0; virtual bool getTbbFlag() = 0; + +private: + virtual std::shared_ptr get_ov_manager() const = 0; + friend class IPluginWrapper; }; INFERENCE_ENGINE_API_CPP(ExecutorManager::Ptr) executorManager(); +std::shared_ptr create_old_manager( + const std::shared_ptr& manager); + } // namespace InferenceEngine diff --git a/src/inference/src/dev/converter_utils.cpp b/src/inference/src/dev/converter_utils.cpp index 88bded83881..8e56463094a 100644 --- a/src/inference/src/dev/converter_utils.cpp +++ b/src/inference/src/dev/converter_utils.cpp @@ -34,8 +34,10 @@ #include "openvino/runtime/profiling_info.hpp" #include "openvino/runtime/remote_context.hpp" #include "openvino/runtime/tensor.hpp" +#include "openvino/runtime/threading/executor_manager.hpp" #include "openvino/runtime/variable_state.hpp" #include "so_ptr.hpp" +#include "threading/ie_executor_manager.hpp" #include "transformations/utils/utils.hpp" namespace { @@ -221,7 +223,7 @@ public: version.description = ver.description; SetVersion(version); _isNewAPI = plugin->is_new_api(); - _executorManager = plugin->get_executor_manager(); + _executorManager = InferenceEngine::create_old_manager(plugin->get_executor_manager()); } std::string GetName() const noexcept override { return m_plugin->get_device_name(); diff --git a/src/inference/src/dev/core_impl.cpp b/src/inference/src/dev/core_impl.cpp index f31b3df76ff..3a07156b302 100644 --- a/src/inference/src/dev/core_impl.cpp +++ b/src/inference/src/dev/core_impl.cpp @@ -28,6 +28,7 @@ #include "openvino/pass/manager.hpp" #include "openvino/runtime/icompiled_model.hpp" #include "openvino/runtime/remote_context.hpp" +#include "openvino/runtime/threading/executor_manager.hpp" #include "openvino/util/common_util.hpp" #include "openvino/util/shared_object.hpp" #include "preprocessing/preprocessing.hpp" @@ -57,7 +58,7 @@ void stripDeviceName(std::string& device, const std::string& substr) { ov::CoreImpl::CoreImpl(bool _newAPI) : m_new_api(_newAPI) { add_mutex(""); // Register global mutex - executorManagerPtr = InferenceEngine::executorManager(); + m_executor_manager = ov::executor_manager(); for (const auto& it : ov::get_available_opsets()) { opsetNames.insert(it.first); } @@ -632,7 +633,7 @@ void ov::CoreImpl::set_property(const std::string& device_name, const AnyMap& pr ov::Any ov::CoreImpl::get_property_for_core(const std::string& name) const { if (name == ov::force_tbb_terminate.name()) { - const auto flag = InferenceEngine::executorManager()->getTbbFlag(); + const auto flag = ov::executor_manager()->get_property(name).as(); return decltype(ov::force_tbb_terminate)::value_type(flag); } else if (name == ov::cache_dir.name()) { return ov::Any(coreConfig.get_cache_dir()); @@ -993,7 +994,7 @@ void ov::CoreImpl::CoreConfig::set_and_update(ov::AnyMap& config) { it = config.find(ov::force_tbb_terminate.name()); if (it != config.end()) { auto flag = it->second.as() == CONFIG_VALUE(YES) ? true : false; - InferenceEngine::executorManager()->setTbbFlag(flag); + ov::executor_manager()->set_property({{it->first, flag}}); config.erase(it); } diff --git a/src/inference/src/dev/core_impl.hpp b/src/inference/src/dev/core_impl.hpp index 0d74145f2ae..d4ffacde19b 100644 --- a/src/inference/src/dev/core_impl.hpp +++ b/src/inference/src/dev/core_impl.hpp @@ -21,7 +21,7 @@ #include "openvino/core/version.hpp" #include "openvino/runtime/common.hpp" #include "openvino/runtime/icompiled_model.hpp" -#include "threading/ie_executor_manager.hpp" +#include "openvino/runtime/threading/executor_manager.hpp" #ifdef OPENVINO_STATIC_LIBRARY # include "ie_plugins.hpp" @@ -162,7 +162,7 @@ private: } }; - InferenceEngine::ExecutorManager::Ptr executorManagerPtr; + std::shared_ptr m_executor_manager; mutable std::unordered_set opsetNames; // TODO: make extensions to be optional with conditional compilation mutable std::vector extensions; diff --git a/src/inference/src/dev/iplugin.cpp b/src/inference/src/dev/iplugin.cpp index 73476d21386..ad8248ba928 100644 --- a/src/inference/src/dev/iplugin.cpp +++ b/src/inference/src/dev/iplugin.cpp @@ -4,7 +4,7 @@ #include "openvino/runtime/iplugin.hpp" -ov::IPlugin::IPlugin() : m_executor_manager(InferenceEngine::executorManager()), m_is_new_api(true) {} +ov::IPlugin::IPlugin() : m_executor_manager(ov::executor_manager()), m_is_new_api(true) {} void ov::IPlugin::set_version(const ov::Version& version) { m_version = version; @@ -42,7 +42,7 @@ bool ov::IPlugin::is_new_api() const { return m_is_new_api; } -const std::shared_ptr& ov::IPlugin::get_executor_manager() const { +const std::shared_ptr& ov::IPlugin::get_executor_manager() const { return m_executor_manager; } diff --git a/src/inference/src/dev/iplugin_wrapper.cpp b/src/inference/src/dev/iplugin_wrapper.cpp index 36207adf485..972d4d62bb4 100644 --- a/src/inference/src/dev/iplugin_wrapper.cpp +++ b/src/inference/src/dev/iplugin_wrapper.cpp @@ -9,6 +9,7 @@ #include "any_copy.hpp" #include "dev/converter_utils.hpp" #include "ie_icore.hpp" +#include "threading/ie_executor_manager.hpp" namespace InferenceEngine { @@ -20,7 +21,7 @@ IPluginWrapper::IPluginWrapper(const std::shared_ptrGetName(); m_is_new_api = m_old_plugin->IsNewAPI(); m_core = m_old_plugin->GetCore(); - m_executor_manager = m_old_plugin->executorManager(); + m_executor_manager = m_old_plugin->executorManager()->get_ov_manager(); } const std::shared_ptr& IPluginWrapper::update_exec_network( diff --git a/src/inference/src/dev/threading/executor_manager.cpp b/src/inference/src/dev/threading/executor_manager.cpp new file mode 100644 index 00000000000..11fb7a289dd --- /dev/null +++ b/src/inference/src/dev/threading/executor_manager.cpp @@ -0,0 +1,208 @@ +// Copyright (C) 2018-2023 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 +// + +#include "openvino/runtime/threading/executor_manager.hpp" + +#include "openvino/core/parallel.hpp" +#include "openvino/runtime/properties.hpp" +#include "threading/ie_cpu_streams_executor.hpp" +#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO +# if (TBB_INTERFACE_VERSION < 12000) +# include +# else +# include +# endif +#endif + +#include +#include +#include +#include + +namespace ov { +namespace { +class ExecutorManagerImpl : public ExecutorManager { +public: + ~ExecutorManagerImpl(); + InferenceEngine::ITaskExecutor::Ptr get_executor(const std::string& id) override; + InferenceEngine::IStreamsExecutor::Ptr get_idle_cpu_streams_executor( + const InferenceEngine::IStreamsExecutor::Config& config) override; + size_t get_executors_number() const override; + size_t get_idle_cpu_streams_executors_number() const override; + void clear(const std::string& id = {}) override; + void set_property(const ov::AnyMap& properties) override; + ov::Any get_property(const std::string& name) const override; + +private: + void reset_tbb(); + + std::unordered_map executors; + std::vector> + cpuStreamsExecutors; + mutable std::mutex streamExecutorMutex; + mutable std::mutex taskExecutorMutex; + bool tbbTerminateFlag = false; + mutable std::mutex global_mutex; + bool tbbThreadsCreated = false; +#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO +# if (TBB_INTERFACE_VERSION < 12000) + std::shared_ptr tbbTaskScheduler = nullptr; +# else + std::shared_ptr tbbTaskScheduler = nullptr; +# endif +#endif +}; + +} // namespace + +ExecutorManagerImpl::~ExecutorManagerImpl() { + reset_tbb(); +} + +void ExecutorManagerImpl::set_property(const ov::AnyMap& properties) { + std::lock_guard guard(global_mutex); + for (const auto& it : properties) { + if (it.first == ov::force_tbb_terminate.name()) { + tbbTerminateFlag = it.second.as(); +#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO + if (tbbTerminateFlag) { + if (!tbbTaskScheduler) { +# if (TBB_INTERFACE_VERSION < 12000) + tbbTaskScheduler = std::make_shared(); +# elif (TBB_INTERFACE_VERSION < 12060) + tbbTaskScheduler = + std::make_shared(oneapi::tbb::task_scheduler_handle::get()); +# else + tbbTaskScheduler = std::make_shared(tbb::attach{}); +# endif + } + } else { + tbbTaskScheduler = nullptr; + } +#endif + } + } +} +ov::Any ExecutorManagerImpl::get_property(const std::string& name) const { + std::lock_guard guard(global_mutex); + if (name == ov::force_tbb_terminate.name()) { + return tbbTerminateFlag; + } + OPENVINO_UNREACHABLE("Property ", name, " is not supported."); +} + +void ExecutorManagerImpl::reset_tbb() { + std::lock_guard guard(global_mutex); + if (tbbTerminateFlag) { +#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO + if (tbbTaskScheduler && tbbThreadsCreated) { +# if (TBB_INTERFACE_VERSION < 12000) + tbbTaskScheduler->terminate(); +# else + tbb::finalize(*tbbTaskScheduler, std::nothrow); +# endif + } + tbbThreadsCreated = false; + tbbTaskScheduler = nullptr; +#endif + tbbTerminateFlag = false; + } +} + +InferenceEngine::ITaskExecutor::Ptr ExecutorManagerImpl::get_executor(const std::string& id) { + std::lock_guard guard(taskExecutorMutex); + auto foundEntry = executors.find(id); + if (foundEntry == executors.end()) { + auto newExec = + std::make_shared(InferenceEngine::IStreamsExecutor::Config{id}); + tbbThreadsCreated = true; + executors[id] = newExec; + return newExec; + } + return foundEntry->second; +} + +InferenceEngine::IStreamsExecutor::Ptr ExecutorManagerImpl::get_idle_cpu_streams_executor( + const InferenceEngine::IStreamsExecutor::Config& config) { + std::lock_guard guard(streamExecutorMutex); + for (const auto& it : cpuStreamsExecutors) { + const auto& executor = it.second; + if (executor.use_count() != 1) + continue; + + const auto& executorConfig = it.first; + if (executorConfig._name == config._name && executorConfig._streams == config._streams && + executorConfig._threadsPerStream == config._threadsPerStream && + executorConfig._threadBindingType == config._threadBindingType && + executorConfig._threadBindingStep == config._threadBindingStep && + executorConfig._threadBindingOffset == config._threadBindingOffset) + if (executorConfig._threadBindingType != + InferenceEngine::IStreamsExecutor::ThreadBindingType::HYBRID_AWARE || + executorConfig._threadPreferredCoreType == config._threadPreferredCoreType) + return executor; + } + auto newExec = std::make_shared(config); + tbbThreadsCreated = true; + cpuStreamsExecutors.emplace_back(std::make_pair(config, newExec)); + return newExec; +} + +size_t ExecutorManagerImpl::get_executors_number() const { + std::lock_guard guard(taskExecutorMutex); + return executors.size(); +} + +size_t ExecutorManagerImpl::get_idle_cpu_streams_executors_number() const { + std::lock_guard guard(streamExecutorMutex); + return cpuStreamsExecutors.size(); +} + +void ExecutorManagerImpl::clear(const std::string& id) { + std::lock_guard stream_guard(streamExecutorMutex); + std::lock_guard task_guard(taskExecutorMutex); + if (id.empty()) { + executors.clear(); + cpuStreamsExecutors.clear(); + } else { + executors.erase(id); + cpuStreamsExecutors.erase(std::remove_if(cpuStreamsExecutors.begin(), + cpuStreamsExecutors.end(), + [&](const std::pair& it) { + return it.first._name == id; + }), + cpuStreamsExecutors.end()); + } +} + +namespace { + +class ExecutorManagerHolder { + std::mutex _mutex; + std::weak_ptr _manager; + +public: + ExecutorManagerHolder(const ExecutorManagerHolder&) = delete; + ExecutorManagerHolder& operator=(const ExecutorManagerHolder&) = delete; + + ExecutorManagerHolder() = default; + + std::shared_ptr get() { + std::lock_guard lock(_mutex); + auto manager = _manager.lock(); + if (!manager) { + _manager = manager = std::make_shared(); + } + return manager; + } +}; + +} // namespace + +std::shared_ptr executor_manager() { + static ExecutorManagerHolder executorManagerHolder; + return executorManagerHolder.get(); +} + +} // namespace ov diff --git a/src/inference/src/threading/ie_executor_manager.cpp b/src/inference/src/threading/ie_executor_manager.cpp index 6e52117976d..7ea0e16f3cd 100644 --- a/src/inference/src/threading/ie_executor_manager.cpp +++ b/src/inference/src/threading/ie_executor_manager.cpp @@ -5,6 +5,8 @@ #include "threading/ie_executor_manager.hpp" #include "ie_parallel.hpp" +#include "openvino/runtime/properties.hpp" +#include "openvino/runtime/threading/executor_manager.hpp" #include "threading/ie_cpu_streams_executor.hpp" #if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO # if (TBB_INTERFACE_VERSION < 12000) @@ -23,7 +25,7 @@ namespace InferenceEngine { namespace { class ExecutorManagerImpl : public ExecutorManager { public: - ~ExecutorManagerImpl(); + ExecutorManagerImpl(const std::shared_ptr& manager); ITaskExecutor::Ptr getExecutor(const std::string& id) override; IStreamsExecutor::Ptr getIdleCPUStreamsExecutor(const IStreamsExecutor::Config& config) override; size_t getExecutorsNumber() const override; @@ -33,134 +35,47 @@ public: bool getTbbFlag() override; private: - void resetTbb(); - std::unordered_map executors; - std::vector> cpuStreamsExecutors; - mutable std::mutex streamExecutorMutex; - mutable std::mutex taskExecutorMutex; - bool tbbTerminateFlag = false; - mutable std::mutex tbbMutex; - bool tbbThreadsCreated = false; -#if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO -# if (TBB_INTERFACE_VERSION < 12000) - std::shared_ptr tbbTaskScheduler = nullptr; -# else - std::shared_ptr tbbTaskScheduler = nullptr; -# endif -#endif + std::shared_ptr m_manager; + std::shared_ptr get_ov_manager() const override { + return m_manager; + } }; } // namespace -ExecutorManagerImpl::~ExecutorManagerImpl() { - resetTbb(); -} +ExecutorManagerImpl::ExecutorManagerImpl(const std::shared_ptr& manager) : m_manager(manager) {} void ExecutorManagerImpl::setTbbFlag(bool flag) { - std::lock_guard guard(tbbMutex); - tbbTerminateFlag = flag; -#if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO - if (tbbTerminateFlag) { - if (!tbbTaskScheduler) { -# if (TBB_INTERFACE_VERSION < 12000) - tbbTaskScheduler = std::make_shared(); -# elif (TBB_INTERFACE_VERSION < 12060) - tbbTaskScheduler = - std::make_shared(oneapi::tbb::task_scheduler_handle::get()); -# else - tbbTaskScheduler = std::make_shared(tbb::attach{}); -# endif - } - } else { - tbbTaskScheduler = nullptr; - } -#endif + m_manager->set_property({{ov::force_tbb_terminate.name(), flag}}); } bool ExecutorManagerImpl::getTbbFlag() { - std::lock_guard guard(tbbMutex); - return tbbTerminateFlag; -} - -void ExecutorManagerImpl::resetTbb() { - std::lock_guard guard(tbbMutex); - if (tbbTerminateFlag) { -#if IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO - if (tbbTaskScheduler && tbbThreadsCreated) { -# if (TBB_INTERFACE_VERSION < 12000) - tbbTaskScheduler->terminate(); -# else - tbb::finalize(*tbbTaskScheduler, std::nothrow); -# endif - } - tbbThreadsCreated = false; - tbbTaskScheduler = nullptr; -#endif - tbbTerminateFlag = false; - } + return m_manager->get_property(ov::force_tbb_terminate.name()).as(); } ITaskExecutor::Ptr ExecutorManagerImpl::getExecutor(const std::string& id) { - std::lock_guard guard(taskExecutorMutex); - auto foundEntry = executors.find(id); - if (foundEntry == executors.end()) { - auto newExec = std::make_shared(IStreamsExecutor::Config{id}); - tbbThreadsCreated = true; - executors[id] = newExec; - return newExec; - } - return foundEntry->second; + return m_manager->get_executor(id); } IStreamsExecutor::Ptr ExecutorManagerImpl::getIdleCPUStreamsExecutor(const IStreamsExecutor::Config& config) { - std::lock_guard guard(streamExecutorMutex); - for (const auto& it : cpuStreamsExecutors) { - const auto& executor = it.second; - if (executor.use_count() != 1) - continue; - - const auto& executorConfig = it.first; - if (executorConfig._name == config._name && executorConfig._streams == config._streams && - executorConfig._threadsPerStream == config._threadsPerStream && - executorConfig._threadBindingType == config._threadBindingType && - executorConfig._threadBindingStep == config._threadBindingStep && - executorConfig._threadBindingOffset == config._threadBindingOffset) - if (executorConfig._threadBindingType != IStreamsExecutor::ThreadBindingType::HYBRID_AWARE || - executorConfig._threadPreferredCoreType == config._threadPreferredCoreType) - return executor; - } - auto newExec = std::make_shared(config); - tbbThreadsCreated = true; - cpuStreamsExecutors.emplace_back(std::make_pair(config, newExec)); - return newExec; + return m_manager->get_idle_cpu_streams_executor(config); } size_t ExecutorManagerImpl::getExecutorsNumber() const { - std::lock_guard guard(taskExecutorMutex); - return executors.size(); + return m_manager->get_executors_number(); } size_t ExecutorManagerImpl::getIdleCPUStreamsExecutorsNumber() const { - std::lock_guard guard(streamExecutorMutex); - return cpuStreamsExecutors.size(); + return m_manager->get_idle_cpu_streams_executors_number(); } void ExecutorManagerImpl::clear(const std::string& id) { - std::lock_guard stream_guard(streamExecutorMutex); - std::lock_guard task_guard(taskExecutorMutex); - if (id.empty()) { - executors.clear(); - cpuStreamsExecutors.clear(); - } else { - executors.erase(id); - cpuStreamsExecutors.erase( - std::remove_if(cpuStreamsExecutors.begin(), - cpuStreamsExecutors.end(), - [&](const std::pair& it) { - return it.first._name == id; - }), - cpuStreamsExecutors.end()); - } + return m_manager->clear(id); +} + +std::shared_ptr create_old_manager( + const std::shared_ptr& manager) { + return std::make_shared(manager); } namespace { @@ -179,7 +94,7 @@ public: std::lock_guard lock(_mutex); auto manager = _manager.lock(); if (!manager) { - _manager = manager = std::make_shared(); + _manager = manager = create_old_manager(ov::executor_manager()); } return manager; } diff --git a/src/plugins/template/src/plugin.cpp b/src/plugins/template/src/plugin.cpp index 41cc097551e..eeda242842b 100644 --- a/src/plugins/template/src/plugin.cpp +++ b/src/plugins/template/src/plugin.cpp @@ -35,7 +35,7 @@ Plugin::Plugin() { _backend = ngraph::runtime::Backend::create(); // create default stream executor with a given name - _waitExecutor = get_executor_manager()->getIdleCPUStreamsExecutor({wait_executor_name}); + _waitExecutor = get_executor_manager()->get_idle_cpu_streams_executor({wait_executor_name}); } // ! [plugin:ctor] @@ -96,7 +96,7 @@ std::shared_ptr TemplatePlugin::Plugin::compile_model(const auto compiled_model = std::make_shared(model->clone(), shared_from_this(), - get_executor_manager()->getIdleCPUStreamsExecutor(streamsExecutorConfig), + get_executor_manager()->get_idle_cpu_streams_executor(streamsExecutorConfig), fullConfig); return compiled_model; } @@ -136,7 +136,7 @@ std::shared_ptr TemplatePlugin::Plugin::import_model(std::is auto compiled_model = std::make_shared(ov_model, shared_from_this(), - get_executor_manager()->getIdleCPUStreamsExecutor(streamsExecutorConfig), + get_executor_manager()->get_idle_cpu_streams_executor(streamsExecutorConfig), fullConfig); return compiled_model; }