Bell/release resource (#9222)

* recycle helper resources when hw is ready

Signed-off-by: fishbell <bell.song@intel.com>

* use cv to avoid additional while loop

Signed-off-by: fishbell <bell.song@intel.com>

* refine the logic

Signed-off-by: fishbell <bell.song@intel.com>

* fix potential threading issue

Signed-off-by: fishbell <bell.song@intel.com>

* refine logic

Signed-off-by: fishbell <bell.song@intel.com>

* avoid using global var

Signed-off-by: fishbell <bell.song@intel.com>

* clean up code

Signed-off-by: fishbell <bell.song@intel.com>

* refine

Signed-off-by: fishbell <bell.song@intel.com>

* release helper network/plugin also

Signed-off-by: fishbell <bell.song@intel.com>

* lock when release, avoid double release in destructor

Signed-off-by: fishbell <bell.song@intel.com>

* formatting

Signed-off-by: fishbell <bell.song@intel.com>

* add test case

Signed-off-by: fishbell <bell.song@intel.com>

* add case coverage

Signed-off-by: fishbell <bell.song@intel.com>

* move the task

Signed-off-by: fishbell <bell.song@intel.com>

* remove unnecessary lock

Signed-off-by: fishbell <bell.song@intel.com>
song, bell committed on 2022-01-11 11:41:17 -05:00 (committed by GitHub)
parent 91c89e77d8
commit 3e9ae4bea7
3 changed files with 221 additions and 9 deletions
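
The core of the change is the recycle task added in the constructor below: once the actual device finishes loading, a background task waits for the in-flight CPU helper requests, drains the idle queue, and releases the helper network so its memory is reclaimed early. A minimal standalone sketch of that pattern, using standard C++ primitives in place of the plugin's executor and lock-free idle queue (`HelperPool` and all member names here are hypothetical, not the plugin's API):

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>

struct HelperRequest {};  // stands in for a CPU helper infer request

class HelperPool {
public:
    explicit HelperPool(size_t n) : total_(n) {
        for (size_t i = 0; i < n; ++i)
            idle_.push(std::make_shared<HelperRequest>());
    }
    std::shared_ptr<HelperRequest> acquire() {
        std::lock_guard<std::mutex> g(mutex_);
        if (idle_.empty()) return nullptr;
        auto r = idle_.front();
        idle_.pop();
        return r;
    }
    void release(std::shared_ptr<HelperRequest> r) {
        { std::lock_guard<std::mutex> g(mutex_); idle_.push(std::move(r)); }
        cv_.notify_one();  // a worker came back: the recycler may be able to finish
    }
    // Runs once the actual device is ready: waits until no helper request is
    // in flight, then drops them all so their memory is reclaimed early.
    void recycle() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return idle_.size() == total_; });
        idle_ = {};
        total_ = 0;
    }
private:
    size_t total_ = 0;
    std::queue<std::shared_ptr<HelperRequest>> idle_;
    std::mutex mutex_;
    std::condition_variable cv_;
};
```

If an acquired request never comes back, `recycle()` would block forever; the actual change guards against that with the `_exitFlag` set on the destructor path.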


@@ -266,11 +266,34 @@ MultiDeviceExecutableNetwork::MultiDeviceExecutableNetwork(const std::string&
_inferPipelineTasksDeviceSpecific["CPU_HELP"] = nullptr;
_executor->run(_loadContext[CPU].task);
_executor->run(_loadContext[ACTUALDEVICE].task);
auto recycleTask = [this]() mutable {
WaitActualNetworkReady();
while (!_exitFlag && _loadContext[ACTUALDEVICE].isAlready) {
// clean up helper infer requests
// first, wait for all the remaining requests to finish
for (auto& iter : _workerRequests["CPU_HELP"]) {
iter._inferRequest._ptr->Wait(InferRequest::WaitMode::RESULT_READY);
}
// late enough to check the idle queue now
// second, check whether all requests are back in the idle queue
size_t destroynum = 0;
WorkerInferRequest *workerRequestPtr = nullptr;
while (_idleWorkerRequests["CPU_HELP"].try_pop(workerRequestPtr))
destroynum++;
if (destroynum == _workerRequests["CPU_HELP"].size()) {
std::lock_guard<std::mutex> lock(_confMutex);
_workerRequests["CPU_HELP"].clear();
_loadContext[CPU].executableNetwork._ptr.reset();
_loadContext[CPU].executableNetwork._so.reset();
break;
}
}
};
_executor->run(std::move(recycleTask));
} else {
// only one device needs to load the network, so no need to load it asynchronously
_loadContext[ACTUALDEVICE].task();
}
WaitFirstNetworkReady();
}
void MultiDeviceExecutableNetwork::TryToLoadNetWork(AutoLoadContext& context,
@@ -396,12 +419,6 @@ void MultiDeviceExecutableNetwork::WaitActualNetworkReady() const {
if (_loadContext[ACTUALDEVICE].future.valid()) {
_loadContext[ACTUALDEVICE].future.wait();
}
// if _loadContext[ACTUALDEVICE] load failed, fall back to _loadContext[CPU]
if (!_loadContext[ACTUALDEVICE].isAlready) {
_loadContext[ACTUALDEVICE].executableNetwork = _loadContext[CPU].executableNetwork;
_loadContext[ACTUALDEVICE].deviceInfo = _loadContext[CPU].deviceInfo;
_loadContext[ACTUALDEVICE].isAlready = true;
}
});
}
@@ -475,6 +492,7 @@ MultiDeviceExecutableNetwork::~MultiDeviceExecutableNetwork() {
if (_workModeIsAUTO) {
// this is necessary to guarantee members are destroyed only after getting the future
if (_loadContext[CPU].isEnabled) {
_exitFlag = true;
_loadContext[CPU].future.wait();
WaitActualNetworkReady();
// it's necessary to wait for the network-loading threads to stop here.
@@ -495,7 +513,10 @@ MultiDeviceExecutableNetwork::~MultiDeviceExecutableNetwork() {
// stop accepting any idle requests back (for re-scheduling)
idleWorker.second.set_capacity(0);
}
_workerRequests.clear();
{
std::lock_guard<std::mutex> lock(_confMutex);
_workerRequests.clear();
}
}
std::shared_ptr<InferenceEngine::RemoteContext> MultiDeviceExecutableNetwork::GetContext() const {
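
The destructor change above funnels the final `_workerRequests.clear()` through the same `_confMutex` the recycle task takes. Reduced to a standalone sketch (class and member names hypothetical), the point is that both paths may run, but the release is mutex-guarded and idempotent, so the helpers are freed at most once:

```cpp
#include <memory>
#include <mutex>
#include <vector>

class NetworkLike {
public:
    // called from the background recycle task once the actual device is ready
    void releaseHelpers() {
        std::lock_guard<std::mutex> g(confMutex_);
        helpers_.clear();  // the shared_ptrs are dropped here, at most once
    }
    ~NetworkLike() {
        exitFlag_ = true;  // tell the recycle task to stop waiting
        // ... wait for the load/recycle tasks to finish ...
        std::lock_guard<std::mutex> g(confMutex_);
        helpers_.clear();  // clearing an already-empty vector is a harmless no-op
    }
private:
    bool exitFlag_ = false;
    std::mutex confMutex_;
    std::vector<std::shared_ptr<void>> helpers_;
};
```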


@@ -152,7 +152,8 @@ private:
std::promise<void> _firstLoadPromise;
mutable AutoLoadContext _loadContext[CONTEXTNUM];
mutable std::mutex _confMutex;
bool _exitFlag = {false};
const InferenceEngine::CNNNetwork _network;
};
} // namespace MultiDevicePlugin
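
The one-line header addition above is the flag behind the destructor/recycle-task handshake. As a standalone sketch of that handshake (the diff pairs a plain bool with future waits for ordering; `std::atomic<bool>` is used here so the sketch is self-evidently race-free):

```cpp
#include <atomic>
#include <chrono>
#include <thread>

int main() {
    std::atomic<bool> exitFlag{false};
    std::thread recycler([&exitFlag] {
        while (!exitFlag.load()) {
            // ... wait for helper requests to return to the idle queue ...
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        // give up without releasing: the destructor owns cleanup from here
    });
    exitFlag.store(true);  // destructor path: stop the task before members are torn down
    recycler.join();       // no concurrent access can outlive this line
    return 0;
}
```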


@@ -0,0 +1,190 @@
// Copyright (C) 2018-2021 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
#include <ie_metric_helpers.hpp>
#include <common_test_utils/test_constants.hpp>
#include "unit_test_utils/mocks/cpp_interfaces/interface/mock_icore.hpp"
#include "unit_test_utils/mocks/mock_iinfer_request.hpp"
#include "unit_test_utils/mocks/cpp_interfaces/impl/mock_inference_plugin_internal.hpp"
#include "unit_test_utils/mocks/cpp_interfaces/interface/mock_iexecutable_network_internal.hpp"
#include "unit_test_utils/mocks/cpp_interfaces/interface/mock_ivariable_state_internal.hpp"
#include "unit_test_utils/mocks/cpp_interfaces/interface/mock_iinference_plugin.hpp"
#include <ie_core.hpp>
#include <multi-device/multi_device_config.hpp>
#include <ngraph_functions/subgraph_builders.hpp>
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include "plugin/mock_auto_device_plugin.hpp"
#include "cpp/ie_plugin.hpp"
#include "mock_common.hpp"
using ::testing::MatcherCast;
using ::testing::AllOf;
using ::testing::Throw;
using ::testing::Matches;
using ::testing::_;
using ::testing::StrEq;
using ::testing::Return;
using ::testing::Property;
using ::testing::Eq;
using ::testing::ReturnRef;
using ::testing::AtLeast;
using ::testing::AnyNumber;
using ::testing::InvokeWithoutArgs;
using ::testing::NiceMock;
using Config = std::map<std::string, std::string>;
using namespace MockMultiDevice;
using ConfigParams = std::tuple<
bool, // cpu load success
bool // hw device load success
>;
class AutoReleaseHelperTest : public ::testing::TestWithParam<ConfigParams> {
public:
std::shared_ptr<ngraph::Function> function;
InferenceEngine::CNNNetwork cnnNet;
std::shared_ptr<NiceMock<MockICore>> core;
std::shared_ptr<NiceMock<MockMultiDeviceInferencePlugin>> plugin;
// mock exeNetwork for the CPU helper
ov::runtime::SoPtr<IExecutableNetworkInternal> mockExeNetwork;
// mock exeNetwork for the actual device
ov::runtime::SoPtr<IExecutableNetworkInternal> mockExeNetworkActual;
// config for Auto device
std::map<std::string, std::string> config;
std::vector<DeviceInformation> metaDevices;
std::shared_ptr<NiceMock<MockIInferRequestInternal>> inferReqInternal;
std::shared_ptr<NiceMock<MockIInferRequestInternal>> inferReqInternalActual;
size_t optimalNum;
public:
static std::string getTestCaseName(testing::TestParamInfo<ConfigParams> obj) {
bool cpuSuccess;
bool accSuccess;
std::tie(cpuSuccess, accSuccess) = obj.param;
std::ostringstream result;
if (!cpuSuccess) {
result << "cpuLoadFailure_";
} else {
result << "cpuLoadSuccess_";
}
if (!accSuccess) {
result << "acceleratorLoadFailure";
} else {
result << "acceleratorLoadSuccess";
}
return result.str();
}
void TearDown() override {
core.reset();
plugin.reset();
//mockIExeNet.reset();
mockExeNetwork = {};
mockExeNetworkActual = {};
config.clear();
metaDevices.clear();
inferReqInternal.reset();
inferReqInternalActual.reset();
}
void SetUp() override {
// prepare mockExeNetwork
auto mockIExeNet = std::make_shared<NiceMock<MockIExecutableNetworkInternal>>();
mockExeNetwork = {mockIExeNet, {}};
auto mockIExeNetActual = std::make_shared<NiceMock<MockIExecutableNetworkInternal>>();
mockExeNetworkActual = {mockIExeNetActual, {}};
// prepare mock ICore and cnnNetwork for loading
core = std::make_shared<NiceMock<MockICore>>();
NiceMock<MockMultiDeviceInferencePlugin>* mock_multi = new NiceMock<MockMultiDeviceInferencePlugin>();
plugin.reset(mock_multi);
function = ngraph::builder::subgraph::makeConvPoolRelu();
cnnNet = InferenceEngine::CNNNetwork(function);
// replace core with the mock ICore
plugin->SetCore(core);
// make the mock executable network usable
inferReqInternal = std::make_shared<NiceMock<MockIInferRequestInternal>>();
ON_CALL(*mockIExeNet.get(), CreateInferRequest()).WillByDefault(Return(inferReqInternal));
IE_SET_METRIC(OPTIMAL_NUMBER_OF_INFER_REQUESTS, optimalNum, 1);
ON_CALL(*mockIExeNet.get(), GetMetric(StrEq(METRIC_KEY(OPTIMAL_NUMBER_OF_INFER_REQUESTS))))
.WillByDefault(Return(optimalNum));
inferReqInternalActual = std::make_shared<NiceMock<MockIInferRequestInternal>>();
ON_CALL(*mockIExeNetActual.get(), CreateInferRequest()).WillByDefault(Return(inferReqInternalActual));
ON_CALL(*mockIExeNetActual.get(), GetMetric(StrEq(METRIC_KEY(OPTIMAL_NUMBER_OF_INFER_REQUESTS))))
.WillByDefault(Return(optimalNum));
IE_SET_METRIC(SUPPORTED_CONFIG_KEYS, supportConfigs, {});
ON_CALL(*core, GetMetric(_, StrEq(METRIC_KEY(SUPPORTED_CONFIG_KEYS)), _))
.WillByDefault(Return(supportConfigs));
}
};
TEST_P(AutoReleaseHelperTest, releaseResource) {
// get Parameter
bool cpuSuccess;
bool accSuccess;
std::tie(cpuSuccess, accSuccess) = this->GetParam();
size_t decreaseCount = 0;
// test auto plugin
config.insert({CONFIG_KEY_INTERNAL(MULTI_WORK_MODE_AS_AUTO), InferenceEngine::PluginConfigParams::YES});
const std::string strDevices = CommonTestUtils::DEVICE_GPU + std::string(",") +
CommonTestUtils::DEVICE_CPU;
if (accSuccess) {
ON_CALL(*core, LoadNetwork(::testing::Matcher<const InferenceEngine::CNNNetwork&>(_),
::testing::Matcher<const std::string&>(StrEq(CommonTestUtils::DEVICE_GPU)),
::testing::Matcher<const Config&>(_))).WillByDefault(InvokeWithoutArgs([this]() {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
return mockExeNetworkActual; }));
} else {
ON_CALL(*core, LoadNetwork(::testing::Matcher<const InferenceEngine::CNNNetwork&>(_),
::testing::Matcher<const std::string&>(StrEq(CommonTestUtils::DEVICE_GPU)),
::testing::Matcher<const Config&>(_))).WillByDefault(InvokeWithoutArgs([this]() {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
IE_THROW() << "";
return mockExeNetworkActual; }));
}
if (cpuSuccess) {
ON_CALL(*core, LoadNetwork(::testing::Matcher<const InferenceEngine::CNNNetwork&>(_),
::testing::Matcher<const std::string&>(StrEq(CommonTestUtils::DEVICE_CPU)),
::testing::Matcher<const Config&>(_))).WillByDefault(Return(mockExeNetwork));
if (accSuccess)
decreaseCount++;
} else {
ON_CALL(*core, LoadNetwork(::testing::Matcher<const InferenceEngine::CNNNetwork&>(_),
::testing::Matcher<const std::string&>(StrEq(CommonTestUtils::DEVICE_CPU)),
::testing::Matcher<const Config&>(_))).WillByDefault(Throw(InferenceEngine::GeneralError{""}));
}
metaDevices = {{CommonTestUtils::DEVICE_CPU, {}, -1}, {CommonTestUtils::DEVICE_GPU, {}, -1}};
DeviceInformation devInfo;
ON_CALL(*plugin, ParseMetaDevices(_, _)).WillByDefault(Return(metaDevices));
ON_CALL(*plugin, SelectDevice(Property(&std::vector<DeviceInformation>::size, Eq(2)), _, _))
.WillByDefault(Return(metaDevices[1]));
ON_CALL(*plugin, SelectDevice(Property(&std::vector<DeviceInformation>::size, Eq(1)), _, _))
.WillByDefault(Return(metaDevices[0]));
config.insert({InferenceEngine::MultiDeviceConfigParams::KEY_MULTI_DEVICE_PRIORITIES,
CommonTestUtils::DEVICE_CPU + std::string(",") + CommonTestUtils::DEVICE_GPU});
std::shared_ptr<InferenceEngine::IExecutableNetworkInternal> exeNetwork;
if (cpuSuccess || accSuccess)
ASSERT_NO_THROW(exeNetwork = plugin->LoadExeNetworkImpl(cnnNet, config));
else
ASSERT_THROW(exeNetwork = plugin->LoadExeNetworkImpl(cnnNet, config), InferenceEngine::Exception);
auto sharedcount = mockExeNetwork._ptr.use_count();
auto requestsharedcount = inferReqInternal.use_count();
std::this_thread::sleep_for(std::chrono::milliseconds(500));
EXPECT_EQ(mockExeNetwork._ptr.use_count(), sharedcount - decreaseCount);
EXPECT_EQ(inferReqInternal.use_count(), requestsharedcount - decreaseCount);
}
//
const std::vector<ConfigParams> testConfigs = {ConfigParams {true, true},
ConfigParams {true, false},
ConfigParams {false, true},
ConfigParams {false, false}
};
INSTANTIATE_TEST_SUITE_P(smoke_Auto_BehaviorTests, AutoReleaseHelperTest,
::testing::ValuesIn(testConfigs),
AutoReleaseHelperTest::getTestCaseName);
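
The release check in the test boils down to a `shared_ptr::use_count()` comparison across a sleep: if the helper was recycled, the background owner must have dropped its reference. The same pattern in isolation (timings arbitrary, as in the test above):

```cpp
#include <cassert>
#include <chrono>
#include <memory>
#include <thread>

int main() {
    auto resource = std::make_shared<int>(42);
    std::thread holder([keep = resource]() mutable {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        keep.reset();  // the background owner releases its reference
    });
    auto before = resource.use_count();  // main + holder = 2
    std::this_thread::sleep_for(std::chrono::milliseconds(300));
    // after the sleep the holder must have dropped its copy
    assert(resource.use_count() == before - 1);
    holder.join();
    return 0;
}
```

`use_count()` is only advisory under concurrency, but as in the test, sleeping well past the release point makes the comparison reliable in practice.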