Move ThreadSafeQueue to ie_parallel, as it is now re-used by the AUTO/MULTI and BATCH plugins

This commit is contained in:
myshevts 2021-11-25 10:55:07 +03:00
parent a7e28580e6
commit 17d84b990c
3 changed files with 72 additions and 104 deletions

View File

@ -34,40 +34,6 @@ struct DeviceInformation {
int batchForDevice;
};
#if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO))
template <typename T>
using ThreadSafeQueue = tbb::concurrent_queue<T>;
#else
// Mutex-guarded FIFO used when TBB's concurrent_queue is unavailable.
template <typename T>
class ThreadSafeQueue {
public:
    // Appends `value` at the tail of the queue.
    void push(T value) {
        std::lock_guard<std::mutex> guard{_mutex};
        _queue.push(std::move(value));
    }
    // Moves the head element into `value`; returns false when the queue is empty.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> guard{_mutex};
        if (_queue.empty())
            return false;
        value = std::move(_queue.front());
        _queue.pop();
        return true;
    }
    // Reports whether the queue currently holds no elements.
    bool empty() {
        std::lock_guard<std::mutex> guard{_mutex};
        return _queue.empty();
    }
protected:
    std::queue<T> _queue;
    std::mutex _mutex;
};
#endif
class AutoBatchAsyncInferRequest;
class AutoBatchExecutableNetwork : public InferenceEngine::ExecutableNetworkThreadSafeDefault {
public:
@ -77,13 +43,13 @@ public:
InferenceEngine::SoIInferRequestInternal _inferRequest;
InferenceEngine::StatusCode _status = InferenceEngine::StatusCode::OK;
int _batchSize;
ThreadSafeQueue<std::pair<AutoBatchAsyncInferRequest*, InferenceEngine::Task>> _tasks;
InferenceEngine::ThreadSafeQueue<std::pair<AutoBatchAsyncInferRequest*, InferenceEngine::Task>> _tasks;
std::vector<InferenceEngine::Task> _completionTasks;
std::thread _thread;
std::condition_variable _cond;
std::mutex _mutex;
};
using NotBusyWorkerRequests = ThreadSafeQueue<WorkerInferRequest*>;
using NotBusyWorkerRequests = InferenceEngine::ThreadSafeQueue<WorkerInferRequest*>;
explicit AutoBatchExecutableNetwork(const InferenceEngine::SoExecutableNetworkInternal& networkForDevice,
const InferenceEngine::SoExecutableNetworkInternal& networkForDeviceWithoutBatch,

View File

@ -15,6 +15,8 @@
#pragma once
#include <cstddef>
#include <mutex>
#include <queue>
#include <type_traits>
#define IE_THREAD_TBB 0
@ -36,6 +38,8 @@
# define TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION 1
# endif
# include <tbb/concurrent_queue.h>
# include "tbb/blocked_range.h"
# include "tbb/blocked_range2d.h"
# include "tbb/blocked_range3d.h"
@ -129,6 +133,69 @@ inline void parallel_set_num_threads(int) {
#endif
namespace InferenceEngine {
#if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO))
template <typename T>
using ThreadSafeQueue = tbb::concurrent_queue<T>;
template <typename T>
using ThreadSafeBoundedQueue = tbb::concurrent_bounded_queue<T>;
#else
// Mutex-based drop-in replacement for tbb::concurrent_queue on non-TBB builds.
template <typename T>
class ThreadSafeQueue {
public:
    // Enqueues `value` at the tail.
    void push(T value) {
        std::lock_guard<std::mutex> guard{_mutex};
        _queue.push(std::move(value));
    }
    // Dequeues the head into `value`; returns false if nothing is queued.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> guard{_mutex};
        if (_queue.empty())
            return false;
        value = std::move(_queue.front());
        _queue.pop();
        return true;
    }
    // Size snapshot taken WITHOUT locking (mirrors TBB's unsafe_size naming);
    // only reliable when no other thread is concurrently modifying the queue.
    size_t unsafe_size() {
        return _queue.size();
    }
protected:
    std::queue<T> _queue;
    std::mutex _mutex;
};
// Mutex-based drop-in replacement for tbb::concurrent_bounded_queue on
// non-TBB builds. The queue rejects pushes until set_capacity() is called
// with a non-zero value, and thereafter enforces that value as a bound.
template <typename T>
class ThreadSafeBoundedQueue {
public:
    ThreadSafeBoundedQueue() = default;
    // Enqueues `value` when the queue is enabled and below capacity.
    // Returns true on success, false when the queue is full or disabled.
    bool try_push(T value) {
        std::lock_guard<std::mutex> lock(_mutex);
        // FIX: _capacity used to be a bool, so any non-zero capacity made the
        // queue unbounded; the size check restores the bounded semantics of
        // the tbb::concurrent_bounded_queue this class substitutes for.
        if (_queue.size() < _capacity) {
            _queue.push(std::move(value));
            return true;
        }
        return false;
    }
    // Dequeues the head into `value`; returns false when disabled or empty.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> lock(_mutex);
        if (_capacity && !_queue.empty()) {
            value = std::move(_queue.front());
            _queue.pop();
            return true;
        }
        return false;
    }
    // Sets the maximum number of elements the queue may hold.
    void set_capacity(std::size_t newCapacity) {
        std::lock_guard<std::mutex> lock(_mutex);
        _capacity = newCapacity;
    }
protected:
    std::queue<T> _queue;
    std::mutex _mutex;
    std::size_t _capacity = 0;  // 0 means the queue is disabled
};
#endif
template <typename F>
void parallel_nt(int nthr, const F& func) {

View File

@ -7,7 +7,6 @@
#include <atomic>
#include <mutex>
#include <queue>
#include <unordered_map>
#include <map>
#include <vector>
@ -19,10 +18,6 @@
#include <threading/ie_executor_manager.hpp>
#include "ie_icore.hpp"
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
# include <tbb/concurrent_queue.h>
#endif
#ifdef MULTIUNITTEST
#define MOCKTESTMACRO virtual
#define MultiDevicePlugin MockMultiDevicePlugin
@ -68,66 +63,6 @@ enum AutoLoadContextIndex {
template<typename T>
using DeviceMap = std::unordered_map<DeviceName, T>;
#if ((IE_THREAD == IE_THREAD_TBB) || (IE_THREAD == IE_THREAD_TBB_AUTO))
template <typename T>
using ThreadSafeQueue = tbb::concurrent_queue<T>;
template <typename T>
using ThreadSafeBoundedQueue = tbb::concurrent_bounded_queue<T>;
#else
// Lock-based FIFO standing in for tbb::concurrent_queue on non-TBB builds.
template <typename T>
class ThreadSafeQueue {
public:
    // Appends `value` at the tail.
    void push(T value) {
        std::lock_guard<std::mutex> guard{_mutex};
        _queue.push(std::move(value));
    }
    // Pops the head into `value`; returns false when nothing is queued.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> guard{_mutex};
        if (_queue.empty())
            return false;
        value = std::move(_queue.front());
        _queue.pop();
        return true;
    }
protected:
    std::queue<T> _queue;
    std::mutex _mutex;
};
// Fallback for tbb::concurrent_bounded_queue when IE is built without TBB.
// NOTE(review): _capacity is declared as bool, so set_capacity(N) narrows N
// to true/false — the numeric bound is discarded and try_push never rejects
// on fullness once the queue is enabled. Presumably callers bound the number
// of pushes externally; confirm before relying on an actual capacity limit.
template <typename T>
class ThreadSafeBoundedQueue {
public:
ThreadSafeBoundedQueue() = default;
// Enqueues `value` if the queue has been enabled via set_capacity();
// returns whether the element was accepted.
bool try_push(T value) {
std::lock_guard<std::mutex> lock(_mutex);
if (_capacity) {
_queue.push(std::move(value));
}
return _capacity;
}
// Dequeues the head into `value`; returns false when disabled or empty.
bool try_pop(T& value) {
std::lock_guard<std::mutex> lock(_mutex);
if (_capacity && !_queue.empty()) {
value = std::move(_queue.front());
_queue.pop();
return true;
} else {
return false;
}
}
// Enables (non-zero) or disables (zero) the queue; the magnitude of
// newCapacity is otherwise ignored (see NOTE above).
void set_capacity(std::size_t newCapacity) {
std::lock_guard<std::mutex> lock(_mutex);
_capacity = newCapacity;
}
protected:
std::queue<T> _queue;
std::mutex _mutex;
// Acts as an enabled/disabled flag, not a size bound.
bool _capacity = false;
};
#endif
class MultiDeviceExecutableNetwork : public InferenceEngine::ExecutableNetworkThreadSafeDefault,
public InferenceEngine::ITaskExecutor {
public:
@ -137,7 +72,7 @@ public:
InferenceEngine::Task _task;
std::exception_ptr _exceptionPtr = nullptr;
};
using NotBusyWorkerRequests = ThreadSafeBoundedQueue<WorkerInferRequest*>;
using NotBusyWorkerRequests = InferenceEngine::ThreadSafeBoundedQueue<WorkerInferRequest*>;
explicit MultiDeviceExecutableNetwork(const DeviceMap<InferenceEngine::SoExecutableNetworkInternal>& networksPerDevice,
const std::vector<DeviceInformation>& networkDevices,
@ -174,8 +109,8 @@ public:
std::vector<DeviceInformation> _devicePriorities;
const std::vector<DeviceInformation> _devicePrioritiesInitial;
DeviceMap<InferenceEngine::SoExecutableNetworkInternal> _networksPerDevice;
ThreadSafeQueue<InferenceEngine::Task> _inferPipelineTasks;
DeviceMap<std::unique_ptr<ThreadSafeQueue<InferenceEngine::Task>>> _inferPipelineTasksDeviceSpecific;
InferenceEngine::ThreadSafeQueue<InferenceEngine::Task> _inferPipelineTasks;
DeviceMap<std::unique_ptr<InferenceEngine::ThreadSafeQueue<InferenceEngine::Task>>> _inferPipelineTasksDeviceSpecific;
DeviceMap<NotBusyWorkerRequests> _idleWorkerRequests;
DeviceMap<std::vector<WorkerInferRequest>> _workerRequests;
std::unordered_map<std::string, InferenceEngine::Parameter> _config;