Inputs/outputs copies; works with AUTO and the demo now

myshevts 2021-10-20 12:58:04 +03:00
parent 294e202a05
commit 0de3055825
2 changed files with 74 additions and 31 deletions

View File

@@ -17,6 +17,7 @@
#include <legacy/ie_util_internal.hpp>
#include <ie_plugin_config.hpp>
#include <ie_icore.hpp>
#include <ie_performance_hints.hpp>
#include "auto_batch.hpp"
namespace AutoBatchPlugin {
@@ -52,7 +53,7 @@ AutoBatchInferRequest::AutoBatchInferRequest(const InputsDataMap& networkInput
int batch_id, int num_batch,
bool needPerfCounters)
: IInferRequestInternal(networkInputs, networkOutputs), _workerInferRequest(workerRequestPtr),
_needPerfCounters(needPerfCounters) {
_needPerfCounters(needPerfCounters), _batchId(batch_id), _batchSize(num_batch) {
// Allocate all input blobs
for (const auto &it : networkInputs) {
auto blob = workerRequestPtr->_inferRequest->GetBlob(it.first);
@@ -146,22 +147,63 @@ void AutoBatchInferRequest::SetBlobsToAnotherRequest(SoIInferRequestInternal& re
}
}
void AutoBatchInferRequest::CopyInputsIfNeeded() {
for (const auto &it : _networkInputs) {
auto &name = it.first;
// this request is already in the BUSY state, so it is safe to use the internal functions
CopyBlobIfNeeded(GetBlob(name), _workerInferRequest->_inferRequest->GetBlob(name), true);
}
}
void AutoBatchInferRequest::CopyBlobIfNeeded(InferenceEngine::Blob::CPtr src, InferenceEngine::Blob::Ptr dst, bool bInput) {
auto bufferDst = dst->buffer();
auto ptrDst = bufferDst.as<char*>();
auto bufferSrc = src->cbuffer();
auto ptrSrc = bufferSrc.as<const char*>();
ptrdiff_t szDst = dst->byteSize();
ptrdiff_t szSrc = src->byteSize();
if (bInput) {
ptrdiff_t offset = szSrc != szDst ? _batchId*szDst/_batchSize : 0;
if (ptrDst - ptrSrc < szDst)
return;
else
memcpy(ptrDst + offset, ptrSrc, src->byteSize());
} else {
ptrdiff_t offset = szSrc != szDst ? _batchId*szSrc/_batchSize : 0;
if (ptrSrc - ptrDst < szDst)
return;
else
memcpy(ptrDst, ptrSrc + offset, dst->byteSize());
}
// std::cout << "!!! COPY !!!" << std::endl;
}
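For context on the two copy directions used above: each AutoBatchInferRequest owns slot _batchId of the worker's batched blobs, so when the request-level blob and the worker blob differ in size, the input slice is written at offset _batchId * szDst / _batchSize and the output slice is read back from the mirrored offset. The pointer comparison appears intended to skip the memcpy when the request's blob already aliases the worker's batched buffer (the blobs are taken from the worker request in the constructor), so a copy only happens when the caller supplied a blob of its own. A minimal standalone sketch of the slicing arithmetic with hypothetical sizes (not the plugin's code):

    #include <cstddef>
    #include <cstdio>
    #include <cstring>
    #include <vector>

    int main() {
        const std::size_t batchSize  = 4;  // hypothetical worker batch
        const std::size_t batchId    = 2;  // this request's slot within the batch
        const std::size_t sliceBytes = 8;  // hypothetical per-request blob size

        std::vector<char> requestInput(sliceBytes, 'x');              // request-level input blob (one slice)
        std::vector<char> workerInput(sliceBytes * batchSize, 0);     // batched input blob owned by the worker
        std::vector<char> workerOutput(sliceBytes * batchSize, 'y');  // batched output blob owned by the worker
        std::vector<char> requestOutput(sliceBytes, 0);               // request-level output blob (one slice)

        // Input direction (CopyInputsIfNeeded): write this request's slice into the batched blob.
        std::size_t offset = batchId * workerInput.size() / batchSize;
        std::memcpy(workerInput.data() + offset, requestInput.data(), requestInput.size());

        // Output direction (CopyOutputsIfNeeded): read this request's slice back out of the batched blob.
        offset = batchId * workerOutput.size() / batchSize;
        std::memcpy(requestOutput.data(), workerOutput.data() + offset, requestOutput.size());

        std::printf("slice %zu starts at byte %zu of the batched blobs\n", batchId, offset);
        return 0;
    }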
void AutoBatchInferRequest::CopyOutputsIfNeeded() {
for (const auto &it : _networkOutputs) {
auto &name = it.first;
// this request is already in the BUSY state, so it is safe to use the internal functions
CopyBlobIfNeeded(_workerInferRequest->_inferRequest->GetBlob(name), GetBlob(name), false);
}
}
std::map<std::string, InferenceEngine::InferenceEngineProfileInfo> AutoBatchInferRequest::GetPerformanceCounts() const {
return _perfMap;
}
void AutoBatchInferRequest::InferImpl() {
auto numReady = ++_workerInferRequest->_numRequestsReady;
// printf("!!! numReady: %d \n", numReady);
if (numReady == _workerInferRequest->_batchSize) {
_workerInferRequest->_numRequestsReady = 0;
_workerInferRequest->_inferRequest->StartAsync();
std::unique_lock<std::mutex> lock(_workerInferRequest->_mutex);
int sz = _workerInferRequest->_tasks.unsafe_size();
if (sz == _workerInferRequest->_batchSize) {
printf("!!! BATCH : %ld \n", _workerInferRequest->_tasks.unsafe_size());
std::pair<AutoBatchAsyncInferRequest*, InferenceEngine::Task> t;
for (int c = 0; c < _workerInferRequest->_batchSize; c++) {
_workerInferRequest->_tasks.try_pop(t);
_workerInferRequest->_completionTasks[c]= std::move(t.second);
if (_workerInferRequest->_tasks.try_pop(t)) {
_workerInferRequest->_completionTasks[c] = std::move(t.second);
t.first->_inferRequest->CopyInputsIfNeeded();
} else {
printf("!!! BUG !!! \n");
}
}
_workerInferRequest->_inferRequest->StartAsync();
}
}
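The batching trigger above collects one completion task per request under the worker's mutex, copies each request's inputs into the batched blobs, and only then calls StartAsync() on the shared batched request once a full batch of tasks has been queued. A simplified, self-contained sketch of that gather-then-launch pattern using plain standard-library types (the plugin's classes and its concurrent queue are assumed away; the stored completion callbacks would be fired later, when the batched request finishes):

    #include <cstdio>
    #include <functional>
    #include <mutex>
    #include <queue>
    #include <utility>
    #include <vector>

    struct Worker {
        int batchSize = 4;
        std::mutex mutex;
        std::queue<std::function<void()>> tasks;            // one completion callback per queued request
        std::vector<std::function<void()>> completionTasks; // callbacks kept for the in-flight batch
    };

    // Called once per incoming request; launches the batch only when it is full.
    void OnRequestReady(Worker& w, std::function<void()> completion) {
        std::lock_guard<std::mutex> lock(w.mutex);
        w.tasks.push(std::move(completion));
        if (static_cast<int>(w.tasks.size()) == w.batchSize) {
            w.completionTasks.clear();
            while (!w.tasks.empty()) {
                // In the plugin this loop also calls CopyInputsIfNeeded() for each popped request.
                w.completionTasks.push_back(std::move(w.tasks.front()));
                w.tasks.pop();
            }
            std::printf("batch is full: StartAsync() on the shared batched request\n");
        }
    }

    int main() {
        Worker w;
        for (int i = 0; i < w.batchSize; ++i)
            OnRequestReady(w, [i] { std::printf("completion for request %d\n", i); });
        return 0;
    }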
@@ -189,6 +231,7 @@ AutoBatchAsyncInferRequest::AutoBatchAsyncInferRequest(
_pipeline = {
{ /*TaskExecutor*/ std::make_shared<ThisRequestExecutor>(this), /*task*/ [this, needPerfCounters] {
// TODO: exception checking
this->_inferRequest->CopyOutputsIfNeeded();
// if (needPerfCounters)
// _inferRequest->_perfMap = _inferRequest->_workerInferRequest->_inferRequest->GetPerformanceCounts();
}}
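The one pipeline stage added here runs on ThisRequestExecutor, i.e. it is not scheduled until the shared batched request has completed, and its job is to copy this request's output slice back (perf-counter propagation is still commented out). A toy sketch of that defer-until-completion idea; DeferredExecutor below is a stand-in invented for illustration, not the plugin's ThisRequestExecutor:

    #include <cstdio>
    #include <functional>
    #include <utility>

    // Toy stand-in: the executor merely stashes the stage; the worker's completion
    // callback runs it once the batched request has finished.
    struct DeferredExecutor {
        std::function<void()> pending;
        void run(std::function<void()> stage) { pending = std::move(stage); }
        void onBatchedRequestDone() { if (pending) pending(); }
    };

    int main() {
        DeferredExecutor exec;
        exec.run([] { std::printf("copy this request's outputs back (CopyOutputsIfNeeded)\n"); });
        // ... the shared batched request completes ...
        exec.onBatchedRequestDone();
        return 0;
    }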
@@ -259,13 +302,15 @@ InferenceEngine::IInferRequestInternal::Ptr AutoBatchExecutableNetwork::CreateIn
workerRequestPtr->_thread = std::thread([workerRequestPtr, this] {
while (!_terminate) {
std::unique_lock<std::mutex> lock(workerRequestPtr->_mutex);
auto status = workerRequestPtr->_cond.wait_for(lock, std::chrono::milliseconds(1));
auto status = workerRequestPtr->_cond.wait_for(lock, std::chrono::milliseconds(100));
if (!_terminate && status == std::cv_status::timeout) {
// the timeout to collect the batch has expired, so the queued requests have to be executed with batch size 1
std::cout << "TIME_OUT with tasks: " << workerRequestPtr->_tasks.unsafe_size() << std::endl;
auto sz = workerRequestPtr->_tasks.unsafe_size();
IE_ASSERT(sz < (size_t)_device.batchForDevice);
if (sz)
std::cout << "TIME_OUT with tasks: " << sz << std::endl;
std::pair<AutoBatchAsyncInferRequest*, InferenceEngine::Task> t;
// pop all the queued tasks and execute them with batch size 1
IE_ASSERT(workerRequestPtr->_tasks.unsafe_size() < (size_t)_device.batchForDevice);
while (workerRequestPtr->_tasks.try_pop(t)) {
t.first->_inferRequestWithoutBatch->SetCallback([t](std::exception_ptr){t.second();});
t.first->_inferRequest->SetBlobsToAnotherRequest(t.first->_inferRequestWithoutBatch);
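The worker thread above waits up to 100 ms for a full batch; on a timeout it drains whatever tasks are queued and runs each of them through the spare no-batch infer request, so latency stays bounded even when the batch never fills. A self-contained sketch of that timeout fallback (plain std:: primitives instead of the plugin's types):

    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <cstdio>
    #include <functional>
    #include <mutex>
    #include <queue>
    #include <thread>

    int main() {
        std::mutex mutex;
        std::condition_variable cond;
        std::queue<std::function<void()>> tasks;
        std::atomic<bool> terminate{false};

        // Collector thread: if a full batch has not been signalled within the timeout,
        // drain whatever is queued and run each task individually (batch of 1), which is
        // what the plugin does via the extra no-batch infer request.
        std::thread collector([&] {
            while (!terminate) {
                std::unique_lock<std::mutex> lock(mutex);
                auto status = cond.wait_for(lock, std::chrono::milliseconds(100));
                if (!terminate && status == std::cv_status::timeout) {
                    while (!tasks.empty()) {
                        auto task = std::move(tasks.front());
                        tasks.pop();
                        task();
                    }
                }
            }
        });

        {
            std::lock_guard<std::mutex> lock(mutex);
            tasks.push([] { std::printf("executed without batching after the timeout\n"); });
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(300));
        terminate = true;
        cond.notify_all();
        collector.join();
        return 0;
    }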
@@ -306,18 +351,18 @@ InferenceEngine::Parameter AutoBatchExecutableNetwork::GetConfig(const std::stri
InferenceEngine::Parameter AutoBatchExecutableNetwork::GetMetric(const std::string &name) const {
if (name == METRIC_KEY(OPTIMAL_NUMBER_OF_INFER_REQUESTS)) {
unsigned int res = 0u;
auto reqs = 0;
try {
res = _network->GetMetric(METRIC_KEY(OPTIMAL_NUMBER_OF_INFER_REQUESTS)).as<unsigned int>();
auto hint = _network->GetConfig(CONFIG_KEY(PERFORMANCE_HINT_NUM_REQUESTS)).as<std::string>();
reqs = InferenceEngine::PerfHintsConfig::CheckPerformanceHintRequestValue(hint);
if (!reqs) // no limitation from the user, so deduce the full number of requests
// (the batch size multiplied by the device's capability to run multiple <batched> requests for further perf)
reqs = _device.batchForDevice *
_network->GetMetric(METRIC_KEY(OPTIMAL_NUMBER_OF_INFER_REQUESTS)).as<unsigned int>();
} catch (const InferenceEngine::Exception &iie) {
IE_THROW()
<< "Every device used with the Auto-Batching should "
<< "support OPTIMAL_NUMBER_OF_INFER_REQUESTS ExecutableNetwork metric. "
<< "Failed to query the metric for the "
<< _network->GetMetric(METRIC_KEY(FULL_DEVICE_NAME)).as<std::string>()
<< " with error:" << iie.what();
}
IE_SET_METRIC_RETURN(OPTIMAL_NUMBER_OF_INFER_REQUESTS, res * _device.batchForDevice);
reqs = std::max(reqs, _device.batchForDevice); // round up to the possible user's value
IE_SET_METRIC_RETURN(OPTIMAL_NUMBER_OF_INFER_REQUESTS, reqs);
} else if (name == METRIC_KEY(NETWORK_NAME)) {
IE_SET_METRIC_RETURN(NETWORK_NAME, _network->GetMetric(
METRIC_KEY(NETWORK_NAME)).as<std::string>());
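The OPTIMAL_NUMBER_OF_INFER_REQUESTS branch now works as follows: if the user capped parallelism via PERFORMANCE_HINT_NUM_REQUESTS, that cap is used; otherwise the device-reported optimal request count is multiplied by the batch size; and in either case at least one full batch worth of requests is reported. A standalone sketch of just that arithmetic, with hypothetical numbers:

    #include <algorithm>
    #include <cstdio>

    // Mirrors the metric arithmetic above; all numbers in main() are hypothetical.
    static int OptimalNumberOfRequests(int userHintRequests, int deviceOptimalRequests, int batchForDevice) {
        int reqs = userHintRequests;
        if (!reqs)  // no user cap: device-optimal request count times the batch size
            reqs = batchForDevice * deviceOptimalRequests;
        return std::max(reqs, batchForDevice);  // never fewer requests than one full batch
    }

    int main() {
        std::printf("%d\n", OptimalNumberOfRequests(0, 4, 32));  // no hint: 4 * 32 = 128
        std::printf("%d\n", OptimalNumberOfRequests(6, 4, 8));   // hint 6 rounded up to the batch of 8
        return 0;
    }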
@@ -398,11 +443,6 @@ DeviceInformation AutoBatchInferencePlugin::ParseMetaDevice(const std::string& d
// create meta device
auto cfg = getDeviceConfig(deviceName);
// std::vector<std::string> supportedConfigKeys = GetCore()->GetMetric(deviceName, METRIC_KEY(SUPPORTED_CONFIG_KEYS));
// if (std::find(std::begin(supportedConfigKeys), std::end(supportedConfigKeys), CONFIG_KEY_INTERNAL(AGGREGATED_PLUGIN))
// != std::end(supportedConfigKeys)) {
// cfg.emplace(CONFIG_KEY_INTERNAL(AGGREGATED_PLUGIN), "");
// }
metaDevice = { deviceName, cfg, batch };
}
@@ -445,11 +485,9 @@ InferenceEngine::Parameter AutoBatchInferencePlugin::GetMetric(const std::string
metrics.push_back(METRIC_KEY(SUPPORTED_CONFIG_KEYS));
IE_SET_METRIC_RETURN(SUPPORTED_METRICS, metrics);
} else if (name == METRIC_KEY(FULL_DEVICE_NAME)) {
IE_SET_METRIC_RETURN(FULL_DEVICE_NAME, "BATCH");
IE_SET_METRIC_RETURN(FULL_DEVICE_NAME, _pluginName);
} else if (name == METRIC_KEY(SUPPORTED_CONFIG_KEYS)) {
std::vector<std::string> configKeys = {};
// std::vector<std::string> configKeys = {
// CONFIG_KEY_INTERNAL(AGGREGATED_PLUGIN)};
std::vector<std::string> configKeys = PerfHintsConfig::SupportedKeys();
IE_SET_METRIC_RETURN(SUPPORTED_CONFIG_KEYS, configKeys);
} else {
IE_THROW(NotFound) << "Unsupported metric key " << name;
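With this hunk the plugin reports its registered plugin name for FULL_DEVICE_NAME and advertises the performance-hint keys (PerfHintsConfig::SupportedKeys()) as its supported config keys. A short usage sketch of querying those metrics through the public API; the "BATCH" device name is an assumption here, taken from the old hard-coded FULL_DEVICE_NAME value rather than from this diff:

    #include <ie_core.hpp>
    #include <ie_plugin_config.hpp>
    #include <iostream>
    #include <string>
    #include <vector>

    int main() {
        InferenceEngine::Core core;
        const std::string device = "BATCH";  // assumed registration name of the auto-batch plugin
        auto fullName = core.GetMetric(device, METRIC_KEY(FULL_DEVICE_NAME)).as<std::string>();
        auto configKeys = core.GetMetric(device, METRIC_KEY(SUPPORTED_CONFIG_KEYS)).as<std::vector<std::string>>();
        std::cout << fullName << std::endl;
        for (const auto& key : configKeys)  // now populated from PerfHintsConfig::SupportedKeys()
            std::cout << key << std::endl;
        return 0;
    }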

View File

@@ -122,10 +122,15 @@ public:
// Batch-Device impl specific: sets the data (blobs from the device request to the batched device request)
void SetBlobsToAnotherRequest(InferenceEngine::SoIInferRequestInternal& req);
void CopyInputsIfNeeded();
void CopyOutputsIfNeeded();
AutoBatchExecutableNetwork::WorkerInferRequest* _workerInferRequest;
protected:
std::map<std::string, InferenceEngine::InferenceEngineProfileInfo> _perfMap;
bool _needPerfCounters = false;
void CopyBlobIfNeeded(InferenceEngine::Blob::CPtr src, InferenceEngine::Blob::Ptr dst, bool bInput);
size_t _batchId;
size_t _batchSize;
};
class AutoBatchAsyncInferRequest : public InferenceEngine::AsyncInferRequestThreadSafeDefault {