[PYTHON API] update InferQueue (#8513)
* Bind exec core ov (#50)
* Output const node python tests (#52)
* add python bindings tests for Output<const ov::Node>
* add proper tests
* add new line
* rename ie_version to version
* Pszmel/bind infer request (#51)
* remove set_batch, get_blob and set_blob
* update InferRequest class
* change InferenceEngine::InferRequest to ov::runtime::InferRequest
* update set_callback body
* update bindings to reflect ov::runtime::InferRequest
* bind set_input_tensor and get_input_tensor
* style fix
* clean ie_infer_queue.cpp
* Bind exec core ov (#50)
* bind core, exec_net classes
* rm unused function
* add new line
* rename ie_infer_request -> infer_request
* update imports
* update __init__.py
* update ie_api.py
* Replace old containers with the new one
* create impl for create_infer_request
* comment out infer_queue to avoid errors with old infer_request
* update infer_request bind to reflect new infer_request api
* comment out input_info from ie_network to avoid errors with old containers
* Register new containers and comment out InferQueue
* update infer request tests
* style fix
* remove unused imports
* remove unused imports and 2 methods
* add tests to cover all new methods from infer_request
* style fix
* add test
* remove registration of InferResults
* update name of exception_ptr parameter
* update the loops that iterate through inputs and outputs
* clean setCustomCallbacks
* style fix
* add Tensor import
* style fix
* update infer and normalize_inputs
* style fix
* rename startTime and endTime
* Create test for mixed keys as infer arguments
* update infer function
* update return type of infer
Co-authored-by: Bartek Szmelczynski <bartosz.szmelczynski@intel.com>
* fix get_version
* fix opaque issue
* some cosmetic changes
* fix codestyle in tests
* make tests green
* Extend python InferRequest
* Extend python Function
* Change return value of infer call
* Fix missing precisions conversions in CPU plugin
* Rework of runtime for new tests
* Fixed onnx reading in python tests
* Edit compatibility tests
* Edit tests
* Add FLOAT_LIKE xfails
* [Python API] bind ProfilingInfo (#55)
* bind ProfilingInfo
* Add tests
* Fix code style
* Add property
* fix codestyle
* Infer new request method (#56)
* fix conflicts, add infer_new_request function
* remove redundant functions, fix style
* revert the unwanted changes
* revert removal of the Blob
* revert removal of isTblob
* add add_extension from path
* codestyle
* fix win build
* add inputs-outputs to function
* update infer queue
* fix code style
* Hot-fix CPU plugin with precision
* fix start_async
* add performance hint to time infer (#8480)
* Updated common migration pipeline (#8176)
* Updated common migration pipeline
* Fixed merge issue
* Added new model and extended example
* Fixed typo
* Added v10-v11 comparison
* Avoid redundant graph nodes scans (#8415)
* Refactor work with env variables (#8208)
* del MO_ROOT
* del MO_ROOT from common_utils.py
* add MO_PATH to common_utils.py
* change mo_path
* [IE Sample Scripts] Use cmake to build samples (#8442)
* Use cmake to build samples
* Add the option to set custom build output folder
* Remove opset8 from compatibility ngraph python API (#8452)
* [GPU] OneDNN gpu submodule update to version 2.5 (#8449)
* [GPU] OneDNN gpu submodule update to version 2.5
* [GPU] Updated onednn submodule and added layout optimizer fix
* Install rules for static libraries case (#8384)
* Proper cmake install for static libraries case
* Added an ability to skip template plugin
* Added install rules for VPU / GPU
* Install more libraries
* Fixed absolute TBB include paths
* Disable GNA
* Fixed issue with linker
* Some fixes
* Fixed linkage issues in tests
* Disabled some tests
* Updated CI pipelines
* Fixed Windows linkage
* Fixed custom_opset test for static case
* Fixed CVS-70313
* Continue on error
* Fixed clang-format
* Try to fix Windows linker
* Fixed compilation
* Disable samples
* Fixed samples build with THREADING=SEQ
* Fixed link error on Windows
* Fixed ieFuncTests
* Added static Azure CI
* Revert "Fixed link error on Windows"
This reverts commit 78cca36fd2.
* Merge static and dynamic linux pipelines
* Fixed Azure
* fix codestyle
* rename all methods in this class to snake_case
* some updates
* code style
* fix code style in tests
* compute latency in callback
* Fix get_idle_request
* fix latency
* Fix code style
Co-authored-by: Bartek Szmelczynski <bartosz.szmelczynski@intel.com>
Co-authored-by: Anastasia Kuporosova <anastasia.kuporosova@intel.com>
Co-authored-by: Piotr Szmelczynski <piotr.szmelczynski@intel.com>
Co-authored-by: jiwaszki <jan.iwaszkiewicz@intel.com>
Co-authored-by: Victor Kuznetsov <victor.kuznetsov@intel.com>
Co-authored-by: Ilya Churaev <ilya.churaev@intel.com>
Co-authored-by: Tomasz Jankowski <tomasz1.jankowski@intel.com>
Co-authored-by: Dmitry Pigasin <dmitry.pigasin@intel.com>
Co-authored-by: Artur Kulikowski <artur.kulikowski@intel.com>
Co-authored-by: Ilya Znamenskiy <ilya.znamenskiy@intel.com>
Co-authored-by: Ilya Lavrenov <ilya.lavrenov@intel.com>
This commit is contained in:
parent e6884c3fd7
commit 4a1cfdc9ff
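
The user-facing result of this patch is the new AsyncInferQueue Python class. A minimal usage sketch, mirroring the test added at the bottom of this diff (the model path, the "data" input name, and the input shape are illustrative, not fixed by the API):

import numpy as np
from openvino import Core, AsyncInferQueue

core = Core()
func = core.read_model("model.xml")               # illustrative path
exec_net = core.compile_model(func, "CPU")

infer_queue = AsyncInferQueue(exec_net, 4)        # 4 parallel requests; 0 = pick the optimal count
latencies = [None] * 8

def callback(request, job_id):                    # invoked from each request's done-callback
    latencies[job_id] = request.latency

infer_queue.set_callback(callback)
img = np.zeros((1, 3, 32, 32), dtype=np.float32)  # illustrative input
for i in range(8):
    infer_queue.start_async({"data": img}, i)     # blocks only when no request is idle
infer_queue.wait_all()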
@@ -36,7 +36,7 @@ from openvino.pyopenvino import InputInfoCPtr
 from openvino.pyopenvino import DataPtr
 from openvino.pyopenvino import TensorDesc
 from openvino.pyopenvino import get_version
-#from openvino.pyopenvino import InferQueue
+from openvino.pyopenvino import AsyncInferQueue
 from openvino.pyopenvino import InferRequest  # TODO: move to ie_api?
 from openvino.pyopenvino import Blob
 from openvino.pyopenvino import PreProcessInfo
@@ -83,5 +83,5 @@ ExecutableNetwork.infer_new_request = infer_new_request
 # Patching InferRequest
 InferRequest.infer = infer
 InferRequest.start_async = start_async
-# Patching InferQueue
-#InferQueue.async_infer = async_infer
+# Patching AsyncInferQueue
+AsyncInferQueue.start_async = start_async
@@ -3,7 +3,7 @@
 
 import numpy as np
 import copy
-from typing import List
+from typing import List, Union
 
 from openvino.pyopenvino import TBlobFloat32
 from openvino.pyopenvino import TBlobFloat64
@@ -17,6 +17,7 @@ from openvino.pyopenvino import TBlobInt8
 from openvino.pyopenvino import TBlobUint8
 from openvino.pyopenvino import TensorDesc
 from openvino.pyopenvino import InferRequest
+from openvino.pyopenvino import AsyncInferQueue
 from openvino.pyopenvino import ExecutableNetwork
 from openvino.pyopenvino import Tensor
 
@@ -57,7 +58,7 @@ def infer_new_request(exec_net: ExecutableNetwork, inputs: dict = None) -> List[
     return [copy.deepcopy(tensor.data) for tensor in res]
 
 # flake8: noqa: D102
-def start_async(request: InferRequest, inputs: dict = {}, userdata: dict = None) -> None:  # type: ignore
+def start_async(request: Union[InferRequest, AsyncInferQueue], inputs: dict = {}, userdata: dict = None) -> None:  # type: ignore
     request._start_async(inputs=normalize_inputs(inputs), userdata=userdata)
 
 # flake8: noqa: C901
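
Both InferRequest and AsyncInferQueue expose _start_async from the C++ bindings, so this single Union-typed wrapper serves both after the patching shown earlier. A hedged call-site sketch (request, infer_queue, and img assumed created as elsewhere in this diff):

request.start_async({0: img})               # InferRequest: inputs keyed by index
infer_queue.start_async({"data": img}, 0)   # AsyncInferQueue: inputs by name, plus a userdata job id
# Both paths run normalize_inputs() before dispatching to the C++ _start_async.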
@@ -0,0 +1,205 @@
+// Copyright (C) 2021 Intel Corporation
+// SPDX-License-Identifier: Apache-2.0
+
+#include "pyopenvino/core/async_infer_queue.hpp"
+
+#include <ie_common.h>
+#include <pybind11/functional.h>
+#include <pybind11/stl.h>
+
+#include <chrono>
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include <string>
+#include <vector>
+
+#include "pyopenvino/core/common.hpp"
+#include "pyopenvino/core/infer_request.hpp"
+
+namespace py = pybind11;
+
+class AsyncInferQueue {
+public:
+    AsyncInferQueue(std::vector<InferRequestWrapper> requests,
+                    std::queue<size_t> idle_handles,
+                    std::vector<py::object> user_ids)
+        : _requests(requests),
+          _idle_handles(idle_handles),
+          _user_ids(user_ids) {
+        this->set_default_callbacks();
+    }
+
+    ~AsyncInferQueue() {
+        _requests.clear();
+    }
+
+    bool _is_ready() {
+        py::gil_scoped_release release;
+        std::unique_lock<std::mutex> lock(_mutex);
+        _cv.wait(lock, [this] {
+            return !(_idle_handles.empty());
+        });
+
+        return !(_idle_handles.empty());
+    }
+
+    size_t get_idle_request_id() {
+        // Wait for any of _idle_handles
+        py::gil_scoped_release release;
+        std::unique_lock<std::mutex> lock(_mutex);
+        _cv.wait(lock, [this] {
+            return !(_idle_handles.empty());
+        });
+
+        return _idle_handles.front();
+    }
+
+    void wait_all() {
+        // Wait for all requests to return with callback, thus updating
+        // _idle_handles so it matches the size of requests
+        py::gil_scoped_release release;
+        std::unique_lock<std::mutex> lock(_mutex);
+        _cv.wait(lock, [this] {
+            return _idle_handles.size() == _requests.size();
+        });
+    }
+
+    void set_default_callbacks() {
+        for (size_t handle = 0; handle < _requests.size(); handle++) {
+            _requests[handle]._request.set_callback([this, handle /* ... */](std::exception_ptr exception_ptr) {
+                _requests[handle]._end_time = Time::now();
+                // Add idle handle to queue
+                _idle_handles.push(handle);
+                // Notify locks in get_idle_request_id() or wait_all()
+                _cv.notify_one();
+            });
+        }
+    }
+
+    void set_custom_callbacks(py::function f_callback) {
+        for (size_t handle = 0; handle < _requests.size(); handle++) {
+            _requests[handle]._request.set_callback([this, f_callback, handle](std::exception_ptr exception_ptr) {
+                _requests[handle]._end_time = Time::now();
+                try {
+                    if (exception_ptr) {
+                        std::rethrow_exception(exception_ptr);
+                    }
+                } catch (const std::exception& e) {
+                    throw ov::Exception(e.what());
+                }
+                // Acquire GIL, execute Python function
+                py::gil_scoped_acquire acquire;
+                f_callback(_requests[handle], _user_ids[handle]);
+                // Add idle handle to queue
+                _idle_handles.push(handle);
+                // Notify locks in get_idle_request_id() or wait_all()
+                _cv.notify_one();
+            });
+        }
+    }
+
+    std::vector<InferRequestWrapper> _requests;
+    std::queue<size_t> _idle_handles;
+    std::vector<py::object> _user_ids;  // user ID can be any Python object
+    std::mutex _mutex;
+    std::condition_variable _cv;
+};
+
+void regclass_AsyncInferQueue(py::module m) {
+    py::class_<AsyncInferQueue, std::shared_ptr<AsyncInferQueue>> cls(m, "AsyncInferQueue");
+
+    cls.def(py::init([](ov::runtime::ExecutableNetwork& net, size_t jobs) {
+                if (jobs == 0) {
+                    jobs = (size_t)Common::get_optimal_number_of_requests(net);
+                }
+
+                std::vector<InferRequestWrapper> requests;
+                std::queue<size_t> idle_handles;
+                std::vector<py::object> user_ids(jobs);
+
+                for (size_t handle = 0; handle < jobs; handle++) {
+                    auto request = InferRequestWrapper(net.create_infer_request());
+                    // Get inputs and outputs info from the executable network
+                    request._inputs = net.inputs();
+                    request._outputs = net.outputs();
+
+                    requests.push_back(request);
+                    idle_handles.push(handle);
+                }
+
+                return new AsyncInferQueue(requests, idle_handles, user_ids);
+            }),
+            py::arg("network"),
+            py::arg("jobs") = 0);
+
+    cls.def(
+        "_start_async",
+        [](AsyncInferQueue& self, const py::dict inputs, py::object userdata) {
+            // get_idle_request_id() blocks until there is at least one
+            // idle (free to use) InferRequest
+            auto handle = self.get_idle_request_id();
+            self._idle_handles.pop();
+            // Set new inputs label/id from user
+            self._user_ids[handle] = userdata;
+            // Update inputs if there are any
+            if (!inputs.empty()) {
+                if (py::isinstance<std::string>(inputs.begin()->first)) {
+                    auto inputs_map = Common::cast_to_tensor_name_map(inputs);
+                    for (auto&& input : inputs_map) {
+                        self._requests[handle]._request.set_tensor(input.first, input.second);
+                    }
+                } else if (py::isinstance<int>(inputs.begin()->first)) {
+                    auto inputs_map = Common::cast_to_tensor_index_map(inputs);
+                    for (auto&& input : inputs_map) {
+                        self._requests[handle]._request.set_input_tensor(input.first, input.second);
+                    }
+                }
+            }
+            // Now the GIL can be released - we are NOT working with Python objects in this block
+            {
+                py::gil_scoped_release release;
+                self._requests[handle]._start_time = Time::now();
+                // Start InferRequest in asynchronous mode
+                self._requests[handle]._request.start_async();
+            }
+        },
+        py::arg("inputs"),
+        py::arg("userdata"));
+
+    cls.def("is_ready", [](AsyncInferQueue& self) {
+        return self._is_ready();
+    });
+
+    cls.def("wait_all", [](AsyncInferQueue& self) {
+        return self.wait_all();
+    });
+
+    cls.def("get_idle_request_id", [](AsyncInferQueue& self) {
+        return self.get_idle_request_id();
+    });
+
+    cls.def("set_callback", [](AsyncInferQueue& self, py::function f_callback) {
+        self.set_custom_callbacks(f_callback);
+    });
+
+    cls.def("__len__", [](AsyncInferQueue& self) {
+        return self._requests.size();
+    });
+
+    cls.def(
+        "__iter__",
+        [](AsyncInferQueue& self) {
+            return py::make_iterator(self._requests.begin(), self._requests.end());
+        },
+        py::keep_alive<0, 1>()); /* Keep set alive while iterator is used */
+
+    cls.def("__getitem__", [](AsyncInferQueue& self, size_t i) {
+        return self._requests[i];
+    });
+
+    cls.def_property_readonly("userdata", [](AsyncInferQueue& self) {
+        return self._user_ids;
+    });
+}
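
The scheduling logic above boils down to a condition variable guarding a queue of free request slots. For readers new to this pattern, here is a minimal pure-Python model of the same bookkeeping; IdleHandlePool and its method names are invented for illustration and are not part of the OpenVINO API, and the pop that _start_async performs separately is folded into the getter here:

import threading
from collections import deque

class IdleHandlePool:
    """Toy model of the C++ idle-handle bookkeeping (illustrative only)."""

    def __init__(self, jobs: int) -> None:
        self._idle = deque(range(jobs))    # mirrors _idle_handles: every slot starts idle
        self._jobs = jobs                  # mirrors _requests.size()
        self._cv = threading.Condition()   # mirrors the _mutex/_cv pair

    def get_idle_handle(self) -> int:
        # get_idle_request_id + pop: block until a request slot is free, then claim it
        with self._cv:
            self._cv.wait_for(lambda: len(self._idle) > 0)
            return self._idle.popleft()

    def release_handle(self, handle: int) -> None:
        # The done-callback path: return the slot and wake one waiter
        with self._cv:
            self._idle.append(handle)
            self._cv.notify()

    def wait_all(self) -> None:
        # Block until every slot is idle again, i.e. all in-flight work has finished
        with self._cv:
            self._cv.wait_for(lambda: len(self._idle) == self._jobs)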
@@ -7,4 +7,4 @@
 
 namespace py = pybind11;
 
-void regclass_InferQueue(py::module m);
+void regclass_AsyncInferQueue(py::module m);
@@ -321,13 +321,13 @@ void set_request_blobs(InferenceEngine::InferRequest& request, const py::dict& d
     }
 }
 
-uint32_t get_optimal_number_of_requests(const InferenceEngine::ExecutableNetwork& actual) {
+uint32_t get_optimal_number_of_requests(const ov::runtime::ExecutableNetwork& actual) {
     try {
-        auto parameter_value = actual.GetMetric(METRIC_KEY(SUPPORTED_METRICS));
+        auto parameter_value = actual.get_metric(METRIC_KEY(SUPPORTED_METRICS));
         auto supported_metrics = parameter_value.as<std::vector<std::string>>();
         const std::string key = METRIC_KEY(OPTIMAL_NUMBER_OF_INFER_REQUESTS);
         if (std::find(supported_metrics.begin(), supported_metrics.end(), key) != supported_metrics.end()) {
-            parameter_value = actual.GetMetric(key);
+            parameter_value = actual.get_metric(key);
             if (parameter_value.is<unsigned int>())
                 return parameter_value.as<unsigned int>();
             else
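
This helper backs the jobs=0 default of the new queue constructor shown above. A hedged usage sketch from the Python side (the model path is illustrative):

from openvino import Core, AsyncInferQueue

core = Core()
func = core.read_model("model.xml")        # illustrative path
exec_net = core.compile_model(func, "CPU")

# jobs=0 falls through to get_optimal_number_of_requests(), i.e. the
# OPTIMAL_NUMBER_OF_INFER_REQUESTS metric when the device reports it
infer_queue = AsyncInferQueue(exec_net, 0)
print(len(infer_queue))                    # how many requests were actually created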
@@ -15,6 +15,7 @@
 #include "Python.h"
 #include "ie_common.h"
 #include "openvino/runtime/tensor.hpp"
+#include "openvino/runtime/executable_network.hpp"
 #include "pyopenvino/core/containers.hpp"
 
 namespace py = pybind11;
@@ -60,5 +61,5 @@ namespace Common
 
     void set_request_blobs(InferenceEngine::InferRequest& request, const py::dict& dictonary);
 
-    uint32_t get_optimal_number_of_requests(const InferenceEngine::ExecutableNetwork& actual);
+    uint32_t get_optimal_number_of_requests(const ov::runtime::ExecutableNetwork& actual);
 };  // namespace Common
@@ -1,228 +0,0 @@
-// Copyright (C) 2021 Intel Corporation
-// SPDX-License-Identifier: Apache-2.0
-
-#include "pyopenvino/core/ie_infer_queue.hpp"
-
-#include <ie_common.h>
-#include <pybind11/functional.h>
-#include <pybind11/stl.h>
-
-#include <chrono>
-#include <condition_variable>
-#include <cpp/ie_executable_network.hpp>
-#include <cpp/ie_infer_request.hpp>
-#include <ie_iinfer_request.hpp>
-#include <mutex>
-#include <queue>
-#include <string>
-#include <vector>
-
-#include "pyopenvino/core/common.hpp"
-#include "pyopenvino/core/infer_request.hpp"
-
-#define INVALID_ID -1
-
-namespace py = pybind11;
-
-class InferQueue {
-public:
-    InferQueue(std::vector<InferRequestWrapper> requests,
-               std::queue<size_t> idle_handles,
-               std::vector<py::object> user_ids)
-        : _requests(requests),
-          _idle_handles(idle_handles),
-          _user_ids(user_ids) {
-        this->setDefaultCallbacks();
-        _last_id = -1;
-    }
-
-    ~InferQueue() {
-        _requests.clear();
-    }
-
-    bool _is_ready() {
-        py::gil_scoped_release release;
-        std::unique_lock<std::mutex> lock(_mutex);
-        _cv.wait(lock, [this] {
-            return !(_idle_handles.empty());
-        });
-
-        return !(_idle_handles.empty());
-    }
-
-    py::dict _getIdleRequestInfo() {
-        py::gil_scoped_release release;
-        std::unique_lock<std::mutex> lock(_mutex);
-        _cv.wait(lock, [this] {
-            return !(_idle_handles.empty());
-        });
-
-        size_t request_id = _idle_handles.front();
-
-        py::dict request_info = py::dict();
-        request_info["id"] = request_id;
-        // request_info["status"] = true; // TODO
-
-        return request_info;
-    }
-
-    size_t getIdleRequestId() {
-        // Wait for any of _idle_handles
-        py::gil_scoped_release release;
-        std::unique_lock<std::mutex> lock(_mutex);
-        _cv.wait(lock, [this] {
-            return !(_idle_handles.empty());
-        });
-
-        size_t idle_request_id = _idle_handles.front();
-        _idle_handles.pop();
-
-        return idle_request_id;
-    }
-
-    std::vector<bool> waitAll() {
-        // Wait for all requests to return with callback thus updating
-        // _idle_handles so it matches the size of requests
-        py::gil_scoped_release release;
-        std::unique_lock<std::mutex> lock(_mutex);
-        _cv.wait(lock, [this] {
-            return _idle_handles.size() == _requests.size();
-        });
-
-        std::vector<bool> statuses;
-
-        for (size_t handle = 0; handle < _requests.size(); handle++) {
-            statuses.push_back(_requests[handle]._request.wait_for(std::chrono::milliseconds(0)));
-        }
-
-        return statuses;
-    }
-
-    void setDefaultCallbacks() {
-        for (size_t handle = 0; handle < _requests.size(); handle++) {
-            _requests[handle]._request.set_callback([this, handle /* ... */](std::exception_ptr exception_ptr) {
-                _requests[handle]._end_time = Time::now();
-                // Add idle handle to queue
-                _idle_handles.push(handle);
-                // Notify locks in getIdleRequestId() or waitAll() functions
-                _cv.notify_one();
-            });
-        }
-    }
-
-    void setCustomCallbacks(py::function f_callback) {
-        for (size_t handle = 0; handle < _requests.size(); handle++) {
-            _requests[handle]._request.set_callback([this, f_callback, handle](std::exception_ptr exception_ptr) {
-                _requests[handle]._end_time = Time::now();
-                try {
-                    if (exception_ptr) {
-                        std::rethrow_exception(exception_ptr);
-                    }
-                } catch (const std::exception& e) {
-                    IE_THROW() << "Caught exception: " << e.what();
-                }
-                // Acquire GIL, execute Python function
-                py::gil_scoped_acquire acquire;
-                f_callback(_requests[handle], _user_ids[handle]);
-                // Add idle handle to queue
-                _idle_handles.push(handle);
-                // Notify locks in getIdleRequestId() or waitAll() functions
-                _cv.notify_one();
-            });
-        }
-    }
-
-    std::vector<InferRequestWrapper> _requests;
-    std::queue<size_t> _idle_handles;
-    std::vector<py::object> _user_ids;  // user ID can be any Python object
-    size_t _last_id;
-    std::mutex _mutex;
-    std::condition_variable _cv;
-};
-
-// void regclass_InferQueue(py::module m) {
-//     py::class_<InferQueue, std::shared_ptr<InferQueue>> cls(m, "InferQueue");
-
-//     cls.def(py::init([](InferenceEngine::ExecutableNetwork& net, size_t jobs) {
-//         if (jobs == 0) {
-//             const InferenceEngine::ExecutableNetwork& _net = net;
-//             jobs = (size_t)Common::get_optimal_number_of_requests(_net);
-//         }
-
-//         std::vector<InferRequestWrapper> requests;
-//         std::queue<size_t> idle_handles;
-//         std::vector<py::object> user_ids(jobs);
-
-//         for (size_t handle = 0; handle < jobs; handle++) {
-//             auto request = InferRequestWrapper(net.CreateInferRequest());
-//             // Get Inputs and Outputs info from executable network
-//             request._inputsInfo = net.GetInputsInfo();
-//             request._outputsInfo = net.GetOutputsInfo();
-
-//             requests.push_back(request);
-//             idle_handles.push(handle);
-//         }
-
-//         return new InferQueue(requests, idle_handles, user_ids);
-//     }),
-//     py::arg("network"),
-//     py::arg("jobs") = 0);
-
-//     cls.def(
-//         "_async_infer",
-//         [](InferQueue& self, const py::dict inputs, py::object userdata) {
-//             // getIdleRequestId function has an intention to block InferQueue
-//             // until there is at least one idle (free to use) InferRequest
-//             auto handle = self.getIdleRequestId();
-//             // Set new inputs label/id from user
-//             self._user_ids[handle] = userdata;
-//             // Update inputs of picked InferRequest
-//             if (!inputs.empty()) {
-//                 Common::set_request_blobs(self._requests[handle]._request, inputs);
-//             }
-//             // Now GIL can be released - we are NOT working with Python objects in this block
-//             {
-//                 py::gil_scoped_release release;
-//                 self._requests[handle]._start_time = Time::now();
-//                 // Start InferRequest in asynchronus mode
-//                 self._requests[handle]._request.start_async();
-//             }
-//         },
-//         py::arg("inputs"),
-//         py::arg("userdata"));
-
-//     cls.def("is_ready", [](InferQueue& self) {
-//         return self._is_ready();
-//     });
-
-//     cls.def("wait_all", [](InferQueue& self) {
-//         return self.waitAll();
-//     });
-
-//     cls.def("get_idle_request_info", [](InferQueue& self) {
-//         return self._getIdleRequestInfo();
-//     });
-
-//     cls.def("set_infer_callback", [](InferQueue& self, py::function f_callback) {
-//         self.setCustomCallbacks(f_callback);
-//     });
-
-//     cls.def("__len__", [](InferQueue& self) {
-//         return self._requests.size();
-//     });
-
-//     cls.def(
-//         "__iter__",
-//         [](InferQueue& self) {
-//             return py::make_iterator(self._requests.begin(), self._requests.end());
-//         },
-//         py::keep_alive<0, 1>()); /* Keep set alive while iterator is used */
-
-//     cls.def("__getitem__", [](InferQueue& self, size_t i) {
-//         return self._requests[i];
-//     });
-
-//     cls.def_property_readonly("userdata", [](InferQueue& self) {
-//         return self._user_ids;
-//     });
-// }
@@ -20,11 +20,15 @@ public:
     InferRequestWrapper(ov::runtime::InferRequest request)
         : _request(request)
     {
+        // AsyncInferQueue uses this constructor - the latency-computing callback is set there
     }
 
     InferRequestWrapper(ov::runtime::InferRequest request, const std::vector<ov::Output<const ov::Node>>& inputs, const std::vector<ov::Output<const ov::Node>>& outputs)
         : _request(request), _inputs(inputs), _outputs(outputs)
     {
+        _request.set_callback([this](std::exception_ptr exception_ptr) {
+            _end_time = Time::now();
+        });
     }
     // ~InferRequestWrapper() = default;
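
The two constructors split responsibility for latency timing: standalone requests stamp _end_time in their own default callback, while queue-owned requests get it from AsyncInferQueue's callbacks. A hedged sketch of what this enables, reusing exec_net and img as in the tests below:

request = exec_net.create_infer_request()
request.infer({0: img})
# _start_time/_end_time are stamped around the run, so the elapsed time is available:
assert request.latency > 0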
@@ -19,12 +19,12 @@
 #if defined(NGRAPH_ONNX_FRONTEND_ENABLE)
 #    include "pyopenvino/graph/onnx_import/onnx_import.hpp"
 #endif
+#include "pyopenvino/core/async_infer_queue.hpp"
 #include "pyopenvino/core/containers.hpp"
 #include "pyopenvino/core/core.hpp"
 #include "pyopenvino/core/executable_network.hpp"
 #include "pyopenvino/core/ie_blob.hpp"
 #include "pyopenvino/core/ie_data.hpp"
-#include "pyopenvino/core/ie_infer_queue.hpp"
 #include "pyopenvino/core/ie_input_info.hpp"
 #include "pyopenvino/core/ie_network.hpp"
 #include "pyopenvino/core/ie_parameter.hpp"
@@ -127,7 +127,7 @@ PYBIND11_MODULE(pyopenvino, m) {
     regclass_Version(m);
     regclass_Parameter(m);
    regclass_InputInfo(m);
-    // regclass_InferQueue(m);
+    regclass_AsyncInferQueue(m);
     regclass_ProfilingInfo(m);
     regclass_PreProcessInfo(m);
@@ -8,7 +8,7 @@ import datetime
 import time
 
 from ..conftest import image_path, model_path
-from openvino import Core, Tensor, ProfilingInfo
+from openvino import Core, AsyncInferQueue, Tensor, ProfilingInfo
 
 is_myriad = os.environ.get("TEST_DEVICE") == "MYRIAD"
 test_net_xml, test_net_bin = model_path(is_myriad)
@@ -35,6 +35,7 @@ def test_get_profiling_info(device):
     img = read_image()
     request = exec_net.create_infer_request()
     request.infer({0: img})
+    assert request.latency > 0
     prof_info = request.get_profiling_info()
     soft_max_node = next(node for node in prof_info if node.node_name == "fc_out")
     assert soft_max_node.node_type == "Softmax"
@@ -168,6 +169,7 @@ def test_start_async(device):
         request.start_async({0: img})
     for request in requests:
         request.wait()
+        assert request.latency > 0
     assert callbacks_info["finished"] == jobs
@@ -187,3 +189,26 @@ def test_infer_mixed_keys(device):
     with pytest.raises(TypeError) as e:
         request.infer({0: tensor, "fc_out": tensor2})
     assert "incompatible function arguments!" in str(e.value)
+
+
+def test_infer_queue(device):
+    jobs = 8
+    num_request = 4
+    core = Core()
+    func = core.read_model(test_net_xml, test_net_bin)
+    exec_net = core.compile_model(func, device)
+    infer_queue = AsyncInferQueue(exec_net, num_request)
+    jobs_done = [{"finished": False, "latency": 0} for _ in range(jobs)]
+
+    def callback(request, job_id):
+        jobs_done[job_id]["finished"] = True
+        jobs_done[job_id]["latency"] = request.latency
+
+    img = read_image()
+    infer_queue.set_callback(callback)
+    assert infer_queue.is_ready()
+    for i in range(jobs):
+        infer_queue.start_async({"data": img}, i)
+    infer_queue.wait_all()
+    assert all(job["finished"] for job in jobs_done)
+    assert all(job["latency"] > 0 for job in jobs_done)