diff --git a/inference-engine/src/auto_batch/auto_batch.hpp b/inference-engine/src/auto_batch/auto_batch.hpp index 56672efef6f..d0945700406 100644 --- a/inference-engine/src/auto_batch/auto_batch.hpp +++ b/inference-engine/src/auto_batch/auto_batch.hpp @@ -34,40 +34,6 @@ struct DeviceInformation { int batchForDevice; }; -#if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO)) -template -using ThreadSafeQueue = tbb::concurrent_queue; -#else -template -class ThreadSafeQueue { -public: - void push(T value) { - std::lock_guard lock(_mutex); - _queue.push(std::move(value)); - } - - bool try_pop(T& value) { - std::lock_guard lock(_mutex); - if (!_queue.empty()) { - value = std::move(_queue.front()); - _queue.pop(); - return true; - } else { - return false; - } - } - - bool empty() { - std::lock_guard lock(_mutex); - return _queue.empty(); - } - -protected: - std::queue _queue; - std::mutex _mutex; -}; -#endif - class AutoBatchAsyncInferRequest; class AutoBatchExecutableNetwork : public InferenceEngine::ExecutableNetworkThreadSafeDefault { public: @@ -77,13 +43,13 @@ public: InferenceEngine::SoIInferRequestInternal _inferRequest; InferenceEngine::StatusCode _status = InferenceEngine::StatusCode::OK; int _batchSize; - ThreadSafeQueue> _tasks; + InferenceEngine::ThreadSafeQueue> _tasks; std::vector _completionTasks; std::thread _thread; std::condition_variable _cond; std::mutex _mutex; }; - using NotBusyWorkerRequests = ThreadSafeQueue; + using NotBusyWorkerRequests = InferenceEngine::ThreadSafeQueue; explicit AutoBatchExecutableNetwork(const InferenceEngine::SoExecutableNetworkInternal& networkForDevice, const InferenceEngine::SoExecutableNetworkInternal& networkForDeviceWithoutBatch, diff --git a/inference-engine/src/inference_engine/include/ie/ie_parallel.hpp b/inference-engine/src/inference_engine/include/ie/ie_parallel.hpp index 0a492b9d21a..cddd5387c94 100644 --- a/inference-engine/src/inference_engine/include/ie/ie_parallel.hpp +++ b/inference-engine/src/inference_engine/include/ie/ie_parallel.hpp @@ -15,6 +15,8 @@ #pragma once #include +#include +#include #include #define IE_THREAD_TBB 0 @@ -36,6 +38,8 @@ # define TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION 1 # endif +# include + # include "tbb/blocked_range.h" # include "tbb/blocked_range2d.h" # include "tbb/blocked_range3d.h" @@ -129,6 +133,69 @@ inline void parallel_set_num_threads(int) { #endif namespace InferenceEngine { +#if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO)) +template +using ThreadSafeQueue = tbb::concurrent_queue; +template +using ThreadSafeBoundedQueue = tbb::concurrent_bounded_queue; +#else +template +class ThreadSafeQueue { +public: + void push(T value) { + std::lock_guard lock(_mutex); + _queue.push(std::move(value)); + } + bool try_pop(T& value) { + std::lock_guard lock(_mutex); + if (!_queue.empty()) { + value = std::move(_queue.front()); + _queue.pop(); + return true; + } else { + return false; + } + } + size_t unsafe_size() { + return _queue.size(); + } + +protected: + std::queue _queue; + std::mutex _mutex; +}; +template +class ThreadSafeBoundedQueue { +public: + ThreadSafeBoundedQueue() = default; + bool try_push(T value) { + std::lock_guard lock(_mutex); + if (_capacity) { + _queue.push(std::move(value)); + } + return _capacity; + } + bool try_pop(T& value) { + std::lock_guard lock(_mutex); + if (_capacity && !_queue.empty()) { + value = std::move(_queue.front()); + _queue.pop(); + return true; + } else { + return false; + } + } + void set_capacity(std::size_t newCapacity) { + std::lock_guard lock(_mutex); + _capacity = newCapacity; + } + +protected: + std::queue _queue; + std::mutex _mutex; + bool _capacity = false; +}; +#endif template void parallel_nt(int nthr, const F& func) { diff --git a/inference-engine/src/multi_device/multi_device_exec_network.hpp b/inference-engine/src/multi_device/multi_device_exec_network.hpp index 9e7dff9ffb2..f1c255db5cb 100644 --- a/inference-engine/src/multi_device/multi_device_exec_network.hpp +++ b/inference-engine/src/multi_device/multi_device_exec_network.hpp @@ -7,7 +7,6 @@ #include #include -#include #include #include #include @@ -19,10 +18,6 @@ #include #include "ie_icore.hpp" -#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO) -# include -#endif - #ifdef MULTIUNITTEST #define MOCKTESTMACRO virtual #define MultiDevicePlugin MockMultiDevicePlugin @@ -68,66 +63,6 @@ enum AutoLoadContextIndex { template using DeviceMap = std::unordered_map; -#if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO)) -template -using ThreadSafeQueue = tbb::concurrent_queue; -template -using ThreadSafeBoundedQueue = tbb::concurrent_bounded_queue; -#else -template -class ThreadSafeQueue { -public: - void push(T value) { - std::lock_guard lock(_mutex); - _queue.push(std::move(value)); - } - bool try_pop(T& value) { - std::lock_guard lock(_mutex); - if (!_queue.empty()) { - value = std::move(_queue.front()); - _queue.pop(); - return true; - } else { - return false; - } - } -protected: - std::queue _queue; - std::mutex _mutex; -}; -template -class ThreadSafeBoundedQueue { -public: - ThreadSafeBoundedQueue() = default; - bool try_push(T value) { - std::lock_guard lock(_mutex); - if (_capacity) { - _queue.push(std::move(value)); - } - return _capacity; - } - bool try_pop(T& value) { - std::lock_guard lock(_mutex); - if (_capacity && !_queue.empty()) { - value = std::move(_queue.front()); - _queue.pop(); - return true; - } else { - return false; - } - } - void set_capacity(std::size_t newCapacity) { - std::lock_guard lock(_mutex); - _capacity = newCapacity; - } - -protected: - std::queue _queue; - std::mutex _mutex; - bool _capacity = false; -}; -#endif - class MultiDeviceExecutableNetwork : public InferenceEngine::ExecutableNetworkThreadSafeDefault, public InferenceEngine::ITaskExecutor { public: @@ -137,7 +72,7 @@ public: InferenceEngine::Task _task; std::exception_ptr _exceptionPtr = nullptr; }; - using NotBusyWorkerRequests = ThreadSafeBoundedQueue; + using NotBusyWorkerRequests = InferenceEngine::ThreadSafeBoundedQueue; explicit MultiDeviceExecutableNetwork(const DeviceMap& networksPerDevice, const std::vector& networkDevices, @@ -174,8 +109,8 @@ public: std::vector _devicePriorities; const std::vector _devicePrioritiesInitial; DeviceMap _networksPerDevice; - ThreadSafeQueue _inferPipelineTasks; - DeviceMap>> _inferPipelineTasksDeviceSpecific; + InferenceEngine::ThreadSafeQueue _inferPipelineTasks; + DeviceMap>> _inferPipelineTasksDeviceSpecific; DeviceMap _idleWorkerRequests; DeviceMap> _workerRequests; std::unordered_map _config;