explicit _completionTasks, reverting BA to use the timeout

This commit is contained in:
myshevts 2021-10-19 15:14:27 +03:00
parent 8cae121764
commit 294e202a05
3 changed files with 24 additions and 17 deletions

View File

@ -666,8 +666,14 @@ int main(int argc, char* argv[]) {
next_step(ss.str());
// warming up - out of scope
for (size_t i = 0; i < inferRequestsQueue.requests.size(); i++) {
inferRequestsQueue.getIdleRequest()->startAsync();
auto inferRequest = inferRequestsQueue.getIdleRequest();
if (!inferRequest) {
IE_THROW() << "No idle Infer Requests!";
}
if (FLAGS_api == "sync") {
inferRequest->infer();
} else {
inferRequest->startAsync();
}
inferRequestsQueue.waitAll();
auto duration_ms = double_to_string(inferRequestsQueue.getLatencies()[0]);
@ -688,7 +694,7 @@ int main(int argc, char* argv[]) {
while ((niter != 0LL && iteration < niter) ||
(duration_nanoseconds != 0LL && (uint64_t)execTime < duration_nanoseconds) ||
(FLAGS_api == "async" && iteration % nireq != 0)) {
auto inferRequest = inferRequestsQueue.getIdleRequest();
inferRequest = inferRequestsQueue.getIdleRequest();
if (!inferRequest) {
IE_THROW() << "No idle Infer Requests!";
}

View File

@ -156,6 +156,12 @@ void AutoBatchInferRequest::InferImpl() {
if (numReady == _workerInferRequest->_batchSize) {
_workerInferRequest->_numRequestsReady = 0;
_workerInferRequest->_inferRequest->StartAsync();
std::unique_lock<std::mutex> lock(_workerInferRequest->_mutex);
std::pair<AutoBatchAsyncInferRequest*, InferenceEngine::Task> t;
for (int c = 0; c < _workerInferRequest->_batchSize; c++) {
_workerInferRequest->_tasks.try_pop(t);
_workerInferRequest->_completionTasks[c]= std::move(t.second);
}
}
}
@ -238,19 +244,13 @@ InferenceEngine::IInferRequestInternal::Ptr AutoBatchExecutableNetwork::CreateIn
auto workerRequestPtr = _workerRequests.back();
workerRequestPtr->_inferRequest = {_network._so, _network->CreateInferRequest()};
workerRequestPtr->_batchSize = _device.batchForDevice;
workerRequestPtr->_completionTasks.resize(workerRequestPtr->_batchSize);
workerRequestPtr->_inferRequest->SetCallback(
[workerRequestPtr, this] (std::exception_ptr exceptionPtr) mutable {
std::pair<AutoBatchAsyncInferRequest*, InferenceEngine::Task> t;
int num = 0;
{
//static int times = 0;
//std::cout << "BATCH:" << times++ << std::endl;
std::unique_lock<std::mutex> lock(workerRequestPtr->_mutex);
while (workerRequestPtr->_tasks.try_pop(t)) {
t.second(); // execute the fake tasks to signal the pipeline is over
if (workerRequestPtr->_batchSize == ++num)
break;
}
IE_ASSERT(workerRequestPtr->_completionTasks.size() == (size_t)workerRequestPtr->_batchSize);
// notify the individual requests on the completion
for (int c = 0; c < workerRequestPtr->_batchSize; c++) {
workerRequestPtr->_completionTasks[c]();
}
// reset the timeout
workerRequestPtr->_cond.notify_one();
@ -259,13 +259,13 @@ InferenceEngine::IInferRequestInternal::Ptr AutoBatchExecutableNetwork::CreateIn
workerRequestPtr->_thread = std::thread([workerRequestPtr, this] {
while (!_terminate) {
std::unique_lock<std::mutex> lock(workerRequestPtr->_mutex);
auto status = workerRequestPtr->_cond.wait_for(lock, std::chrono::milliseconds(150));
auto status = workerRequestPtr->_cond.wait_for(lock, std::chrono::milliseconds(1));
if (!_terminate && status == std::cv_status::timeout) {
// timeout to collect the batch is over, have to execute the requests in the batch1 mode
std::cout << "TIME_OUT" << std::endl;
std::cout << "TIME_OUT with tasks: " << workerRequestPtr->_tasks.unsafe_size() << std::endl;
std::pair<AutoBatchAsyncInferRequest*, InferenceEngine::Task> t;
// popping all tasks and execute with batch1
// IE_ASSERT(workerRequestPtr->_tasks.unsafe_size() < (size_t)_device.batchForDevice);
IE_ASSERT(workerRequestPtr->_tasks.unsafe_size() < (size_t)_device.batchForDevice);
while (workerRequestPtr->_tasks.try_pop(t)) {
t.first->_inferRequestWithoutBatch->SetCallback([t](std::exception_ptr){t.second();});
t.first->_inferRequest->SetBlobsToAnotherRequest(t.first->_inferRequestWithoutBatch);

View File

@ -79,6 +79,7 @@ public:
int _batchSize;
std::atomic_int _numRequestsReady = {0};
ThreadSafeQueue<std::pair<AutoBatchAsyncInferRequest*, InferenceEngine::Task>> _tasks;
std::vector<InferenceEngine::Task> _completionTasks;
std::thread _thread;
std::condition_variable _cond;
std::mutex _mutex;