quick-n-dirty timeout impl

myshevts 2021-10-18 16:40:18 +03:00
parent 3d2cd5fb71
commit 8cae121764
2 changed files with 74 additions and 14 deletions

[changed file 1 of 2]

@@ -129,8 +129,21 @@ AutoBatchInferRequest::AutoBatchInferRequest(const InputsDataMap& networkInput
}
}
void AutoBatchInferRequest::SetBlobsToAnotherRequest(InferRequest& req) {
// todo call Set for REMOTE BLOB
void AutoBatchInferRequest::SetBlobsToAnotherRequest(SoIInferRequestInternal& req) {
for (const auto &it : _networkInputs) {
auto &name = it.first;
// this request is already in BUSY state, so it is safe to use the internal functions
auto blob = GetBlob(name);
if (req->GetBlob(name) != blob)
req->SetBlob(name, blob);
}
for (const auto &it : _networkOutputs) {
auto &name = it.first;
// this request is already in BUSY state, so it is safe to use the internal functions
auto blob = GetBlob(name);
if (req->GetBlob(name) != blob)
req->SetBlob(name, blob);
}
}
std::map<std::string, InferenceEngine::InferenceEngineProfileInfo> AutoBatchInferRequest::GetPerformanceCounts() const {
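For reference, a minimal standalone sketch of the blob hand-off above (the types here are illustrative stand-ins, not the real InferenceEngine API): blobs are shared by pointer, and SetBlob is skipped when the target already holds the same blob, so repeated hand-offs of the same request are cheap no-ops.

#include <map>
#include <memory>
#include <string>

// "Blob" is a hypothetical stand-in for InferenceEngine::Blob::Ptr.
using Blob = std::shared_ptr<float>;

struct Request {
    std::map<std::string, Blob> blobs;
    Blob GetBlob(const std::string& name) { return blobs[name]; }
    void SetBlob(const std::string& name, const Blob& b) { blobs[name] = b; }
};

// Hand the source request's blobs to the destination by pointer; SetBlob is
// only called when the destination holds a different blob.
void SetBlobsToAnotherRequest(Request& src, Request& dst) {
    for (const auto& it : src.blobs) {
        auto blob = src.GetBlob(it.first);
        if (dst.GetBlob(it.first) != blob)
            dst.SetBlob(it.first, blob);
    }
}

int main() {
    Request a, b;
    a.SetBlob("input", std::make_shared<float>(1.0f));
    SetBlobsToAnotherRequest(a, b);  // b now shares a's "input" blob
}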
@@ -149,15 +162,20 @@ void AutoBatchInferRequest::InferImpl() {
AutoBatchAsyncInferRequest::AutoBatchAsyncInferRequest(
const AutoBatchInferRequest::Ptr& inferRequest,
const bool needPerfCounters,
InferenceEngine::SoIInferRequestInternal& inferRequestWithoutBatch,
const ITaskExecutor::Ptr& callbackExecutor) :
AsyncInferRequestThreadSafeDefault(inferRequest, nullptr, callbackExecutor),
_inferRequestWithoutBatch(inferRequestWithoutBatch),
_inferRequest{inferRequest} {
// this executor starts the inference while the task (checking the result) is passed to the next stage
struct ThisRequestExecutor : public ITaskExecutor {
explicit ThisRequestExecutor(AutoBatchAsyncInferRequest* _this_) : _this{_this_} {}
void run(Task task) override {
auto& workerInferRequest = _this->_inferRequest->_workerInferRequest;
workerInferRequest->_tasks.push(std::move(task));
std::pair<AutoBatchAsyncInferRequest*, InferenceEngine::Task> t;
t.first = _this;
t.second = std::move(task);
workerInferRequest->_tasks.push(t);
_this->_inferRequest->InferImpl();
};
AutoBatchAsyncInferRequest* _this = nullptr;
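The queue element change above is the heart of the hand-off: each queued completion task now travels with a pointer to its originating request, so whichever consumer fires the task later (the batch callback or the timeout thread) can also re-route that exact request. A minimal sketch of the idea, with hypothetical names:

#include <functional>
#include <iostream>
#include <queue>
#include <utility>

using Task = std::function<void()>;

struct Request;

// Hypothetical shared worker queue: each entry carries the originating
// request next to its completion task.
static std::queue<std::pair<Request*, Task>> g_tasks;

struct Request {
    void Submit(Task task) {
        g_tasks.emplace(this, std::move(task));
        // ...the real code kicks the batched inference here (InferImpl())...
    }
};

int main() {
    Request r;
    r.Submit([] { std::cout << "pipeline stage completed\n"; });
    auto t = std::move(g_tasks.front());
    g_tasks.pop();
    t.second();  // the consumer signals completion; t.first identifies the request
}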
@@ -181,14 +199,16 @@ AutoBatchAsyncInferRequest::~AutoBatchAsyncInferRequest() {
// ------------------------------AutoBatchExecutableNetwork----------------------------
AutoBatchExecutableNetwork::AutoBatchExecutableNetwork(const InferenceEngine::SoExecutableNetworkInternal& networkForDevice,
const DeviceInformation& networkDevice,
const std::unordered_map<std::string, InferenceEngine::Parameter>& config,
const bool needPerfCounters) :
const InferenceEngine::SoExecutableNetworkInternal& networkWithoutBatch,
const DeviceInformation& networkDevice,
const std::unordered_map<std::string, InferenceEngine::Parameter>& config,
const bool needPerfCounters) :
InferenceEngine::ExecutableNetworkThreadSafeDefault(
nullptr,
std::make_shared<InferenceEngine::ImmediateExecutor>()),
_device{networkDevice},
_network{networkForDevice},
_networkWithoutBatch{networkWithoutBatch},
_config{config},
_needPerfCounters{needPerfCounters} {
}
@@ -202,6 +222,9 @@ AutoBatchExecutableNetwork::~AutoBatchExecutableNetwork() {
/* NOTE: The only threads that use `AutoBatchExecutableNetwork` Context are those that are used by Worker infer requests.
* But AsyncInferRequest destructor should wait for all asynchronous tasks that are used by the request
*/
for (auto w : _workerRequests) {
w->_thread.join();
}
_workerRequests.clear();
}
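A note on shutdown: as shown in this hunk, the destructor joins the worker threads without notifying `_cond`, so each thread exits only after its current `wait_for` period (up to 150 ms) elapses and the `!_terminate` check fails. A sketch of a prompter variant, under the assumption that the terminate flag is set before the join loop (names are illustrative, not the plugin's):

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

struct Pool {
    std::atomic_bool terminate{false};
    std::mutex mutex;
    std::condition_variable cond;
    std::vector<std::thread> threads;

    ~Pool() {
        {
            // set the flag under the lock so a thread cannot re-enter
            // wait_for after missing the notification
            std::lock_guard<std::mutex> guard(mutex);
            terminate = true;
        }
        cond.notify_all();  // wake any thread parked in wait_for
        for (auto& t : threads)
            t.join();
    }
};

int main() {
    Pool pool;
    pool.threads.emplace_back([&pool] {
        std::unique_lock<std::mutex> lock(pool.mutex);
        while (!pool.terminate)
            pool.cond.wait_for(lock, std::chrono::milliseconds(150));
    });
}  // ~Pool joins the worker promptly instead of waiting out the timeout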
@@ -217,14 +240,40 @@ InferenceEngine::IInferRequestInternal::Ptr AutoBatchExecutableNetwork::CreateIn
workerRequestPtr->_batchSize = _device.batchForDevice;
workerRequestPtr->_inferRequest->SetCallback(
[workerRequestPtr, this] (std::exception_ptr exceptionPtr) mutable {
Task t;
std::pair<AutoBatchAsyncInferRequest*, InferenceEngine::Task> t;
int num = 0;
while (workerRequestPtr->_tasks.try_pop(t)) {
t();
if (workerRequestPtr->_batchSize == ++num)
break;
{
//static int times = 0;
//std::cout << "BATCH:" << times++ << std::endl;
std::unique_lock<std::mutex> lock(workerRequestPtr->_mutex);
while (workerRequestPtr->_tasks.try_pop(t)) {
t.second(); // execute the tasks to signal that the pipeline is over
if (workerRequestPtr->_batchSize == ++num)
break;
}
}
// reset the timeout
workerRequestPtr->_cond.notify_one();
});
workerRequestPtr->_thread = std::thread([workerRequestPtr, this] {
while (!_terminate) {
std::unique_lock<std::mutex> lock(workerRequestPtr->_mutex);
auto status = workerRequestPtr->_cond.wait_for(lock, std::chrono::milliseconds(150));
if (!_terminate && status == std::cv_status::timeout) {
// the timeout for collecting the batch has expired; execute the pending requests in batch-1 mode
std::cout << "TIME_OUT" << std::endl;
std::pair<AutoBatchAsyncInferRequest*, InferenceEngine::Task> t;
// pop all pending tasks and execute each with batch size 1
// IE_ASSERT(workerRequestPtr->_tasks.unsafe_size() < (size_t)_device.batchForDevice);
while (workerRequestPtr->_tasks.try_pop(t)) {
t.first->_inferRequestWithoutBatch->SetCallback([t](std::exception_ptr){t.second();});
t.first->_inferRequest->SetBlobsToAnotherRequest(t.first->_inferRequestWithoutBatch);
t.first->_inferRequestWithoutBatch->StartAsync();
}
}
}
});
}
return std::make_shared<AutoBatchInferRequest>(networkInputs, networkOutputs, _workerRequests.back().get(),
batch_id, _device.batchForDevice, _needPerfCounters);
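The core of the commit is this timeout path: the batch-completion callback drains up to `_batchSize` tasks when a full batch finishes, while the new per-worker thread wakes every 150 ms and, if the batch did not fill in time, drains whatever is queued and re-routes each request to the batch-less fallback. A self-contained sketch of the same wait_for/drain pattern (names are illustrative, not the plugin's API):

#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

int main() {
    std::mutex mtx;
    std::condition_variable cond;
    std::queue<std::function<void()>> tasks;  // stands in for the worker's task queue
    bool terminate = false;

    // Worker: wait up to 150 ms for the batch to fill; on timeout, drain the
    // queue and complete the under-filled batch as individual requests.
    std::thread worker([&] {
        std::unique_lock<std::mutex> lock(mtx);
        while (!terminate) {
            auto status = cond.wait_for(lock, std::chrono::milliseconds(150));
            if (!terminate && status == std::cv_status::timeout) {
                while (!tasks.empty()) {
                    tasks.front()();  // in the commit: re-route to the batch-less request
                    tasks.pop();
                }
            }
        }
    });

    {
        std::lock_guard<std::mutex> guard(mtx);
        tasks.push([] { std::cout << "fallback: executed with batch size 1\n"; });
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(300));

    {
        std::lock_guard<std::mutex> guard(mtx);
        terminate = true;
    }
    cond.notify_one();
    worker.join();
}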
@@ -233,8 +282,11 @@ InferenceEngine::IInferRequestInternal::Ptr AutoBatchExecutableNetwork::CreateIn
InferenceEngine::IInferRequestInternal::Ptr AutoBatchExecutableNetwork::CreateInferRequest() {
auto syncRequestImpl = CreateInferRequestImpl(_networkInputs, _networkOutputs);
syncRequestImpl->setPointerToExecutableNetworkInternal(shared_from_this());
InferenceEngine::SoIInferRequestInternal inferRequestWithoutBatch = {_networkWithoutBatch._so,
_networkWithoutBatch->CreateInferRequest()};
return std::make_shared<AutoBatchAsyncInferRequest>(std::static_pointer_cast<AutoBatchInferRequest>(syncRequestImpl),
_needPerfCounters,
inferRequestWithoutBatch,
_callbackExecutor);
}
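`SoIInferRequestInternal` is built here as a pair of the network's shared-object handle and a freshly created request, so the plugin library stays loaded while the request is alive. A hypothetical sketch of that pairing (not the real SoPtr implementation):

#include <memory>

// Pair an interface pointer with a handle to the shared library that created
// it. Member order matters: _so is declared first, so it is destroyed last
// and the library stays mapped until the object is gone.
struct LibraryHandle { /* wraps dlopen/LoadLibrary in the real code */ };

template <typename T>
struct SoPtr {
    std::shared_ptr<LibraryHandle> _so;  // keeps the plugin .so loaded
    std::shared_ptr<T> _ptr;             // object implemented by that library
    T* operator->() const { return _ptr.get(); }
};

// Mirrors the diff: the new request reuses the library handle of the network
// that created it:
//   SoPtr<Request> req{network._so, network->CreateInferRequest()};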
@@ -422,6 +474,7 @@ IExecutableNetworkInternal::Ptr AutoBatchInferencePlugin::LoadExeNetworkImpl(con
const auto perfConfig = fullConfig.find(PluginConfigParams::KEY_PERF_COUNT);
const bool enablePerfCounters = (fullConfig.end() != perfConfig) && (perfConfig->second == PluginConfigParams::YES);
auto networkWithoutBatch = GetCore()->LoadNetwork(network, deviceName, deviceConfig);
// device settings + auto-batch settings
std::unordered_map<std::string, InferenceEngine::Parameter> networkConfig;
networkConfig.insert(*device_batch);
@@ -454,6 +507,7 @@ IExecutableNetworkInternal::Ptr AutoBatchInferencePlugin::LoadExeNetworkImpl(con
if (footprint < total_mem) {
return std::make_shared<AutoBatchExecutableNetwork>(executableNetworkForDevice,
networkWithoutBatch,
metaDevice,
networkConfig,
enablePerfCounters);

[changed file 2 of 2]

@@ -78,11 +78,15 @@ public:
InferenceEngine::StatusCode _status = InferenceEngine::StatusCode::OK;
int _batchSize;
std::atomic_int _numRequestsReady = {0};
ThreadSafeQueue<InferenceEngine::Task> _tasks;
ThreadSafeQueue<std::pair<AutoBatchAsyncInferRequest*, InferenceEngine::Task>> _tasks;
std::thread _thread;
std::condition_variable _cond;
std::mutex _mutex;
};
using NotBusyWorkerRequests = ThreadSafeQueue<WorkerInferRequest*>;
explicit AutoBatchExecutableNetwork(const InferenceEngine::SoExecutableNetworkInternal& networkForDevice,
const InferenceEngine::SoExecutableNetworkInternal& networkForDeviceWithoutBatch,
const DeviceInformation& networkDevices,
const std::unordered_map<std::string, InferenceEngine::Parameter>& config,
const bool needPerfCounters = false);
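The header now queues `(request, task)` pairs and adds a thread/condvar/mutex trio per worker. `ThreadSafeQueue` itself is defined elsewhere in the plugin; a minimal sketch of the interface the worker code relies on (`push` plus non-blocking `try_pop`):

#include <mutex>
#include <queue>

// Sketch of the ThreadSafeQueue interface assumed by the worker; the real
// implementation lives elsewhere in the plugin.
template <typename T>
class ThreadSafeQueue {
public:
    void push(T value) {
        std::lock_guard<std::mutex> guard(_mutex);
        _queue.push(std::move(value));
    }
    // Returns false instead of blocking when the queue is empty, which is
    // what lets both the batch callback and the timeout thread drain safely.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> guard(_mutex);
        if (_queue.empty())
            return false;
        value = std::move(_queue.front());
        _queue.pop();
        return true;
    }
private:
    std::queue<T> _queue;
    std::mutex _mutex;
};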
@@ -98,6 +102,7 @@ public:
std::atomic_bool _terminate = {false};
DeviceInformation _device;
InferenceEngine::SoExecutableNetworkInternal _network;
InferenceEngine::SoExecutableNetworkInternal _networkWithoutBatch;
std::vector<WorkerInferRequest::Ptr> _workerRequests;
std::unordered_map<std::string, InferenceEngine::Parameter> _config;
bool _needPerfCounters = false;
@@ -115,7 +120,7 @@ public:
void InferImpl() override;
// Batch-Device impl specific: shares this request's blobs with another request (e.g., the batch-less fallback)
void SetBlobsToAnotherRequest(InferenceEngine::InferRequest& req);
void SetBlobsToAnotherRequest(InferenceEngine::SoIInferRequestInternal& req);
AutoBatchExecutableNetwork::WorkerInferRequest* _workerInferRequest;
protected:
std::map<std::string, InferenceEngine::InferenceEngineProfileInfo> _perfMap;
@@ -128,11 +133,12 @@ public:
explicit AutoBatchAsyncInferRequest(const AutoBatchInferRequest::Ptr& inferRequest,
const bool needPerfCounters,
InferenceEngine::SoIInferRequestInternal& inferRequestWithoutBatch,
const InferenceEngine::ITaskExecutor::Ptr& callbackExecutor);
void Infer_ThreadUnsafe() override;
virtual ~AutoBatchAsyncInferRequest();
protected:
InferenceEngine::SoIInferRequestInternal _inferRequestWithoutBatch;
AutoBatchInferRequest::Ptr _inferRequest;
};