[GPU] Separate async pipeline into explicit stages (#7197)

Mikhail Letavin 2021-10-05 16:04:42 +03:00 committed by GitHub
parent a56d81345d
commit 58e8893a26
8 changed files with 232 additions and 137 deletions
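In short, this change splits the monolithic InferImpl() into three explicit stages, preprocess(), enqueue() and wait(), and the async request builds a two-stage pipeline from them: preprocessing plus enqueue on the plugin's task executor, and the blocking wait on a dedicated wait executor. A minimal, self-contained sketch of that {executor, task} pipeline shape (simplified stand-in types for illustration, not the InferenceEngine implementation):

    #include <functional>
    #include <iostream>
    #include <utility>
    #include <vector>

    // Simplified stand-in: a real ITaskExecutor dispatches the task to a worker
    // thread; this one runs it inline so the sketch stays synchronous.
    using Task = std::function<void()>;
    struct Executor { void run(const Task& t) const { t(); } };

    int main() {
        Executor taskExecutor, waitExecutor;

        // The async request keeps a list of {executor, task} stages and runs them in order.
        std::vector<std::pair<const Executor*, Task>> pipeline = {
            { &taskExecutor, [] { std::cout << "preprocess + enqueue\n"; } },
            { &waitExecutor, [] { std::cout << "wait for device results\n"; } },
        };
        for (auto& stage : pipeline)
            stage.first->run(stage.second);
    }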

View File

@@ -3,16 +3,27 @@
//
#include "cldnn_async_infer_request.h"
#include "cldnn_itt.h"
#include <memory>
CLDNNPlugin::CLDNNAsyncInferRequest::CLDNNAsyncInferRequest(const InferenceEngine::IInferRequestInternal::Ptr &inferRequest,
const InferenceEngine::ITaskExecutor::Ptr &taskExecutor,
const InferenceEngine::ITaskExecutor::Ptr &callbackExecutor)
: InferenceEngine::AsyncInferRequestThreadSafeDefault(inferRequest, taskExecutor, callbackExecutor)
{ }
void CLDNNPlugin::CLDNNAsyncInferRequest::Infer_ThreadUnsafe() {
InferUsingAsync();
}
CLDNNPlugin::CLDNNAsyncInferRequest::CLDNNAsyncInferRequest(const CLDNNInferRequest::Ptr &inferRequest,
const InferenceEngine::ITaskExecutor::Ptr& taskExecutor,
const InferenceEngine::ITaskExecutor::Ptr& waitExecutor,
const InferenceEngine::ITaskExecutor::Ptr& callbackExecutor)
: AsyncInferRequestThreadSafeDefault(inferRequest, taskExecutor, callbackExecutor), _inferRequest(inferRequest), _waitExecutor(waitExecutor) {
_pipeline = {};
_pipeline.push_back({taskExecutor,
[this] {
OV_ITT_SCOPED_TASK(itt::domains::CLDNNPlugin, "CLDNNAsyncInferRequest::PreprocessingAndStartPipeline");
_inferRequest->preprocess();
_inferRequest->enqueue();
} });
_pipeline.push_back({_waitExecutor,
[this] {
OV_ITT_SCOPED_TASK(itt::domains::CLDNNPlugin, "CLDNNAsyncInferRequest::WaitPipeline");
_inferRequest->wait();
}});
}
CLDNNPlugin::CLDNNAsyncInferRequest::~CLDNNAsyncInferRequest() {

View File

@@ -13,13 +13,17 @@ namespace CLDNNPlugin {
class CLDNNAsyncInferRequest : public InferenceEngine::AsyncInferRequestThreadSafeDefault {
public:
CLDNNAsyncInferRequest(const InferenceEngine::IInferRequestInternal::Ptr &inferRequest,
const InferenceEngine::ITaskExecutor::Ptr &taskExecutor,
const InferenceEngine::ITaskExecutor::Ptr &callbackExecutor);
void Infer_ThreadUnsafe() override;
using Parent = InferenceEngine::AsyncInferRequestThreadSafeDefault;
CLDNNAsyncInferRequest(const CLDNNInferRequest::Ptr &inferRequest,
const InferenceEngine::ITaskExecutor::Ptr& taskExecutor,
const InferenceEngine::ITaskExecutor::Ptr& waitExecutor,
const InferenceEngine::ITaskExecutor::Ptr& callbackExecutor);
~CLDNNAsyncInferRequest();
private:
CLDNNInferRequest::Ptr _inferRequest;
InferenceEngine::ITaskExecutor::Ptr _waitExecutor;
};
} // namespace CLDNNPlugin

View File

@@ -46,7 +46,8 @@ CLDNNExecNetwork::CLDNNExecNetwork(InferenceEngine::CNNNetwork &network, std::sh
}
}()},
m_config(config),
m_taskExecutor{_taskExecutor} {
m_taskExecutor{ _taskExecutor },
m_waitExecutor(InferenceEngine::ExecutorManager::getInstance()->getIdleCPUStreamsExecutor({ "GPUWaitExecutor" })) {
auto casted_context = std::dynamic_pointer_cast<gpu::ClContext>(context);
if (nullptr == casted_context) {
@@ -93,7 +94,12 @@ IInferRequestInternal::Ptr CLDNNExecNetwork::CreateInferRequestImpl(InputsDataMa
IInferRequestInternal::Ptr CLDNNExecNetwork::CreateInferRequest() {
OV_ITT_SCOPED_TASK(itt::domains::CLDNNPlugin, "CLDNNExecNetwork::CreateInferRequest");
return CreateAsyncInferRequestFromSync<CLDNNAsyncInferRequest>();
auto internalRequest = CreateInferRequestImpl(_networkInputs, _networkOutputs);
internalRequest->setPointerToExecutableNetworkInternal(shared_from_this());
return std::make_shared<CLDNNAsyncInferRequest>(std::static_pointer_cast<CLDNNInferRequest>(internalRequest),
m_taskExecutor,
m_waitExecutor,
_callbackExecutor);
}
std::shared_ptr<ngraph::Function> CLDNNExecNetwork::GetExecGraphInfo() {
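CreateInferRequest() now hands three executors to the async request: the plugin task executor (preprocess + enqueue), a dedicated idle-CPU-streams executor named "GPUWaitExecutor" for the blocking wait stage, and the callback executor. The reason the wait stage gets its own executor is that waiting for device completion blocks its thread; keeping it off the task/stream executor leaves that executor free to start the next request. A rough illustration of the effect (hypothetical sketch; plain std::async stands in for the executors):

    #include <chrono>
    #include <future>
    #include <iostream>
    #include <thread>

    int main() {
        auto enqueue   = [] { std::cout << "enqueue request on GPU\n"; };
        auto wait_done = [] {                    // blocks until the device finishes
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            std::cout << "device finished\n";
        };

        enqueue();                                                // task/stream executor stage
        auto waiter = std::async(std::launch::async, wait_done);  // "GPUWaitExecutor" stage
        std::cout << "task executor is free for the next request\n";
        waiter.get();                                             // completion callback would run after this
    }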

View File

@@ -38,6 +38,7 @@ public:
InferenceEngine::gpu::ClContext::Ptr m_context;
Config m_config;
InferenceEngine::ITaskExecutor::Ptr m_taskExecutor;
InferenceEngine::ITaskExecutor::Ptr m_waitExecutor;
};
}; // namespace CLDNNPlugin

View File

@@ -44,7 +44,8 @@ CLDNNGraph::CLDNNGraph(InferenceEngine::CNNNetwork& network, gpu::ClContext::Ptr
: m_context(context)
, m_networkName(network.getName())
, m_config(config)
, m_stream_id(stream_id) {
, m_stream_id(stream_id)
, m_state(0) {
m_program = std::make_shared<Program>(network, GetEngine(), m_config);
Build();
}
@@ -54,7 +55,8 @@ CLDNNGraph::CLDNNGraph(std::shared_ptr<CLDNNGraph> graph, uint16_t stream_id)
, m_program(graph->m_program)
, m_networkName(graph->m_networkName)
, m_config(graph->m_config)
, m_stream_id(stream_id) {
, m_stream_id(stream_id)
, m_state(0) {
Build();
}

View File

@@ -30,6 +30,11 @@ namespace CLDNNPlugin {
class CLDNNGraph {
public:
enum class Stage : uint32_t {
PREPROC = 1,
EXECUTE = 2,
POSTPROC = 4
};
typedef std::shared_ptr<CLDNNGraph> Ptr;
CLDNNGraph(InferenceEngine::CNNNetwork& network, InferenceEngine::gpu::ClContext::Ptr context, Config config, uint16_t stream_id = 0);
@@ -51,10 +56,26 @@ public:
InferenceEngine::SizeVector GetOutputSize(std::string outName) const;
std::string MapOutputName(std::string outName) const;
std::string getName() const { return m_networkName; }
std::mutex& get_mutex() { return m_infer_mutex; }
void wait(Stage stage_mask) {
std::unique_lock<std::mutex> lock(m_infer_mutex);
m_cv.wait(lock, [&] {
return (m_state & (uint32_t)stage_mask) == 0;
});
m_state |= (uint32_t)stage_mask;
}
void notify(Stage stage_mask) {
{
std::lock_guard<std::mutex> lock(m_infer_mutex);
m_state &= ~(uint32_t)stage_mask;
}
m_cv.notify_one();
}
protected:
uint32_t m_state;
std::condition_variable m_cv;
std::mutex m_infer_mutex;
std::string m_networkName;
Config m_config;
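The new m_state / m_cv pair is a per-graph stage gate: wait(stage_mask) blocks until the requested stage bit is clear and then claims it, while notify(stage_mask) releases the bit and wakes a waiter. This is what allows one infer request to run EXECUTE while another request on the same graph is already in PREPROC. A standalone re-creation of the gate, for illustration only:

    #include <condition_variable>
    #include <cstdint>
    #include <iostream>
    #include <mutex>
    #include <thread>

    enum class Stage : uint32_t { PREPROC = 1, EXECUTE = 2, POSTPROC = 4 };

    struct StageGate {
        uint32_t state = 0;
        std::mutex m;
        std::condition_variable cv;

        void wait(Stage s) {                  // block until the stage is free, then claim it
            std::unique_lock<std::mutex> lock(m);
            cv.wait(lock, [&] { return (state & (uint32_t)s) == 0; });
            state |= (uint32_t)s;
        }
        void notify(Stage s) {                // release the stage and wake one waiter
            { std::lock_guard<std::mutex> lock(m); state &= ~(uint32_t)s; }
            cv.notify_one();
        }
    };

    int main() {
        StageGate gate;
        auto request = [&](int id) {
            gate.wait(Stage::PREPROC); std::cout << id << ": preprocess\n"; gate.notify(Stage::PREPROC);
            gate.wait(Stage::EXECUTE); std::cout << id << ": execute\n";    gate.notify(Stage::EXECUTE);
        };
        // Request 2 may enter PREPROC while request 1 still holds EXECUTE.
        std::thread t1(request, 1), t2(request, 2);
        t1.join(); t2.join();
    }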

View File

@@ -455,6 +455,160 @@ CLDNNInferRequest::CLDNNInferRequest(InputsDataMap networkInputs, OutputsDataMap
streamExecutor = dynamic_cast<InferenceEngine::IStreamsExecutor*>(execNetwork->m_taskExecutor.get());
}
// ----------------------------------------------------------------------------------------- //
// ---------------------------- internal pipeline stages ----------------------------------- //
// ----------------------------------------------------------------------------------------- //
void CLDNNInferRequest::preprocess() {
int streamID = 0;
auto& streamGraphs = static_cast<CLDNNExecNetwork*>(_exeNetwork.get())->m_graphs;
if (nullptr != streamExecutor) {
streamID = streamExecutor->GetStreamId();
int numGraphs = streamGraphs.size();
streamID = streamID % numGraphs;
}
m_graph = streamGraphs[streamID];
m_graph->wait(CLDNNGraph::Stage::PREPROC);
if (m_graph->GetMaxDynamicBatchSize() > 1) {
preprocess_dynamic();
return;
}
execDataPreprocessing(_inputs, true); // "true" stands for serial preprocessing in case of OpenMP
m_graph->notify(CLDNNGraph::Stage::PREPROC);
}
void CLDNNInferRequest::enqueue() {
m_graph->wait(CLDNNGraph::Stage::EXECUTE);
if (m_graph->GetMaxDynamicBatchSize() > 1) {
enqueue_dynamic();
return;
}
// set input and output memory from request blob maps
// into the network object primitives
std::vector<cldnn::event::ptr> dependencies;
for (auto& item : _inputs) {
std::string inputName = item.first;
Blob::Ptr& inputBlob = item.second;
auto nv12_ptr = inputBlob->as<NV12Blob>();
auto batched_ptr = inputBlob->as<BatchedBlob>();
bool is_batched = batched_ptr != nullptr;
bool is_nv12 = nv12_ptr != nullptr;
if (is_nv12 || is_batched) {
int num_blobs = is_batched ? batched_ptr->size() : 1;
int expected_batch = is_batched
? _networkInputs.at(inputName)->getTensorDesc().getDims()[0]
: 1;
for (auto i = 0; i < expected_batch; i++) {
std::string y_name = inputName + "_Y" + std::to_string(i);
std::string uv_name = inputName + "_UV" + std::to_string(i);
if (is_batched) {
int idx = i < num_blobs ? i : num_blobs - 1;
nv12_ptr = getNV12BlobOrException(batched_ptr, idx);
}
prepare_input(y_name, nv12_ptr->y(), dependencies);
prepare_input(uv_name, nv12_ptr->uv(), dependencies);
}
} else {
// regular blob
prepare_input(inputName, inputBlob, dependencies);
}
}
for (auto& item : _outputs) {
std::string outputName = item.first;
Blob::Ptr& outputBlob = item.second;
prepare_output(outputName, outputBlob);
}
internal_outputs.clear();
internal_outputs = m_graph->GetNetwork()->execute(dependencies);
}
void CLDNNInferRequest::wait() {
if (m_graph->GetMaxDynamicBatchSize() > 1) {
wait_dynamic();
return;
}
if (internal_outputs.empty()) {
IE_THROW() << "Inference was not started!\n";
}
// wait for completion & collect outputs as requested by the model
for (auto& no : _networkOutputs) {
Blob::Ptr bptr = _outputs[no.first];
std::string outputID = outputsMap.at(no.first);
auto outputMemory = internal_outputs.at(outputID).get_memory();
// mapping remote blobs not needed -
// let the user take care of them explicitly
if (!bptr->is<gpu::ClBlob>()) {
copy_output_data(outputMemory, bptr);
}
}
// finally collect profiling info
if (m_useProfiling) {
m_graph->UpdatePerfStatistics();
}
m_graph->notify(CLDNNGraph::Stage::EXECUTE);
}
void CLDNNInferRequest::preprocess_dynamic() {
// execute input pre-processing.
execDataPreprocessing(_inputs, true); // "true" stands for serial preprocessing in case of OpenMP
m_graph->notify(CLDNNGraph::Stage::PREPROC);
}
void CLDNNInferRequest::enqueue_dynamic() {
internal_outputs_dynamic.clear();
auto numNets = m_graph->GetNetworksCount();
internal_outputs_dynamic.resize(numNets);
// set up execution and put all graphs into the driver queue
for (unsigned nb = 0; nb < numNets; nb++) {
unsigned int mask = 1 << nb;
if (m_curBatch & mask) {
for (auto& item : _inputs) {
const cldnn::primitive_id& inputName = item.first;
const Blob::Ptr inputBlob = item.second;
auto inputLayout = m_graph->GetInputLayouts().at(inputName);
inputLayout.size.batch[0] = mask;
copy_input_data(m_graph->GetNetwork(nb), inputName, inputLayout, *inputBlob, &batchInputs[inputName][nb]);
}
internal_outputs_dynamic[nb] = m_graph->GetNetwork(nb)->execute();
}
}
}
void CLDNNInferRequest::wait_dynamic() {
if (internal_outputs_dynamic.empty()) {
IE_THROW() << "Inference was not started!\n";
}
// now try to get execution results
for (unsigned nb = 0; nb < m_graph->GetNetworksCount(); nb++) {
unsigned int mask = 1 << nb;
if (m_curBatch & mask) {
for (auto& no : _networkOutputs) {
std::string outputID = outputsMap.at(no.first);
auto outputMemory = internal_outputs_dynamic[nb].at(outputID).get_memory();
Blob::Ptr bptr = _outputs[no.first];
copy_output_data(outputMemory, bptr, &batchOutputs[no.first][nb]);
}
}
}
m_graph->notify(CLDNNGraph::Stage::EXECUTE);
}
// ----------------------------------------------------------------------------------------- //
// ---------------------------------- internal utils ---------------------------------------- //
// ----------------------------------------------------------------------------------------- //
@@ -693,6 +847,7 @@ void CLDNNInferRequest::allocate_outputs_dynamic() {
OV_ITT_SCOPED_TASK(itt::domains::CLDNNPlugin, "CLDNNInferRequest::allocate_outputs_dynamic");
// allocate outputs
for (auto& no : _networkOutputs) {
std::string outputID = m_graph->MapOutputName(no.first);
DataPtr oi = no.second;
TensorDesc desc = oi->getTensorDesc();
SizeVector& dims = desc.getDims();
@@ -706,130 +861,16 @@ void CLDNNInferRequest::allocate_outputs_dynamic() {
Blob::Ptr outputBlob = create_output_host_blob(desc);
outputBlob->allocate();
_outputs[no.first] = outputBlob;
outputsMap[no.first] = outputID;
}
}
void CLDNNInferRequest::exec_and_parse(const std::vector<cldnn::event::ptr>& dependencies) {
OV_ITT_SCOPED_TASK(itt::domains::CLDNNPlugin, "CLDNNInferRequest::execAndParse");
auto networkOutputs = m_graph->GetNetwork()->execute(dependencies);
// Collect outputs as requested by the model
for (auto& no : _networkOutputs) {
Blob::Ptr bptr = _outputs[no.first];
std::string outputID = outputsMap.at(no.first);
auto outputMemory = networkOutputs.at(outputID).get_memory();
// mapping remote blobs not needed -
// let the user take care of them explicitly
if (!bptr->is<gpu::ClBlob>()) {
copy_output_data(outputMemory, bptr);
}
}
}
void CLDNNInferRequest::exec_and_parse_dynamic() {
OV_ITT_SCOPED_TASK(itt::domains::CLDNNPlugin, "CLDNNInferRequest::exec_and_parse_dynamic");
std::vector<std::map<cldnn::primitive_id, cldnn::network_output>> networkOutputs(m_graph->GetNetworksCount());
// set up execution and put all graphs into the driver queue
for (unsigned nb = 0; nb < m_graph->GetNetworksCount(); nb++) {
unsigned int mask = 1 << nb;
if (m_curBatch & mask) {
for (auto& item : _inputs) {
const cldnn::primitive_id& inputName = item.first;
const Blob::Ptr inputBlob = item.second;
auto inputLayout = m_graph->GetInputLayouts().at(inputName);
inputLayout.size.batch[0] = mask;
copy_input_data(m_graph->GetNetwork(nb), inputName, inputLayout, *inputBlob, &batchInputs[inputName][nb]);
}
networkOutputs[nb] = m_graph->GetNetwork(nb)->execute();
}
}
// now try to get execution results
for (unsigned nb = 0; nb < m_graph->GetNetworksCount(); nb++) {
unsigned int mask = 1 << nb;
if (m_curBatch & mask) {
for (auto& no : _networkOutputs) {
std::string outputID = m_graph->MapOutputName(no.first);
auto outputMemory = networkOutputs[nb].at(outputID).get_memory();
Blob::Ptr bptr = _outputs[no.first];
copy_output_data(outputMemory, bptr, &batchOutputs[no.first][nb]);
}
}
}
}
void CLDNNInferRequest::InferImpl() {
OV_ITT_SCOPED_TASK(itt::domains::CLDNNPlugin, "CLDNNInferRequest::InferImpl");
int streamID = 0;
if (nullptr != streamExecutor) {
streamID = streamExecutor->GetStreamId();
}
m_graph = static_cast<CLDNNExecNetwork*>(_exeNetwork.get())->m_graphs[streamID];
// execute input pre-processing.
execDataPreprocessing(_inputs, true); // "true" stands for serial preprocessing in case of OpenMP
if (m_graph->GetMaxDynamicBatchSize() > 1) {
exec_and_parse_dynamic();
return;
}
{
// try locking stream infer mutex
const std::lock_guard<std::mutex> lock(m_graph->get_mutex());
// set input and output memory from request blob maps
// into the network object primitives
std::vector<cldnn::event::ptr> dependencies;
for (auto& item : _inputs) {
std::string inputName = item.first;
Blob::Ptr& inputBlob = item.second;
auto nv12_ptr = inputBlob->as<NV12Blob>();
auto batched_ptr = inputBlob->as<BatchedBlob>();
bool is_batched = batched_ptr != nullptr;
bool is_nv12 = nv12_ptr != nullptr;
if (is_nv12 || is_batched) {
int num_blobs = is_batched ? batched_ptr->size() : 1;
int expected_batch = is_batched
? _networkInputs.at(inputName)->getTensorDesc().getDims()[0]
: 1;
for (auto i = 0; i < expected_batch; i++) {
std::string y_name = inputName + "_Y" + std::to_string(i);
std::string uv_name = inputName + "_UV" + std::to_string(i);
if (is_batched) {
int idx = i < num_blobs ? i : num_blobs - 1;
nv12_ptr = getNV12BlobOrException(batched_ptr, idx);
}
prepare_input(y_name, nv12_ptr->y(), dependencies);
prepare_input(uv_name, nv12_ptr->uv(), dependencies);
}
} else {
// regular blob
prepare_input(inputName, inputBlob, dependencies);
}
}
for (auto& item : _outputs) {
std::string outputName = item.first;
Blob::Ptr& outputBlob = item.second;
prepare_output(outputName, outputBlob);
}
// The actual inference
exec_and_parse(dependencies);
// finally collect profiling info
if (m_useProfiling) {
m_graph->UpdatePerfStatistics();
}
}
preprocess();
enqueue();
wait();
}
std::map<std::string, InferenceEngineProfileInfo> CLDNNInferRequest::GetPerformanceCounts() const {
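For dynamic batching the plugin keeps one compiled network per power-of-two batch chunk (network nb handles a batch of 1 << nb, as the inputLayout.size.batch[0] = mask assignment above suggests), and enqueue_dynamic() / wait_dynamic() run exactly the networks whose bit is set in m_curBatch, so an arbitrary batch size is served as a sum of power-of-two sub-batches. A small worked example of that decomposition, mirroring the m_curBatch & (1 << nb) test above (illustrative only):

    #include <cstdio>

    int main() {
        unsigned curBatch = 13;                // e.g. a request with batch size 13 = 8 + 4 + 1
        for (unsigned nb = 0; nb < 8; ++nb) {  // one compiled network per bit position
            unsigned mask = 1u << nb;
            if (curBatch & mask)
                std::printf("network %u runs a sub-batch of %u\n", nb, mask);
        }
    }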

View File

@@ -46,6 +46,14 @@ public:
void EnableProfiling() { m_useProfiling = true; }
void EnableStreams() { m_useStreams = true; }
void preprocess();
void enqueue();
void wait();
void preprocess_dynamic();
void enqueue_dynamic();
void wait_dynamic();
private:
InferenceEngine::BlobMap _deviceOutputs;
std::map<std::string, cldnn::primitive_id> inputsMap;
@@ -77,8 +85,9 @@ private:
void allocate_outputs();
void allocate_inputs_dynamic();
void allocate_outputs_dynamic();
void exec_and_parse(const std::vector<cldnn::event::ptr>& dependencies);
void exec_and_parse_dynamic();
std::map<cldnn::primitive_id, cldnn::network_output> internal_outputs;
std::vector<std::map<cldnn::primitive_id, cldnn::network_output>> internal_outputs_dynamic;
};
}; // namespace CLDNNPlugin