moved the bacthed request statring ( along with input copies) to the dedicated thread

This commit is contained in:
myshevts 2021-10-25 18:09:12 +03:00
parent 74040fed95
commit 4ddf63ab5f
2 changed files with 20 additions and 29 deletions

View File

@ -189,23 +189,6 @@ void AutoBatchInferRequest::CopyOutputsIfNeeded() {
std::map<std::string, InferenceEngine::InferenceEngineProfileInfo> AutoBatchInferRequest::GetPerformanceCounts() const {
return _perfMap;
}
void AutoBatchInferRequest::InferImpl() {
std::unique_lock<std::mutex> lock(_workerInferRequest->_mutex);
int sz = _workerInferRequest->_tasks.unsafe_size();
if (sz == _workerInferRequest->_batchSize) {
// auto start = std::chrono::high_resolution_clock::now();
std::pair<AutoBatchAsyncInferRequest*, InferenceEngine::Task> t;
for (int c = 0; c < _workerInferRequest->_batchSize; c++) {
IE_ASSERT(_workerInferRequest->_tasks.try_pop(t));
_workerInferRequest->_completionTasks[c] = std::move(t.second);
t.first->_inferRequest->CopyInputsIfNeeded();
}
_workerInferRequest->_inferRequest->StartAsync();
// auto waitTime = std::chrono::duration_cast<std::chrono::microseconds>
// (std::chrono::high_resolution_clock::now() - start).count();
// std::cout << "START BATCH: " << waitTime << std::endl;
}
}
AutoBatchAsyncInferRequest::AutoBatchAsyncInferRequest(
const AutoBatchInferRequest::Ptr& inferRequest,
@ -224,7 +207,10 @@ AutoBatchAsyncInferRequest::AutoBatchAsyncInferRequest(
t.first = _this;
t.second = std::move(task);
workerInferRequest->_tasks.push(t);
_this->_inferRequest->InferImpl();
const int sz = workerInferRequest->_tasks.unsafe_size();
if (sz == workerInferRequest->_batchSize) {
workerInferRequest->_cond.notify_one();
}
};
AutoBatchAsyncInferRequest* _this = nullptr;
};
@ -302,23 +288,30 @@ InferenceEngine::IInferRequestInternal::Ptr AutoBatchExecutableNetwork::CreateIn
workerRequestPtr->_thread = std::thread([workerRequestPtr, this] {
while (1) {
std::unique_lock<std::mutex> lock(workerRequestPtr->_mutex);
auto status = workerRequestPtr->_cond.wait_for(lock, std::chrono::milliseconds(30));
auto status = workerRequestPtr->_cond.wait_for(lock, std::chrono::milliseconds(100));
// as we pop the tasks from the queue only here
// it is ok to call unsafe_size (as the _tasks can only grow in parallel)
const int sz = workerRequestPtr->_tasks.unsafe_size();
if (_terminate) {
break;
} else if (status == std::cv_status::timeout) {
// timeout to collect the batch is over, have to execute the requests in the batch1 mode
// as we pop the tasks from the queue only here (and when the batch has been fully collected)
// both places are guarded with the same mutex
// it is ok to call unsafe_size (as the _tasks can only grow in parallel)
int sz = workerRequestPtr->_tasks.unsafe_size();
if (sz) {
} else {
if (sz == workerRequestPtr->_batchSize) {
std::pair<AutoBatchAsyncInferRequest *, InferenceEngine::Task> t;
for (int n = 0; n < sz; n++) {
IE_ASSERT(workerRequestPtr->_tasks.try_pop(t));
workerRequestPtr->_completionTasks[n] = std::move(t.second);
t.first->_inferRequest->CopyInputsIfNeeded();
}
workerRequestPtr->_inferRequest->StartAsync();
} else if ((status == std::cv_status::timeout) && sz) {
// timeout to collect the batch is over, have to execute the requests in the batch1 mode
auto start = std::chrono::high_resolution_clock::now();
std::pair<AutoBatchAsyncInferRequest *, InferenceEngine::Task> t;
// popping all tasks collected by the moment of the time-out and execute each with batch1
std::atomic<int> arrived = {0};
std::promise<void> all_completed;
auto all_completed_future = all_completed.get_future();
for (int n =0; n < sz; n++) {
for (int n = 0; n < sz; n++) {
IE_ASSERT(workerRequestPtr->_tasks.try_pop(t));
t.first->_inferRequestWithoutBatch->SetCallback(
[t, sz, &arrived, &all_completed](std::exception_ptr) {

View File

@ -77,7 +77,6 @@ public:
InferenceEngine::SoIInferRequestInternal _inferRequest;
InferenceEngine::StatusCode _status = InferenceEngine::StatusCode::OK;
int _batchSize;
std::atomic_int _numRequestsReady = {0};
ThreadSafeQueue<std::pair<AutoBatchAsyncInferRequest*, InferenceEngine::Task>> _tasks;
std::vector<InferenceEngine::Task> _completionTasks;
std::thread _thread;
@ -118,7 +117,6 @@ public:
AutoBatchExecutableNetwork::WorkerInferRequest* workerRequestPtr,
int batch_id, int num_batch, bool _needPerfCounters = false);
std::map<std::string, InferenceEngine::InferenceEngineProfileInfo> GetPerformanceCounts() const override;
void InferImpl() override;
// Batch-Device impl specific: sets the data (blobs from the device request to the batched device request)
void SetBlobsToAnotherRequest(InferenceEngine::SoIInferRequestInternal& req);