resubmit PR#17006 (#17137)
* clean up multi code path
* clang
* fix potential locking issue
* remove unnecessary variable
* clear redundant return syntax
* work around build issue on Ubuntu 20.04

---------

Signed-off-by: fishbell <bell.song@intel.com>
parent 2c450ced24
commit fed06fcb91
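The squashed commits above mention a potential locking issue, and the hunks below consistently switch the guards around `_devicePriorities` from `_confMutex` to `_fallbackMutex`. The following standalone sketch (hypothetical types and names, not code taken from this PR) illustrates the underlying pattern: give the runtime fallback state its own dedicated mutex instead of reusing the configuration mutex, so readers and writers of the device-priority list never contend with configuration locking.

#include <iostream>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-ins for the plugin's context members.
struct DeviceInfo {
    std::string deviceName;
};

struct ScheduleContextSketch {
    std::mutex confMutex;                   // guards configuration maps only
    std::mutex fallbackMutex;               // guards the runtime device-priority list
    std::vector<DeviceInfo> devicePriorities;
};

// Reads of the device-priority list take the dedicated fallback mutex,
// so they never nest inside or contend with configuration locking.
std::vector<std::string> executionDevices(ScheduleContextSketch& ctx) {
    std::lock_guard<std::mutex> lock(ctx.fallbackMutex);
    std::vector<std::string> names;
    for (const auto& d : ctx.devicePriorities)
        names.push_back(d.deviceName);
    return names;
}

// A failed device is removed under the same dedicated mutex.
void dropFailedDevice(ScheduleContextSketch& ctx, const std::string& failed) {
    std::lock_guard<std::mutex> lock(ctx.fallbackMutex);
    for (auto it = ctx.devicePriorities.begin(); it != ctx.devicePriorities.end(); ++it) {
        if (it->deviceName == failed) {
            ctx.devicePriorities.erase(it);
            break;
        }
    }
}

int main() {
    ScheduleContextSketch ctx;
    ctx.devicePriorities = {{"CPU"}, {"GPU"}};
    dropFailedDevice(ctx, "GPU");
    for (const auto& name : executionDevices(ctx))
        std::cout << name << "\n";  // prints: CPU
    return 0;
}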
@@ -236,7 +236,7 @@ IE::Parameter AutoExecutableNetwork::GetMetric(const std::string& name) const {
};
if (_autoSchedule->_pCTPUTLoadContext) {
std::vector<std::string> exeDevices = {};
std::lock_guard<std::mutex> lock(_autoSContext->_confMutex);
std::lock_guard<std::mutex> lock(_autoSContext->_fallbackMutex);
for (auto const & n : _autoSContext->_devicePriorities) {
exeDevices.push_back(n.deviceName);
}
@@ -258,7 +258,12 @@ IE::Parameter AutoExecutableNetwork::GetMetric(const std::string& name) const {
} else if (name == ov::model_name) {
std::lock_guard<std::mutex> lock(_autoSContext->_confMutex);
if (_autoSchedule->_pCTPUTLoadContext) {
return _autoSchedule->_pCTPUTLoadContext[0].executableNetwork->GetMetric(name);
for (size_t i = 0; i < _autoSchedule->_nCTputDeviceNums; i++) {
if (_autoSchedule->_pCTPUTLoadContext[i].isAlready) {
return _autoSchedule->_pCTPUTLoadContext[i].executableNetwork->GetMetric(name);
}
}
IE_THROW() << "No valid executable network found to get" << name;
} else {
if (_autoSchedule->_loadContext[CPU].isEnabled && _autoSchedule->_loadContext[CPU].isAlready)
return _autoSchedule->_loadContext[CPU].executableNetwork->GetMetric(name);
@@ -37,6 +37,105 @@ std::string GetNetworkPrecision(const IE::CNNNetwork& network) {
}
} // namespace

thread_local WorkerInferRequest* AutoSchedule::_thisWorkerInferRequest = nullptr;
// TODO: revert to the plain variable (see header file), when we moved to the next CentOS 8.x in our support matrix
thread_local const char* AutoSchedule::_thisPreferredDeviceName = "";

Pipeline AutoSchedule::GetPipeline(const IInferPtr& syncInferRequest, WorkerInferRequest** workerInferRequest) {
Pipeline pipeline;
if (_passthroughExeNet) {
struct RequestExecutor : ITaskExecutor {
explicit RequestExecutor(InferenceEngine::SoIInferRequestInternal& inferRequest) : _inferRequest(inferRequest) {
_inferRequest->SetCallback([this](std::exception_ptr exceptionPtr) mutable {
_exceptionPtr = exceptionPtr;
auto capturedTask = std::move(_task);
capturedTask();
});
}
void run(InferenceEngine::Task task) override {
_task = std::move(task);
_inferRequest->StartAsync();
};
InferenceEngine::SoIInferRequestInternal& _inferRequest;
std::exception_ptr _exceptionPtr;
InferenceEngine::Task _task;
};
auto requestExecutor =
std::make_shared<RequestExecutor>(std::static_pointer_cast<MultiDeviceInferRequest>(syncInferRequest)->GetSharedRequest());
pipeline.emplace_back(requestExecutor, [requestExecutor] {
if (nullptr != requestExecutor->_exceptionPtr) {
std::rethrow_exception(requestExecutor->_exceptionPtr);
}
});
} else {
MultiImmediateExecutor::Ptr _firstExecutor = std::make_shared<MultiImmediateExecutor>();
pipeline = {
// if the request is coming with device-specific remote blobs make sure it is scheduled to the specific device only:
Stage {
/*TaskExecutor*/ _firstExecutor, /*task*/ [this, &syncInferRequest]() {
// by default, no preferred device:
_thisPreferredDeviceName = "";
auto execNetwork = _autoSContext->_executableNetwork.lock();
// if any input is remote (e.g. was set with SetBlob), let' use the corresponding device
for (const auto& it : execNetwork->GetInputsInfo()) {
auto b = syncInferRequest->GetBlob(it.first);
auto r = b->as<IE::RemoteBlob>();
if (r) {
const auto name = r->getDeviceName();
const auto res = std::find_if(
_autoSContext->_devicePrioritiesInitial.cbegin(),
_autoSContext->_devicePrioritiesInitial.cend(),
[&name](const MultiDevicePlugin::DeviceInformation & d) {
return (d.defaultDeviceID.empty() ? d.deviceName : (d.deviceName + "." +
d.defaultDeviceID)) == name;
});
if (_autoSContext->_devicePrioritiesInitial.cend() == res) {
IE_THROW() <<
"None of the devices (for which current MULTI-device configuration was "
"initialized) supports a remote blob created on the device named " << name;
} else {
// it is ok to take the c_str() here (as pointed in the executable_network.hpp we need to use const char*)
// as the original strings are from the "persistent" vector (with the right lifetime)
_thisPreferredDeviceName = res->deviceName.c_str();
break;
}
}
}
}},
// as the scheduling algo may select any device, this stage accepts the scheduling decision (actual workerRequest)
// then sets the device-agnostic blobs to the actual (device-specific) request
Stage {
/*TaskExecutor*/std::dynamic_pointer_cast<IE::ITaskExecutor>(shared_from_this()), /*task*/ [&syncInferRequest, workerInferRequest]() {
*workerInferRequest = _thisWorkerInferRequest;
auto multiSyncInferRequest = std::dynamic_pointer_cast<MultiDeviceInferRequest>(syncInferRequest);
multiSyncInferRequest->SetBlobsToAnotherRequest(_thisWorkerInferRequest->_inferRequest);
INFO_RUN([workerInferRequest]() {
(*workerInferRequest)->_startTimes.push_back(std::chrono::steady_clock::now());
});
}},
// final task in the pipeline:
Stage {
/*TaskExecutor*/std::make_shared<ThisRequestExecutor>(workerInferRequest, _firstExecutor), /*task*/
[this, &syncInferRequest, workerInferRequest]() {
INFO_RUN([workerInferRequest]() {
(*workerInferRequest)->_endTimes.push_back(std::chrono::steady_clock::now());
});
std::exception_ptr eptr = (*workerInferRequest)->_exceptionPtr;
if (nullptr != eptr) {
std::rethrow_exception(eptr);
}
if (_autoSContext->_needPerfCounters) {
auto multiSyncInferRequest = std::dynamic_pointer_cast<MultiDeviceInferRequest>
(syncInferRequest);
multiSyncInferRequest->_scheduledRequest =
(*workerInferRequest)->_inferRequest;
}
}}
};
}
return pipeline;
}

void AutoSchedule::GenerateWorkers(const std::string& device,
const SoExecNetwork& executableNetwork) {
std::string realDeviceName;
@@ -203,7 +302,6 @@ void AutoSchedule::init(const ScheduleContext::Ptr& sContext) {
LOG_INFO_TAG("ExecutableNetwork start");
// initialize cpuHelpReleasetime
_cpuHelpReleaseTime = std::chrono::steady_clock::now();
_multiSContext = std::dynamic_pointer_cast<MultiScheduleContext>(sContext);
_autoSContext = std::dynamic_pointer_cast<AutoScheduleContext>(sContext);
if (_autoSContext->_core == nullptr) {
IE_THROW() << "Please, work with Auto device via InferencEngine::Core object";
@@ -307,7 +405,7 @@ void AutoSchedule::init(const ScheduleContext::Ptr& sContext) {
// Handle device load failure in case of ctput
if (isCumulative && !contextPtr->isLoadSuccess) {
std::string failedDeviceName = contextPtr->deviceInfo.deviceName;
std::lock_guard<std::mutex> lock(_autoSContext->_confMutex);
std::lock_guard<std::mutex> lock(_autoSContext->_fallbackMutex);
const auto DeviceIter = deviceChecker().checkAndReturnIfDeviceInList(failedDeviceName, _autoSContext->_devicePriorities);
// Remove failed device from _devicePriorities
if (DeviceIter != _autoSContext->_devicePriorities.end()) {
@@ -681,9 +779,7 @@ bool AutoSchedule::ScheduleToWorkerInferRequest(IE::Task inferPipelineTask, Devi
}
} else {
if (_pCTPUTLoadContext) {
for (size_t i = 0; i < _autoSContext->_devicePriorities.size(); i++) {
devices.push_back(_autoSContext->_devicePriorities[i]);
}
devices = _autoSContext->_devicePriorities;
} else {
// _acceleratorDevice could be the same as _cpuDevice, such as AUTO:CPU
if (_loadContext[FALLBACKDEVICE].isAlready) {
@@ -738,6 +834,10 @@ bool AutoSchedule::RunPipelineTask(IE::Task& inferPipelineTask,
return false;
}

void AutoSchedule::run(IE::Task inferPipelineTask) {
ScheduleToWorkerInferRequest(std::move(inferPipelineTask), _thisPreferredDeviceName);
}

AutoSchedule::~AutoSchedule() {
// this is necessary to guarantee member destroyed after getting future
if (_loadContext[CPU].isEnabled) {
@@ -750,15 +850,92 @@ AutoSchedule::~AutoSchedule() {
}
_autoSContext->_plugin->UnregisterPriority(_autoSContext->_modelPriority,
_loadContext[ACTUALDEVICE].deviceInfo.uniqueName);

{
std::lock_guard<std::mutex> lock(_autoSContext->_fallbackMutex);
_autoSContext->_devicePriorities.clear();
}
/* NOTE: The only threads that use `MultiSchedule` worker infer requests' threads.
* But AsyncInferRequest destructor should wait for all asynchronous tasks by the request
*/
for (auto&& idleWorker : _idleWorkerRequests) {
// stop accepting any idle requests back (for re-scheduling)
idleWorker.second.set_capacity(0);
}
INFO_RUN([this] {
for (auto&& _workerRequest : _workerRequests) {
std::list<Time> reqAllStartTimes;
std::list<Time> reqAllEndTimes;
for (auto& request : _workerRequest.second) {
reqAllStartTimes.splice(reqAllStartTimes.end(), request._startTimes);
reqAllEndTimes.splice(reqAllEndTimes.end(), request._endTimes);
}
size_t count = reqAllStartTimes.size();
IE_ASSERT(count == reqAllEndTimes.size());
reqAllStartTimes.sort(std::less<Time>());
reqAllEndTimes.sort(std::less<Time>());
if (_workerRequest.first == "CPU_HELP") {
LOG_INFO_TAG("CPU_HELP:infer:%ld", _cpuHelpInferCount + count);
if (_cpuHelpFps > 0.0) {
LOG_INFO_TAG("CPU_HELP:fps:%lf", _cpuHelpFps);
} else if (count >= 1) {
std::chrono::duration<double, std::milli> durtation =
reqAllEndTimes.back() - reqAllStartTimes.front();
LOG_INFO_TAG("CPU_HELP:fps:%lf", count * 1000 / durtation.count());
}
} else {
LOG_INFO_TAG("%s:infer:%ld", _workerRequest.first.c_str(), count);
auto n = reqAllStartTimes.size();
Time time;
while (!reqAllStartTimes.empty()) {
time = reqAllStartTimes.front();
if (time < _cpuHelpReleaseTime) {
reqAllStartTimes.pop_front();
n--;
} else {
break;
}
}
if (n >= 1) {
std::chrono::duration<double, std::milli> durtation =
reqAllEndTimes.back() - time;
LOG_INFO_TAG("%s:fps:%lf", _workerRequest.first.c_str(),
n * 1000 / durtation.count());
}
}
}
});
_workerRequests.clear();
LOG_INFO_TAG("ExecutableNetwork end");
}

IInferPtr AutoSchedule::CreateInferRequestImpl(
const std::vector<std::shared_ptr<const ov::Node>>& inputs,
const std::vector<std::shared_ptr<const ov::Node>>& outputs) {
SoInfer request_to_share_blobs_with;
IE::RemoteContext::Ptr ctx = nullptr;
if (_passthroughExeNet)
request_to_share_blobs_with = {_passthroughExeNet->CreateInferRequest(), _passthroughExeNet._so};
return std::make_shared<MultiDeviceInferRequest>(inputs, outputs, request_to_share_blobs_with);
}

IInferPtr AutoSchedule::CreateInferRequestImpl(IE::InputsDataMap networkInputs,
IE::OutputsDataMap networkOutputs) {
SoInfer request_to_share_blobs_with;
IE::RemoteContext::Ptr ctx = nullptr;
if (_passthroughExeNet)
request_to_share_blobs_with = {_passthroughExeNet->CreateInferRequest(), _passthroughExeNet._so};
return std::make_shared<MultiDeviceInferRequest>(networkInputs, networkOutputs, request_to_share_blobs_with);
}

std::string AutoSchedule::GetLogTag() const noexcept {
return _LogTag;
}

IInferPtr AutoSchedule::CreateInferRequest() {
auto execNetwork = std::dynamic_pointer_cast<AutoExecutableNetwork>(
_autoSContext->_executableNetwork.lock());
IInferPtr syncRequestImpl;
if (_multiSContext->_core && _multiSContext->_core->isNewAPI())
if (_autoSContext->_core && _autoSContext->_core->isNewAPI())
syncRequestImpl = CreateInferRequestImpl(execNetwork->_parameters, execNetwork->_results);
if (!syncRequestImpl)
syncRequestImpl = CreateInferRequestImpl(execNetwork->_networkInputs, execNetwork->_networkOutputs);
@@ -5,7 +5,7 @@
///////////////////////////////////////////////////////////////////////////////////////////////////
#pragma once

#include "multi_schedule.hpp"
#include "schedule.hpp"

#ifdef MULTIUNITTEST
#define MOCKTESTMACRO virtual
@@ -14,6 +14,16 @@
#define MOCKTESTMACRO
#endif
namespace MultiDevicePlugin {
struct ThisRequestExecutor : public IE::ITaskExecutor {
explicit ThisRequestExecutor(WorkerInferRequest** ptr, MultiImmediateExecutor::Ptr executor = nullptr): _workptrptr{ptr}, _fallbackExec(executor) {}
void run(IE::Task task) override {
(*_workptrptr)->_task = std::move(task);
(*_workptrptr)->_fallbackExec = _fallbackExec;
(*_workptrptr)->_inferRequest->StartAsync();
};
WorkerInferRequest** _workptrptr = nullptr;
MultiImmediateExecutor::Ptr _fallbackExec;
};
struct AutoLoadContext {
std::atomic<bool> isEnabled = {false};
std::atomic<bool> isAlready = {false};
@@ -40,26 +50,39 @@ enum AutoLoadContextIndex {
FALLBACKDEVICE = 2,
CONTEXTNUM = 3
};
class AutoSchedule : public MultiSchedule {
class AutoSchedule : public Schedule, public IE::ITaskExecutor {
public:
using Ptr = std::shared_ptr<AutoSchedule>;
void init(const ScheduleContext::Ptr& sContext) override;
IInferPtr CreateInferRequest() override;
IInferPtr CreateInferRequestImpl(IE::InputsDataMap networkInputs, IE::OutputsDataMap networkOutputs) override;
IInferPtr CreateInferRequestImpl(const std::vector<std::shared_ptr<const ov::Node>>& inputs,
const std::vector<std::shared_ptr<const ov::Node>>& outputs) override;
void run(IE::Task inferTask) override;
Pipeline GetPipeline(const IInferPtr& syncRequestImpl, WorkerInferRequest** WorkerInferRequest) override;
void WaitActualNetworkReady() const;
virtual ~AutoSchedule();

public:
static thread_local WorkerInferRequest* _thisWorkerInferRequest;
// have to use the const char* ptr rather than std::string due to a bug in old gcc versions,
// the bug is e.g. manifesting on the old CentOS (and it's 4.8.x gcc) used in our testing
// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=81880
static thread_local const char* _thisPreferredDeviceName;
AutoLoadContext _loadContext[CONTEXTNUM];
std::unique_ptr<AutoLoadContext[]> _pCTPUTLoadContext = nullptr;
size_t _nCTputDeviceNums = 0;

protected:
void GenerateWorkers(const std::string& device, const SoExecNetwork& executableNetwork) override;
bool ScheduleToWorkerInferRequest(IE::Task, DeviceName preferred_device = "") override;
void GenerateWorkers(const std::string& device, const SoExecNetwork& executableNetwork);
bool ScheduleToWorkerInferRequest(IE::Task, DeviceName preferred_device = "");
static bool RunPipelineTask(IE::Task& inferPipelineTask, NotBusyPriorityWorkerRequests& idleWorkerRequests,
const DeviceName& preferred_device);
DeviceMap<NotBusyPriorityWorkerRequests> _idleWorkerRequests;
AutoScheduleContext::Ptr _autoSContext;
std::string GetLogTag() const noexcept;
DeviceMap<NotBusyPriorityWorkerRequests> _idleWorkerRequests;
AutoScheduleContext::Ptr _autoSContext;
std::atomic_size_t _numRequestsCreated = {0};
DeviceMap<std::vector<WorkerInferRequest>> _workerRequests;

private:
/**
@@ -73,12 +96,19 @@ private:
IE::Task releaseActualdeviceTask;

private:
IE::IStreamsExecutor::Ptr _executor;
mutable std::once_flag _oc;
std::once_flag _firstLoadOC;
std::future<void> _firstLoadFuture;
std::promise<void> _firstLoadPromise;
bool _exitFlag = {false};
IE::ThreadSafeQueue<IE::Task> _inferPipelineTasks;
DeviceMap<std::unique_ptr<IE::ThreadSafeQueue<IE::Task>>> _inferPipelineTasksDeviceSpecific;
SoExecNetwork _passthroughExeNet;
Time _cpuHelpReleaseTime;
size_t _cpuHelpInferCount = 0;
double _cpuHelpFps = 0.0;
std::string _LogTag;
IE::IStreamsExecutor::Ptr _executor;
mutable std::once_flag _oc;
std::once_flag _firstLoadOC;
std::future<void> _firstLoadFuture;
std::promise<void> _firstLoadPromise;
bool _exitFlag = {false};
};

} // namespace MultiDevicePlugin
@@ -59,7 +59,7 @@ public:
_task = std::move(task);
_task();
}
InferenceEngine::Task _task;
IE::Task _task;
};

struct DeviceInformation {
@@ -191,35 +191,26 @@ public:
virtual ~ScheduleContext() = default;
};

class MultiScheduleContext : public ScheduleContext {
class MultiDeviceInferencePlugin;
class AutoScheduleContext : public ScheduleContext {
public:
using Ptr = std::shared_ptr<MultiScheduleContext>;
using Ptr = std::shared_ptr<AutoScheduleContext>;
std::vector<DeviceInformation> _devicePriorities;
std::vector<DeviceInformation> _devicePrioritiesInitial;
std::unordered_map<std::string, IE::Parameter> _config;
DeviceMap<SoExecNetwork> _networksPerDevice;
std::mutex _mutex;
bool _needPerfCounters;
bool _batchingDisabled = {false};
bool _startupfallback = true;
bool _runtimeFallback = true;
virtual ~MultiScheduleContext() = default;
};

class MultiDeviceInferencePlugin;
class AutoScheduleContext : public MultiScheduleContext {
public:
using Ptr = std::shared_ptr<AutoScheduleContext>;
std::string _modelPath;
IE::CNNNetwork _network;
std::string _strDevices;
unsigned int _modelPriority = 0;
std::string _performanceHint;
std::mutex _confMutex;
std::mutex _fallbackMutex;
MultiDeviceInferencePlugin* _plugin;
SoExecNetwork _hwExecutableNetwork;
std::string _modelPath;
IE::CNNNetwork _network;
std::string _strDevices;
unsigned int _modelPriority = 0;
std::string _performanceHint;
std::mutex _confMutex;
std::mutex _fallbackMutex;
MultiDeviceInferencePlugin* _plugin;
SoExecNetwork _hwExecutableNetwork;
virtual ~AutoScheduleContext() = default;
};

} // namespace MultiDevicePlugin
@@ -1,169 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//

///////////////////////////////////////////////////////////////////////////////////////////////////
#include "multi_executable_network.hpp"
#include "plugin.hpp"
// ------------------------------MultiExecutableNetwork----------------------------
namespace MultiDevicePlugin {
MultiExecutableNetwork::MultiExecutableNetwork(MultiScheduleContext::Ptr& context, const MultiSchedule::Ptr& schedule)
: ExecutableNetwork(schedule, context),
_multiSContext(context) {
}

MultiExecutableNetwork::~MultiExecutableNetwork() {}
std::shared_ptr<IE::RemoteContext> MultiExecutableNetwork::GetContext() const {
auto devices = [&] {
std::lock_guard<std::mutex> lock(_multiSContext->_mutex);
return _multiSContext->_devicePriorities;
}();
std::string devices_names;
for (auto&& device : devices) {
devices_names += device.deviceName + " ";
const auto& n = _multiSContext->_networksPerDevice.at(device.deviceName);
try {
return n->GetContext();
} catch (const IE::NotImplemented&) {}
}
IE_THROW(NotImplemented) <<
"None of the devices in the MULTI device has an associated remote context."
<< " Current list of devices allowed via the DEVICE_PRIORITIES config: " <<
devices_names;
}


void MultiExecutableNetwork::SetConfig(const
std::map<std::string, IE::Parameter>& config) {
auto priorities = config.find(ov::device::priorities.name());
if (priorities == config.end() || config.size() > 1) {
IE_THROW() <<
"The only config supported for the Network's SetConfig is MultiDeviceConfigParams::KEY_MULTI_DEVICE_PRIORITIES";
} else {
auto multiPlugin = std::dynamic_pointer_cast<MultiDeviceInferencePlugin>
(this->_plugin);
assert(multiPlugin != nullptr);
auto metaDevices = multiPlugin->ParseMetaDevices(
priorities->second.as<std::string>(), {});
if (std::any_of(metaDevices.begin(),
metaDevices.end(), [](const DeviceInformation & kvp) {
return kvp.numRequestsPerDevices != -1;
})) {
IE_THROW() << "You can only change device priorities but not number of requests"
<< " with the Network's SetConfig(MultiDeviceConfigParams::KEY_MULTI_DEVICE_PRIORITIES!";
}
{
std::lock_guard<std::mutex> lock{_multiSContext->_mutex};
for (auto&& device : metaDevices) {
if (_multiSContext->_networksPerDevice.find(device.deviceName) ==
_multiSContext->_networksPerDevice.end()) {
IE_THROW(NotFound) <<
"You can only change device priorities but not add new devices with"
<< " the Network's SetConfig(MultiDeviceConfigParams::KEY_MULTI_DEVICE_PRIORITIES. "
<< device.deviceName << " device was not in the original device list!";
}
}
_multiSContext->_devicePriorities = metaDevices;
// update value in config
_multiSContext->_config[IE::MultiDeviceConfigParams::KEY_MULTI_DEVICE_PRIORITIES]
= priorities->second;
}
}
}

IE::Parameter MultiExecutableNetwork::GetConfig(const std::string& name) const {
{
auto it = _multiSContext->_config.find(name);
if (it != _multiSContext->_config.end()) {
return it->second;
}
}
// find config key among networks config keys
for (const auto& desc : _multiSContext->_networksPerDevice) {
const auto& execNetwork = desc.second;
auto param = execNetwork->GetMetric(METRIC_KEY(SUPPORTED_CONFIG_KEYS));
for (auto&& configKey : param.as<std::vector<std::string>>()) {
if (configKey == name) {
return execNetwork->GetConfig(configKey);
}
}
IE_THROW() << "Unsupported ExecutableNetwork config key: " << name;
}
IE_THROW(NotFound) << name << " not found in the ExecutableNetwork config";
}

IE::Parameter MultiExecutableNetwork::GetMetric(const std::string& name) const {
if (name == ov::supported_properties) {
return decltype(ov::supported_properties)::value_type {
// Metrics
ov::PropertyName{ov::supported_properties.name(), ov::PropertyMutability::RO},
ov::PropertyName{ov::model_name.name(), ov::PropertyMutability::RO},
ov::PropertyName{ov::optimal_number_of_infer_requests.name(), ov::PropertyMutability::RO},

// Configs
// device priority can be changed on-the-fly in MULTI
ov::PropertyName{ov::device::priorities.name(), ov::PropertyMutability::RW},
ov::PropertyName{ov::device::properties.name(), ov::PropertyMutability::RO},
ov::PropertyName{ov::execution_devices.name(), ov::PropertyMutability::RO}
};
} else if (name == ov::device::properties) {
ov::AnyMap all_devices = {};
for (auto const & network : _multiSContext->_networksPerDevice) {
ov::AnyMap device_properties = {};
auto device_supported_metrics = network.second->GetMetric(METRIC_KEY(SUPPORTED_METRICS));
for (auto&& property_name : device_supported_metrics.as<std::vector<std::string>>()) {
device_properties[property_name] = network.second->GetMetric(property_name);;
}
auto device_supported_configs = network.second->GetMetric(METRIC_KEY(SUPPORTED_CONFIG_KEYS));
for (auto&& property_name : device_supported_configs.as<std::vector<std::string>>()) {
device_properties[property_name] = network.second->GetConfig(property_name);
}
all_devices[network.first] = device_properties;
}
return all_devices;
} else if (name == ov::optimal_number_of_infer_requests) {
unsigned int res = 0u;
for (auto const & n : _multiSContext->_networksPerDevice) {
try {
res += n.second->GetMetric(METRIC_KEY(
OPTIMAL_NUMBER_OF_INFER_REQUESTS)).as<unsigned int>();
} catch (const IE::Exception& iie) {
IE_THROW()
<< "Every device used with the Multi-Device should "
<< "support OPTIMAL_NUMBER_OF_INFER_REQUESTS ExecutableNetwork metric. "
<< "Failed to query the metric for the " << n.first << " with error:" <<
iie.what();
}
}
return decltype(ov::optimal_number_of_infer_requests)::value_type {res};
} else if (name == ov::model_name) {
auto it = _multiSContext->_networksPerDevice.begin();
IE_ASSERT(it != _multiSContext->_networksPerDevice.end());
return decltype(ov::model_name)::value_type {it->second->GetMetric(METRIC_KEY(NETWORK_NAME)).as<std::string>()};
} else if (name == METRIC_KEY(SUPPORTED_METRICS)) {
IE_SET_METRIC_RETURN(SUPPORTED_METRICS, {
METRIC_KEY(OPTIMAL_NUMBER_OF_INFER_REQUESTS),
METRIC_KEY(SUPPORTED_METRICS),
METRIC_KEY(NETWORK_NAME),
METRIC_KEY(SUPPORTED_CONFIG_KEYS)
});
} else if (name == METRIC_KEY(SUPPORTED_CONFIG_KEYS)) {
std::vector<std::string> configKeys = {IE::MultiDeviceConfigParams::KEY_MULTI_DEVICE_PRIORITIES};
IE_SET_METRIC_RETURN(SUPPORTED_CONFIG_KEYS, configKeys);
} else if (name == ov::execution_devices) {
std::vector<std::string> exeDevices = {};
std::vector<std::string> exeDevicesUnsort = {};
for (const auto & n : _multiSContext->_networksPerDevice) {
exeDevicesUnsort.push_back(n.first);
}
for (const auto & n : _multiSContext->_devicePriorities) {
if (std::find(exeDevicesUnsort.begin(), exeDevicesUnsort.end(), n.deviceName) != exeDevicesUnsort.end()) {
exeDevices.push_back(n.deviceName);
}
}
return decltype(ov::execution_devices)::value_type {exeDevices};
} else {
IE_THROW() << "Unsupported ExecutableNetwork metric key: " << name;
}
}
} // namespace MultiDevicePlugin
@@ -1,36 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//

///////////////////////////////////////////////////////////////////////////////////////////////////
#pragma once
#include "executable_network.hpp"
#include "multi_schedule.hpp"
#include "bind_multi_schedule.hpp"

#ifdef MULTIUNITTEST
#define MOCKTESTMACRO virtual
#define MultiDevicePlugin MockMultiDevicePlugin
#else
#define MOCKTESTMACRO
#endif

namespace MultiDevicePlugin {
class MultiExecutableNetwork : public ExecutableNetwork {
friend IInferPtr MultiSchedule::CreateInferRequest();
public:
using Ptr = std::shared_ptr<MultiExecutableNetwork>;

explicit MultiExecutableNetwork(MultiScheduleContext::Ptr& context, const MultiSchedule::Ptr& schedule);

void SetConfig(const std::map<std::string, IE::Parameter>& config) override;
IE::Parameter GetConfig(const std::string& name) const override;
IE::Parameter GetMetric(const std::string& name) const override;
std::shared_ptr<IE::RemoteContext> GetContext() const override;
~MultiExecutableNetwork() override;

private:
MultiScheduleContext::Ptr _multiSContext;
};

} // namespace MultiDevicePlugin
@@ -1,331 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//

///////////////////////////////////////////////////////////////////////////////////////////////////
#include "async_infer_request.hpp"
#include "plugin.hpp"
#include "multi_schedule.hpp"
#include "multi_executable_network.hpp"
// ------------------------------MultiSchedule----------------------------
namespace MultiDevicePlugin {

thread_local WorkerInferRequest* MultiSchedule::_thisWorkerInferRequest = nullptr;
// TODO: revert to the plain variable (see header file), when we moved to the next CentOS 8.x in our support matrix
thread_local const char* MultiSchedule::_thisPreferredDeviceName = "";

void MultiSchedule::init(const ScheduleContext::Ptr& sContext) {
_cpuHelpReleaseTime = std::chrono::steady_clock::now();
_LogTag = sContext->_LogTag;
_multiSContext = std::dynamic_pointer_cast<MultiScheduleContext>(sContext);
for (auto&& networkValue : _multiSContext->_networksPerDevice) {
auto& device = networkValue.first;
auto& network = networkValue.second;
GenerateWorkers(device, network);
}
if (_multiSContext->_networksPerDevice.size() == 1)
_passthroughExeNet = _multiSContext->_networksPerDevice.begin()->second;
}

Pipeline MultiSchedule::GetPipeline(const IInferPtr& syncInferRequest, WorkerInferRequest** workerInferRequest) {
Pipeline pipeline;
if (_passthroughExeNet) {
struct RequestExecutor : ITaskExecutor {
explicit RequestExecutor(InferenceEngine::SoIInferRequestInternal& inferRequest) : _inferRequest(inferRequest) {
_inferRequest->SetCallback([this](std::exception_ptr exceptionPtr) mutable {
_exceptionPtr = exceptionPtr;
auto capturedTask = std::move(_task);
capturedTask();
});
}
void run(InferenceEngine::Task task) override {
_task = std::move(task);
_inferRequest->StartAsync();
};
InferenceEngine::SoIInferRequestInternal& _inferRequest;
std::exception_ptr _exceptionPtr;
InferenceEngine::Task _task;
};
auto requestExecutor =
std::make_shared<RequestExecutor>(std::static_pointer_cast<MultiDeviceInferRequest>(syncInferRequest)->GetSharedRequest());
pipeline.emplace_back(requestExecutor, [requestExecutor] {
if (nullptr != requestExecutor->_exceptionPtr) {
std::rethrow_exception(requestExecutor->_exceptionPtr);
}
});
} else {
MultiImmediateExecutor::Ptr _firstExecutor = std::make_shared<MultiImmediateExecutor>();
pipeline = {
// if the request is coming with device-specific remote blobs make sure it is scheduled to the specific device only:
Stage {
/*TaskExecutor*/ _firstExecutor, /*task*/ [this, &syncInferRequest]() {
// by default, no preferred device:
_thisPreferredDeviceName = "";
auto execNetwork = _multiSContext->_executableNetwork.lock();
// if any input is remote (e.g. was set with SetBlob), let' use the corresponding device
for (const auto& it : execNetwork->GetInputsInfo()) {
auto b = syncInferRequest->GetBlob(it.first);
auto r = b->as<IE::RemoteBlob>();
if (r) {
const auto name = r->getDeviceName();
const auto res = std::find_if(
_multiSContext->_devicePrioritiesInitial.cbegin(),
_multiSContext->_devicePrioritiesInitial.cend(),
[&name](const MultiDevicePlugin::DeviceInformation & d) {
return (d.defaultDeviceID.empty() ? d.deviceName : (d.deviceName + "." +
d.defaultDeviceID)) == name;
});
if (_multiSContext->_devicePrioritiesInitial.cend() == res) {
IE_THROW() <<
"None of the devices (for which current MULTI-device configuration was "
"initialized) supports a remote blob created on the device named " << name;
} else {
// it is ok to take the c_str() here (as pointed in the executable_network.hpp we need to use const char*)
// as the original strings are from the "persistent" vector (with the right lifetime)
_thisPreferredDeviceName = res->deviceName.c_str();
break;
}
}
}
}},
// as the scheduling algo may select any device, this stage accepts the scheduling decision (actual workerRequest)
// then sets the device-agnostic blobs to the actual (device-specific) request
Stage {
/*TaskExecutor*/std::dynamic_pointer_cast<IE::ITaskExecutor>(shared_from_this()), /*task*/ [&syncInferRequest, workerInferRequest]() {
*workerInferRequest = _thisWorkerInferRequest;
auto multiSyncInferRequest = std::dynamic_pointer_cast<MultiDeviceInferRequest>(syncInferRequest);
multiSyncInferRequest->SetBlobsToAnotherRequest(_thisWorkerInferRequest->_inferRequest);
INFO_RUN([workerInferRequest]() {
(*workerInferRequest)->_startTimes.push_back(std::chrono::steady_clock::now());
});
}},
// final task in the pipeline:
Stage {
/*TaskExecutor*/std::make_shared<ThisRequestExecutor>(workerInferRequest, _firstExecutor), /*task*/
[this, &syncInferRequest, workerInferRequest]() {
INFO_RUN([workerInferRequest]() {
(*workerInferRequest)->_endTimes.push_back(std::chrono::steady_clock::now());
});
std::exception_ptr eptr = (*workerInferRequest)->_exceptionPtr;
if (nullptr != eptr) {
std::rethrow_exception(eptr);
}
if (_multiSContext->_needPerfCounters) {
auto multiSyncInferRequest = std::dynamic_pointer_cast<MultiDeviceInferRequest>
(syncInferRequest);
multiSyncInferRequest->_scheduledRequest =
(*workerInferRequest)->_inferRequest;
}
}}
};
}
return pipeline;
}

void MultiSchedule::GenerateWorkers(const std::string& device,
const SoExecNetwork& executableNetwork) {
auto itNumRequests = std::find_if(_multiSContext->_devicePriorities.cbegin(), _multiSContext->_devicePriorities.cend(),
[&device](const DeviceInformation & d) {
return d.deviceName == device;
});
unsigned int optimalNum = 0;
try {
optimalNum = executableNetwork->GetMetric(METRIC_KEY(OPTIMAL_NUMBER_OF_INFER_REQUESTS)).as<unsigned int>();
} catch (const IE::Exception& iie) {
IE_THROW()
<< "Every device used with the Multi-Device should "
<< "support OPTIMAL_NUMBER_OF_INFER_REQUESTS ExecutableNetwork metric. "
<< "Failed to query the metric for the " << device << " with error:" <<
iie.what();
}
const auto numRequests = (_multiSContext->_devicePriorities.end() == itNumRequests ||
itNumRequests->numRequestsPerDevices == -1) ? optimalNum : itNumRequests->numRequestsPerDevices;
auto& workerRequests = _workerRequests[device];
auto& idleWorkerRequests = _idleWorkerRequests[device];
workerRequests.resize(numRequests);
_inferPipelineTasksDeviceSpecific[device] = std::unique_ptr<IE::ThreadSafeQueue<IE::Task>>(new IE::ThreadSafeQueue<IE::Task>);
auto* idleWorkerRequestsPtr = &(idleWorkerRequests);
idleWorkerRequests.set_capacity(numRequests);
int num = 0;
for (auto&& workerRequest : workerRequests) {
workerRequest._inferRequest = {executableNetwork->CreateInferRequest(), executableNetwork._so};
auto* workerRequestPtr = &workerRequest;
workerRequestPtr->_index = num++;
IE_ASSERT(idleWorkerRequests.try_push(workerRequestPtr) == true);
workerRequest._inferRequest->SetCallback(
[workerRequestPtr, this, device, idleWorkerRequestsPtr](std::exception_ptr exceptionPtr) mutable {
IdleGuard<NotBusyWorkerRequests> idleGuard{workerRequestPtr, *idleWorkerRequestsPtr};
workerRequestPtr->_exceptionPtr = exceptionPtr;
{
auto capturedTask = std::move(workerRequestPtr->_task);
capturedTask();
}
// try to return the request to the idle list (fails if the overall object destruction has began)
if (idleGuard.Release()->try_push(workerRequestPtr)) {
// let's try to pop a task, as we know there is at least one idle request, schedule if succeeded
// if no device-agnostic tasks, let's try pop the device specific task, schedule if succeeded
IE::Task t;
if (_inferPipelineTasks.try_pop(t)) {
ScheduleToWorkerInferRequest(std::move(t));
} else if (_inferPipelineTasksDeviceSpecific[device]->try_pop(t)) {
ScheduleToWorkerInferRequest(std::move(t), device);
}
}
});
}
}

bool MultiSchedule::ScheduleToWorkerInferRequest(IE::Task inferPipelineTask, DeviceName preferred_device) {
std::vector<DeviceInformation> devices;
devices = [&] {
std::lock_guard<std::mutex> lock(_multiSContext->_mutex);
return _multiSContext->_devicePriorities;
}();
for (auto&& device : devices) {
if (!preferred_device.empty() && (device.deviceName != preferred_device)) {
continue;
}
if (RunPipelineTask(inferPipelineTask, _idleWorkerRequests[device.deviceName], preferred_device)) {
return true;
}
}
// no vacant requests this time, storing the task to the respective queue
if (!preferred_device.empty()) {
_inferPipelineTasksDeviceSpecific[preferred_device]->push(std::move(
inferPipelineTask));
} else {
_inferPipelineTasks.push(std::move(inferPipelineTask));
}
return false;
}

bool MultiSchedule::RunPipelineTask(IE::Task& inferPipelineTask,
NotBusyWorkerRequests& idleWorkerRequests,
const DeviceName& preferred_device) {
WorkerInferRequest* workerRequestPtr = nullptr;
if (idleWorkerRequests.try_pop(workerRequestPtr)) {
IdleGuard<NotBusyWorkerRequests> idleGuard{workerRequestPtr, idleWorkerRequests};
_thisWorkerInferRequest = workerRequestPtr;
{
auto capturedTask = std::move(inferPipelineTask);
capturedTask();
}
idleGuard.Release();
return true;
}
return false;
}

void MultiSchedule::run(IE::Task inferPipelineTask) {
ScheduleToWorkerInferRequest(std::move(inferPipelineTask), _thisPreferredDeviceName);
}

MultiSchedule::~MultiSchedule() {
{
std::lock_guard<std::mutex> lock(_multiSContext->_mutex);
_multiSContext->_devicePriorities.clear();
}
/* NOTE: The only threads that use `MultiSchedule` worker infer requests' threads.
* But AsyncInferRequest destructor should wait for all asynchronous tasks by the request
*/
for (auto&& idleWorker : _idleWorkerRequests) {
// stop accepting any idle requests back (for re-scheduling)
idleWorker.second.set_capacity(0);
}
INFO_RUN([this] {
for (auto&& _workerRequest : _workerRequests) {
std::list<Time> reqAllStartTimes;
std::list<Time> reqAllEndTimes;
for (auto& request : _workerRequest.second) {
reqAllStartTimes.splice(reqAllStartTimes.end(), request._startTimes);
reqAllEndTimes.splice(reqAllEndTimes.end(), request._endTimes);
}
size_t count = reqAllStartTimes.size();
IE_ASSERT(count == reqAllEndTimes.size());
reqAllStartTimes.sort(std::less<Time>());
reqAllEndTimes.sort(std::less<Time>());
if (_workerRequest.first == "CPU_HELP") {
if (_cpuHelpFps > 0.0) {
LOG_INFO_TAG("CPU_HELP:infer:%ld", _cpuHelpInferCount);
LOG_INFO_TAG("CPU_HELP:fps:%lf", _cpuHelpFps);
} else if (count >= 1) {
std::chrono::duration<double, std::milli> durtation =
reqAllEndTimes.back() - reqAllStartTimes.front();
LOG_INFO_TAG("CPU_HELP:infer:%ld", count);
LOG_INFO_TAG("CPU_HELP:fps:%lf", count * 1000 / durtation.count());
}
} else {
LOG_INFO_TAG("%s:infer:%ld", _workerRequest.first.c_str(), count);
auto n = reqAllStartTimes.size();
Time time;
while (!reqAllStartTimes.empty()) {
time = reqAllStartTimes.front();
if (time < _cpuHelpReleaseTime) {
reqAllStartTimes.pop_front();
n--;
} else {
break;
}
}
if (n >= 1) {
std::chrono::duration<double, std::milli> durtation =
reqAllEndTimes.back() - time;
LOG_INFO_TAG("%s:fps:%lf", _workerRequest.first.c_str(),
n * 1000 / durtation.count());
}
}
}
});
_workerRequests.clear();
}

IInferPtr MultiSchedule::CreateInferRequestImpl(
const std::vector<std::shared_ptr<const ov::Node>>& inputs,
const std::vector<std::shared_ptr<const ov::Node>>& outputs) {
SoInfer request_to_share_blobs_with;
IE::RemoteContext::Ptr ctx = nullptr;
if (_passthroughExeNet)
request_to_share_blobs_with = {_passthroughExeNet->CreateInferRequest(), _passthroughExeNet._so};
return std::make_shared<MultiDeviceInferRequest>(inputs, outputs, request_to_share_blobs_with);
}

IInferPtr MultiSchedule::CreateInferRequestImpl(IE::InputsDataMap networkInputs,
IE::OutputsDataMap networkOutputs) {
SoInfer request_to_share_blobs_with;
IE::RemoteContext::Ptr ctx = nullptr;
if (_passthroughExeNet)
request_to_share_blobs_with = {_passthroughExeNet->CreateInferRequest(), _passthroughExeNet._so};
return std::make_shared<MultiDeviceInferRequest>(networkInputs, networkOutputs, request_to_share_blobs_with);
}

IInferPtr MultiSchedule::CreateInferRequest() {
auto execNetwork = std::dynamic_pointer_cast<MultiExecutableNetwork>(
_multiSContext->_executableNetwork.lock());
IInferPtr syncRequestImpl;
if (_multiSContext->_core && _multiSContext->_core->isNewAPI())
syncRequestImpl = CreateInferRequestImpl(execNetwork->_parameters, execNetwork->_results);
if (!syncRequestImpl)
syncRequestImpl = CreateInferRequestImpl(execNetwork->_networkInputs, execNetwork->_networkOutputs);
syncRequestImpl->setPointerToExecutableNetworkInternal(execNetwork);
if (_passthroughExeNet) {
auto so = _passthroughExeNet._ptr->GetPointerToSo();
// Get the _so from passthrough executable network when batch plugin is disable.
if (!so)
so = _passthroughExeNet._so;
syncRequestImpl->setPointerToSo(so);
} else if (std::static_pointer_cast<MultiDeviceInferRequest>(syncRequestImpl)->GetSharedRequest()) {
auto sharedRequest = std::static_pointer_cast<MultiDeviceInferRequest>(syncRequestImpl)->GetSharedRequest();
if (sharedRequest._ptr->getPointerToSo())
syncRequestImpl->setPointerToSo(sharedRequest._ptr->getPointerToSo());
else
syncRequestImpl->setPointerToSo(sharedRequest._so);
}
return std::make_shared<AsyncInferRequest>(shared_from_this(),
syncRequestImpl,
execNetwork->_callbackExecutor);
}
std::string MultiSchedule::GetLogTag() const noexcept {
return _LogTag;
}
} // namespace MultiDevicePlugin
@@ -1,68 +0,0 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//

///////////////////////////////////////////////////////////////////////////////////////////////////
#pragma once

#include "schedule.hpp"

#ifdef MULTIUNITTEST
#define MOCKTESTMACRO virtual
#define MultiDevicePlugin MockMultiDevicePlugin
#else
#define MOCKTESTMACRO
#endif

namespace MultiDevicePlugin {
struct ThisRequestExecutor : public IE::ITaskExecutor {
explicit ThisRequestExecutor(WorkerInferRequest** ptr, MultiImmediateExecutor::Ptr executor = nullptr): _workptrptr{ptr}, _fallbackExec(executor) {}
void run(IE::Task task) override {
(*_workptrptr)->_task = std::move(task);
(*_workptrptr)->_fallbackExec = _fallbackExec;
(*_workptrptr)->_inferRequest->StartAsync();
};
WorkerInferRequest** _workptrptr = nullptr;
MultiImmediateExecutor::Ptr _fallbackExec;
};

class MultiSchedule : public Schedule, public IE::ITaskExecutor {
public:
using Ptr = std::shared_ptr<MultiSchedule>;
IInferPtr CreateInferRequest() override;
IInferPtr CreateInferRequestImpl(IE::InputsDataMap networkInputs, IE::OutputsDataMap networkOutputs) override;
IE::IInferRequestInternal::Ptr CreateInferRequestImpl(const std::vector<std::shared_ptr<const ov::Node>>& inputs,
const std::vector<std::shared_ptr<const ov::Node>>& outputs) override;
void run(IE::Task inferTask) override;
void init(const ScheduleContext::Ptr& sContext) override;
Pipeline GetPipeline(const IInferPtr& syncRequestImpl, WorkerInferRequest** WorkerInferRequest) override;
virtual ~MultiSchedule();

public:
static thread_local WorkerInferRequest* _thisWorkerInferRequest;
// have to use the const char* ptr rather than std::string due to a bug in old gcc versions,
// the bug is e.g. manifesting on the old CentOS (and it's 4.8.x gcc) used in our testing
// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=81880
static thread_local const char* _thisPreferredDeviceName;

protected:
virtual void GenerateWorkers(const std::string& device, const IE::SoExecutableNetworkInternal& executableNetwork);
static bool RunPipelineTask(IE::Task& inferPipelineTask, NotBusyWorkerRequests& idleWorkerRequests, const DeviceName& preferred_device);
virtual bool ScheduleToWorkerInferRequest(IE::Task, DeviceName preferred_device = "");
std::string GetLogTag() const noexcept;

protected:
IE::ThreadSafeQueue<IE::Task> _inferPipelineTasks;
DeviceMap<std::unique_ptr<IE::ThreadSafeQueue<IE::Task>>> _inferPipelineTasksDeviceSpecific;
DeviceMap<NotBusyWorkerRequests> _idleWorkerRequests;
DeviceMap<std::vector<WorkerInferRequest>> _workerRequests;
std::atomic_size_t _numRequestsCreated = {0};
MultiScheduleContext::Ptr _multiSContext;
SoExecNetwork _passthroughExeNet;
Time _cpuHelpReleaseTime;
size_t _cpuHelpInferCount = 0;
double _cpuHelpFps = 0.0;
std::string _LogTag;
};

} // namespace MultiDevicePlugin
@@ -23,7 +23,7 @@
#include <ie_icore.hpp>
#include <ie_ngraph_utils.hpp>
#include "bind_multi_schedule.hpp"
#include "multi_executable_network.hpp"
#include "auto_executable_network.hpp"
#include "auto_schedule.hpp"
#include "auto_executable_network.hpp"

@@ -11,9 +11,6 @@ const std::set<std::string> PluginConfig::_deviceBlocklist = {"VPUX", "GNA"};

PluginConfig::PluginConfig() {
set_default();
device_property_validator = std::dynamic_pointer_cast<BaseValidator>(std::make_shared<FuncValidator>([](const ov::Any& target) -> bool {
return (target.as<std::string>().find(ov::device::properties.name()) != std::string::npos);
}));
}

void PluginConfig::set_default() {

@@ -24,16 +24,6 @@ public:
virtual bool is_valid(const ov::Any& v) const = 0;
};

class FuncValidator : public BaseValidator {
public:
explicit FuncValidator(std::function<bool(const ov::Any&)> func) : m_func(func) { }
bool is_valid(const ov::Any& v) const override {
return m_func(v);
}
private:
std::function<bool(const ov::Any&)> m_func;
};

template<typename T>
class PropertyTypeValidator : public BaseValidator {
public:
@@ -146,6 +136,7 @@ public:
T get_property(const ov::Property<T, mutability>& property) const {
return get_property(property.name()).template as<T>();
}

void apply_user_properties();
ov::AnyMap get_full_properties();

@@ -232,7 +223,6 @@ private:
ov::AnyMap full_properties; // combined with user set properties, including secondary properties
ov::AnyMap property_mutabilities; // mutability for supported configs/metrics installation
std::map<std::string, BaseValidator::Ptr> property_validators;
BaseValidator::Ptr device_property_validator;
static const std::set<std::string> _deviceBlocklist;
};
} // namespace MultiDevicePlugin