[PyOV] Refine InferRequestWrapper and AsyncInferQueue (#13512)

This commit is contained in:
Jan Iwaszkiewicz 2022-10-20 21:34:03 +02:00 committed by GitHub
parent ffc74c8fe2
commit 0f0a08cde1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 201 additions and 176 deletions

View File

@ -21,43 +21,53 @@ namespace py = pybind11;
class AsyncInferQueue { class AsyncInferQueue {
public: public:
AsyncInferQueue(std::vector<InferRequestWrapper> requests, AsyncInferQueue(ov::CompiledModel& model, size_t jobs) {
std::queue<size_t> idle_handles, if (jobs == 0) {
std::vector<py::object> user_ids) jobs = static_cast<size_t>(Common::get_optimal_number_of_requests(model));
: _requests(requests), }
_idle_handles(idle_handles),
_user_ids(user_ids) { m_requests.reserve(jobs);
m_user_ids.reserve(jobs);
for (size_t handle = 0; handle < jobs; handle++) {
// Create new "empty" InferRequestWrapper without pre-defined callback and
// copy Inputs and Outputs from ov::CompiledModel
m_requests.emplace_back(model.create_infer_request(), model.inputs(), model.outputs(), false);
m_user_ids.push_back(py::none());
m_idle_handles.push(handle);
}
this->set_default_callbacks(); this->set_default_callbacks();
} }
~AsyncInferQueue() { ~AsyncInferQueue() {
_requests.clear(); m_requests.clear();
} }
bool _is_ready() { bool _is_ready() {
// Check if any request has finished already // Check if any request has finished already
py::gil_scoped_release release; py::gil_scoped_release release;
// acquire the mutex to access _errors and _idle_handles // acquire the mutex to access m_errors and m_idle_handles
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(m_mutex);
if (_errors.size() > 0) if (m_errors.size() > 0)
throw _errors.front(); throw m_errors.front();
return !(_idle_handles.empty()); return !(m_idle_handles.empty());
} }
size_t get_idle_request_id() { size_t get_idle_request_id() {
// Wait for any request to complete and return its id // Wait for any request to complete and return its id
// release GIL to avoid deadlock on python callback // release GIL to avoid deadlock on python callback
py::gil_scoped_release release; py::gil_scoped_release release;
// acquire the mutex to access _errors and _idle_handles // acquire the mutex to access m_errors and m_idle_handles
std::unique_lock<std::mutex> lock(_mutex); std::unique_lock<std::mutex> lock(m_mutex);
_cv.wait(lock, [this] { m_cv.wait(lock, [this] {
return !(_idle_handles.empty()); return !(m_idle_handles.empty());
}); });
size_t idle_handle = _idle_handles.front(); size_t idle_handle = m_idle_handles.front();
// wait for request to make sure it returned from callback // wait for request to make sure it returned from callback
_requests[idle_handle]._request.wait(); m_requests[idle_handle].m_request.wait();
if (_errors.size() > 0) if (m_errors.size() > 0)
throw _errors.front(); throw m_errors.front();
return idle_handle; return idle_handle;
} }
@ -65,27 +75,29 @@ public:
// Wait for all request to complete // Wait for all request to complete
// release GIL to avoid deadlock on python callback // release GIL to avoid deadlock on python callback
py::gil_scoped_release release; py::gil_scoped_release release;
for (auto&& request : _requests) { for (auto&& request : m_requests) {
request._request.wait(); request.m_request.wait();
} }
// acquire the mutex to access _errors // acquire the mutex to access m_errors
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(m_mutex);
if (_errors.size() > 0) if (m_errors.size() > 0)
throw _errors.front(); throw m_errors.front();
} }
void set_default_callbacks() { void set_default_callbacks() {
for (size_t handle = 0; handle < _requests.size(); handle++) { for (size_t handle = 0; handle < m_requests.size(); handle++) {
_requests[handle]._request.set_callback([this, handle /* ... */](std::exception_ptr exception_ptr) { // auto end_time = m_requests[handle].m_end_time; // TODO: pass it bellow? like in InferRequestWrapper
_requests[handle]._end_time = Time::now();
m_requests[handle].m_request.set_callback([this, handle /* ... */](std::exception_ptr exception_ptr) {
*m_requests[handle].m_end_time = Time::now();
{ {
// acquire the mutex to access _idle_handles // acquire the mutex to access m_idle_handles
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(m_mutex);
// Add idle handle to queue // Add idle handle to queue
_idle_handles.push(handle); m_idle_handles.push(handle);
} }
// Notify locks in getIdleRequestId() // Notify locks in getIdleRequestId()
_cv.notify_one(); m_cv.notify_one();
try { try {
if (exception_ptr) { if (exception_ptr) {
@ -99,53 +111,54 @@ public:
} }
void set_custom_callbacks(py::function f_callback) { void set_custom_callbacks(py::function f_callback) {
for (size_t handle = 0; handle < _requests.size(); handle++) { for (size_t handle = 0; handle < m_requests.size(); handle++) {
_requests[handle]._request.set_callback([this, f_callback, handle](std::exception_ptr exception_ptr) { m_requests[handle].m_request.set_callback([this, f_callback, handle](std::exception_ptr exception_ptr) {
_requests[handle]._end_time = Time::now(); *m_requests[handle].m_end_time = Time::now();
if (exception_ptr == nullptr) { if (exception_ptr == nullptr) {
// Acquire GIL, execute Python function // Acquire GIL, execute Python function
py::gil_scoped_acquire acquire; py::gil_scoped_acquire acquire;
try { try {
f_callback(_requests[handle], _user_ids[handle]); f_callback(m_requests[handle], m_user_ids[handle]);
} catch (const py::error_already_set& py_error) { } catch (const py::error_already_set& py_error) {
// This should behave the same as assert(!PyErr_Occurred()) // This should behave the same as assert(!PyErr_Occurred())
// since constructor for pybind11's error_already_set is // since constructor for pybind11's error_already_set is
// performing PyErr_Fetch which clears error indicator and // performing PyErr_Fetch which clears error indicator and
// saves it inside itself. // saves it inside itself.
assert(py_error.type()); assert(py_error.type());
// acquire the mutex to access _errors // acquire the mutex to access m_errors
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(m_mutex);
_errors.push(py_error); m_errors.push(py_error);
} }
} }
{ {
// acquire the mutex to access _idle_handles // acquire the mutex to access m_idle_handles
std::lock_guard<std::mutex> lock(_mutex); std::lock_guard<std::mutex> lock(m_mutex);
// Add idle handle to queue // Add idle handle to queue
_idle_handles.push(handle); m_idle_handles.push(handle);
} }
// Notify locks in getIdleRequestId() // Notify locks in getIdleRequestId()
_cv.notify_one(); m_cv.notify_one();
try { try {
if (exception_ptr) { if (exception_ptr) {
std::rethrow_exception(exception_ptr); std::rethrow_exception(exception_ptr);
} }
} catch (const std::exception& e) { } catch (const std::exception& e) {
// Notify locks in getIdleRequestId()
throw ov::Exception(e.what()); throw ov::Exception(e.what());
} }
}); });
} }
} }
std::vector<InferRequestWrapper> _requests; // AsyncInferQueue is the owner of all requests. When AsyncInferQueue is destroyed,
std::queue<size_t> _idle_handles; // all of requests are destroyed as well.
std::vector<py::object> _user_ids; // user ID can be any Python object std::vector<InferRequestWrapper> m_requests;
std::mutex _mutex; std::queue<size_t> m_idle_handles;
std::condition_variable _cv; std::vector<py::object> m_user_ids; // user ID can be any Python object
std::queue<py::error_already_set> _errors; std::mutex m_mutex;
std::condition_variable m_cv;
std::queue<py::error_already_set> m_errors;
}; };
void regclass_AsyncInferQueue(py::module m) { void regclass_AsyncInferQueue(py::module m) {
@ -153,25 +166,7 @@ void regclass_AsyncInferQueue(py::module m) {
cls.doc() = "openvino.runtime.AsyncInferQueue represents helper that creates a pool of asynchronous" cls.doc() = "openvino.runtime.AsyncInferQueue represents helper that creates a pool of asynchronous"
"InferRequests and provides synchronization functions to control flow of a simple pipeline."; "InferRequests and provides synchronization functions to control flow of a simple pipeline.";
cls.def(py::init([](ov::CompiledModel& model, size_t jobs) { cls.def(py::init<ov::CompiledModel&, size_t>(),
if (jobs == 0) {
jobs = (size_t)Common::get_optimal_number_of_requests(model);
}
std::vector<InferRequestWrapper> requests;
std::queue<size_t> idle_handles;
std::vector<py::object> user_ids(jobs, py::none());
for (size_t handle = 0; handle < jobs; handle++) {
auto request = InferRequestWrapper(model.create_infer_request());
// Get Inputs and Outputs info from compiled model
request._inputs = model.inputs();
request._outputs = model.outputs();
requests.push_back(request);
idle_handles.push(handle);
}
return new AsyncInferQueue(requests, idle_handles, user_ids);
}),
py::arg("model"), py::arg("model"),
py::arg("jobs") = 0, py::arg("jobs") = 0,
R"( R"(
@ -193,19 +188,19 @@ void regclass_AsyncInferQueue(py::module m) {
// until there is at least one idle (free to use) InferRequest // until there is at least one idle (free to use) InferRequest
auto handle = self.get_idle_request_id(); auto handle = self.get_idle_request_id();
{ {
std::lock_guard<std::mutex> lock(self._mutex); std::lock_guard<std::mutex> lock(self.m_mutex);
self._idle_handles.pop(); self.m_idle_handles.pop();
} }
// Set new inputs label/id from user // Set new inputs label/id from user
self._user_ids[handle] = userdata; self.m_user_ids[handle] = userdata;
// Update inputs if there are any // Update inputs if there are any
self._requests[handle]._request.set_input_tensor(inputs); self.m_requests[handle].m_request.set_input_tensor(inputs);
// Now GIL can be released - we are NOT working with Python objects in this block // Now GIL can be released - we are NOT working with Python objects in this block
{ {
py::gil_scoped_release release; py::gil_scoped_release release;
self._requests[handle]._start_time = Time::now(); *self.m_requests[handle].m_start_time = Time::now();
// Start InferRequest in asynchronus mode // Start InferRequest in asynchronus mode
self._requests[handle]._request.start_async(); self.m_requests[handle].m_request.start_async();
} }
}, },
py::arg("inputs"), py::arg("inputs"),
@ -239,19 +234,19 @@ void regclass_AsyncInferQueue(py::module m) {
// until there is at least one idle (free to use) InferRequest // until there is at least one idle (free to use) InferRequest
auto handle = self.get_idle_request_id(); auto handle = self.get_idle_request_id();
{ {
std::lock_guard<std::mutex> lock(self._mutex); std::lock_guard<std::mutex> lock(self.m_mutex);
self._idle_handles.pop(); self.m_idle_handles.pop();
} }
// Set new inputs label/id from user // Set new inputs label/id from user
self._user_ids[handle] = userdata; self.m_user_ids[handle] = userdata;
// Update inputs if there are any // Update inputs if there are any
Common::set_request_tensors(self._requests[handle]._request, inputs); Common::set_request_tensors(self.m_requests[handle].m_request, inputs);
// Now GIL can be released - we are NOT working with Python objects in this block // Now GIL can be released - we are NOT working with Python objects in this block
{ {
py::gil_scoped_release release; py::gil_scoped_release release;
self._requests[handle]._start_time = Time::now(); *self.m_requests[handle].m_start_time = Time::now();
// Start InferRequest in asynchronus mode // Start InferRequest in asynchronus mode
self._requests[handle]._request.start_async(); self.m_requests[handle].m_request.start_async();
} }
}, },
py::arg("inputs"), py::arg("inputs"),
@ -326,7 +321,7 @@ void regclass_AsyncInferQueue(py::module m) {
cls.def( cls.def(
"__len__", "__len__",
[](AsyncInferQueue& self) { [](AsyncInferQueue& self) {
return self._requests.size(); return self.m_requests.size();
}, },
R"( R"(
Number of InferRequests in the pool. Number of InferRequests in the pool.
@ -337,14 +332,14 @@ void regclass_AsyncInferQueue(py::module m) {
cls.def( cls.def(
"__iter__", "__iter__",
[](AsyncInferQueue& self) { [](AsyncInferQueue& self) {
return py::make_iterator(self._requests.begin(), self._requests.end()); return py::make_iterator(self.m_requests.begin(), self.m_requests.end());
}, },
py::keep_alive<0, 1>()); /* Keep set alive while iterator is used */ py::keep_alive<0, 1>()); /* Keep set alive while iterator is used */
cls.def( cls.def(
"__getitem__", "__getitem__",
[](AsyncInferQueue& self, size_t i) { [](AsyncInferQueue& self, size_t i) {
return self._requests[i]; return self.m_requests[i];
}, },
R"( R"(
:param i: InferRequest id :param i: InferRequest id
@ -356,7 +351,7 @@ void regclass_AsyncInferQueue(py::module m) {
cls.def_property_readonly( cls.def_property_readonly(
"userdata", "userdata",
[](AsyncInferQueue& self) { [](AsyncInferQueue& self) {
return self._user_ids; return self.m_user_ids;
}, },
R"( R"(
:return: List of all passed userdata. List is filled with `None` if the data wasn't passed yet. :return: List of all passed userdata. List is filled with `None` if the data wasn't passed yet.
@ -364,6 +359,6 @@ void regclass_AsyncInferQueue(py::module m) {
)"); )");
cls.def("__repr__", [](const AsyncInferQueue& self) { cls.def("__repr__", [](const AsyncInferQueue& self) {
return "<AsyncInferQueue: " + std::to_string(self._requests.size()) + " jobs>"; return "<AsyncInferQueue: " + std::to_string(self.m_requests.size()) + " jobs>";
}); });
} }

View File

@ -31,6 +31,7 @@ void regclass_CompiledModel(py::module m) {
cls.def( cls.def(
"create_infer_request", "create_infer_request",
[](ov::CompiledModel& self) { [](ov::CompiledModel& self) {
// Create temporary ov::InferRequest and move it to actual wrapper class.
return std::make_shared<InferRequestWrapper>(self.create_infer_request(), self.inputs(), self.outputs()); return std::make_shared<InferRequestWrapper>(self.create_infer_request(), self.inputs(), self.outputs());
}, },
py::call_guard<py::gil_scoped_release>(), py::call_guard<py::gil_scoped_release>(),

View File

@ -21,11 +21,11 @@ namespace py = pybind11;
inline py::dict run_sync_infer(InferRequestWrapper& self) { inline py::dict run_sync_infer(InferRequestWrapper& self) {
{ {
py::gil_scoped_release release; py::gil_scoped_release release;
self._start_time = Time::now(); *self.m_start_time = Time::now();
self._request.infer(); self.m_request.infer();
self._end_time = Time::now(); *self.m_end_time = Time::now();
} }
return Common::outputs_to_dict(self._outputs, self._request); return Common::outputs_to_dict(self.m_outputs, self.m_request);
} }
void regclass_InferRequest(py::module m) { void regclass_InferRequest(py::module m) {
@ -42,7 +42,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"set_tensors", "set_tensors",
[](InferRequestWrapper& self, const py::dict& inputs) { [](InferRequestWrapper& self, const py::dict& inputs) {
Common::set_request_tensors(self._request, inputs); Common::set_request_tensors(self.m_request, inputs);
}, },
py::arg("inputs"), py::arg("inputs"),
R"( R"(
@ -55,7 +55,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"set_tensors", "set_tensors",
[](InferRequestWrapper& self, const std::string& tensor_name, const std::vector<ov::Tensor>& tensors) { [](InferRequestWrapper& self, const std::string& tensor_name, const std::vector<ov::Tensor>& tensors) {
self._request.set_tensors(tensor_name, tensors); self.m_request.set_tensors(tensor_name, tensors);
}, },
py::arg("tensor_name"), py::arg("tensor_name"),
py::arg("tensors"), py::arg("tensors"),
@ -77,7 +77,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"set_tensors", "set_tensors",
[](InferRequestWrapper& self, const ov::Output<const ov::Node>& port, const std::vector<ov::Tensor>& tensors) { [](InferRequestWrapper& self, const ov::Output<const ov::Node>& port, const std::vector<ov::Tensor>& tensors) {
self._request.set_tensors(port, tensors); self.m_request.set_tensors(port, tensors);
}, },
py::arg("port"), py::arg("port"),
py::arg("tensors"), py::arg("tensors"),
@ -104,7 +104,7 @@ void regclass_InferRequest(py::module m) {
[](InferRequestWrapper& self, const py::dict& outputs) { [](InferRequestWrapper& self, const py::dict& outputs) {
auto outputs_map = Common::cast_to_tensor_index_map(outputs); auto outputs_map = Common::cast_to_tensor_index_map(outputs);
for (auto&& output : outputs_map) { for (auto&& output : outputs_map) {
self._request.set_output_tensor(output.first, output.second); self.m_request.set_output_tensor(output.first, output.second);
} }
}, },
py::arg("outputs"), py::arg("outputs"),
@ -121,7 +121,7 @@ void regclass_InferRequest(py::module m) {
[](InferRequestWrapper& self, const py::dict& inputs) { [](InferRequestWrapper& self, const py::dict& inputs) {
auto inputs_map = Common::cast_to_tensor_index_map(inputs); auto inputs_map = Common::cast_to_tensor_index_map(inputs);
for (auto&& input : inputs_map) { for (auto&& input : inputs_map) {
self._request.set_input_tensor(input.first, input.second); self.m_request.set_input_tensor(input.first, input.second);
} }
}, },
py::arg("inputs"), py::arg("inputs"),
@ -135,7 +135,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"set_input_tensors", "set_input_tensors",
[](InferRequestWrapper& self, const std::vector<ov::Tensor>& tensors) { [](InferRequestWrapper& self, const std::vector<ov::Tensor>& tensors) {
self._request.set_input_tensors(tensors); self.m_request.set_input_tensors(tensors);
}, },
py::arg("tensors"), py::arg("tensors"),
R"( R"(
@ -152,7 +152,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"set_input_tensors", "set_input_tensors",
[](InferRequestWrapper& self, size_t idx, const std::vector<ov::Tensor>& tensors) { [](InferRequestWrapper& self, size_t idx, const std::vector<ov::Tensor>& tensors) {
self._request.set_input_tensors(idx, tensors); self.m_request.set_input_tensors(idx, tensors);
}, },
py::arg("idx"), py::arg("idx"),
py::arg("tensors"), py::arg("tensors"),
@ -172,7 +172,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"infer", "infer",
[](InferRequestWrapper& self, const ov::Tensor& inputs) { [](InferRequestWrapper& self, const ov::Tensor& inputs) {
self._request.set_input_tensor(inputs); self.m_request.set_input_tensor(inputs);
return run_sync_infer(self); return run_sync_infer(self);
}, },
py::arg("inputs"), py::arg("inputs"),
@ -199,7 +199,7 @@ void regclass_InferRequest(py::module m) {
"infer", "infer",
[](InferRequestWrapper& self, const py::dict& inputs) { [](InferRequestWrapper& self, const py::dict& inputs) {
// Update inputs if there are any // Update inputs if there are any
Common::set_request_tensors(self._request, inputs); Common::set_request_tensors(self.m_request, inputs);
// Call Infer function // Call Infer function
return run_sync_infer(self); return run_sync_infer(self);
}, },
@ -222,17 +222,17 @@ void regclass_InferRequest(py::module m) {
"start_async", "start_async",
[](InferRequestWrapper& self, const ov::Tensor& inputs, py::object& userdata) { [](InferRequestWrapper& self, const ov::Tensor& inputs, py::object& userdata) {
// Update inputs if there are any // Update inputs if there are any
self._request.set_input_tensor(inputs); self.m_request.set_input_tensor(inputs);
if (!userdata.is(py::none())) { if (!userdata.is(py::none())) {
if (self.user_callback_defined) { if (self.m_user_callback_defined) {
self.userdata = userdata; self.m_userdata = userdata;
} else { } else {
PyErr_WarnEx(PyExc_RuntimeWarning, "There is no callback function!", 1); PyErr_WarnEx(PyExc_RuntimeWarning, "There is no callback function to pass `userdata` into!", 1);
} }
} }
py::gil_scoped_release release; py::gil_scoped_release release;
self._start_time = Time::now(); *self.m_start_time = Time::now();
self._request.start_async(); self.m_request.start_async();
}, },
py::arg("inputs"), py::arg("inputs"),
py::arg("userdata"), py::arg("userdata"),
@ -261,17 +261,17 @@ void regclass_InferRequest(py::module m) {
"start_async", "start_async",
[](InferRequestWrapper& self, const py::dict& inputs, py::object& userdata) { [](InferRequestWrapper& self, const py::dict& inputs, py::object& userdata) {
// Update inputs if there are any // Update inputs if there are any
Common::set_request_tensors(self._request, inputs); Common::set_request_tensors(self.m_request, inputs);
if (!userdata.is(py::none())) { if (!userdata.is(py::none())) {
if (self.user_callback_defined) { if (self.m_user_callback_defined) {
self.userdata = userdata; self.m_userdata = userdata;
} else { } else {
PyErr_WarnEx(PyExc_RuntimeWarning, "There is no callback function!", 1); PyErr_WarnEx(PyExc_RuntimeWarning, "There is no callback function!", 1);
} }
} }
py::gil_scoped_release release; py::gil_scoped_release release;
self._start_time = Time::now(); *self.m_start_time = Time::now();
self._request.start_async(); self.m_request.start_async();
}, },
py::arg("inputs"), py::arg("inputs"),
py::arg("userdata"), py::arg("userdata"),
@ -293,7 +293,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"cancel", "cancel",
[](InferRequestWrapper& self) { [](InferRequestWrapper& self) {
self._request.cancel(); self.m_request.cancel();
}, },
R"( R"(
Cancels inference request. Cancels inference request.
@ -303,7 +303,7 @@ void regclass_InferRequest(py::module m) {
"wait", "wait",
[](InferRequestWrapper& self) { [](InferRequestWrapper& self) {
py::gil_scoped_release release; py::gil_scoped_release release;
self._request.wait(); self.m_request.wait();
}, },
R"( R"(
Waits for the result to become available. Waits for the result to become available.
@ -316,7 +316,7 @@ void regclass_InferRequest(py::module m) {
"wait_for", "wait_for",
[](InferRequestWrapper& self, const int timeout) { [](InferRequestWrapper& self, const int timeout) {
py::gil_scoped_release release; py::gil_scoped_release release;
return self._request.wait_for(std::chrono::milliseconds(timeout)); return self.m_request.wait_for(std::chrono::milliseconds(timeout));
}, },
py::arg("timeout"), py::arg("timeout"),
R"( R"(
@ -335,10 +335,10 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"set_callback", "set_callback",
[](InferRequestWrapper& self, py::function callback, py::object& userdata) { [](InferRequestWrapper& self, py::function callback, py::object& userdata) {
self.userdata = userdata; self.m_userdata = userdata;
self.user_callback_defined = true; self.m_user_callback_defined = true;
self._request.set_callback([&self, callback](std::exception_ptr exception_ptr) { self.m_request.set_callback([&self, callback](std::exception_ptr exception_ptr) {
self._end_time = Time::now(); *self.m_end_time = Time::now();
try { try {
if (exception_ptr) { if (exception_ptr) {
std::rethrow_exception(exception_ptr); std::rethrow_exception(exception_ptr);
@ -348,7 +348,7 @@ void regclass_InferRequest(py::module m) {
} }
// Acquire GIL, execute Python function // Acquire GIL, execute Python function
py::gil_scoped_acquire acquire; py::gil_scoped_acquire acquire;
callback(self.userdata); callback(self.m_userdata);
}); });
}, },
py::arg("callback"), py::arg("callback"),
@ -365,7 +365,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"get_tensor", "get_tensor",
[](InferRequestWrapper& self, const std::string& name) { [](InferRequestWrapper& self, const std::string& name) {
return self._request.get_tensor(name); return self.m_request.get_tensor(name);
}, },
py::arg("name"), py::arg("name"),
R"( R"(
@ -380,7 +380,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"get_tensor", "get_tensor",
[](InferRequestWrapper& self, const ov::Output<const ov::Node>& port) { [](InferRequestWrapper& self, const ov::Output<const ov::Node>& port) {
return self._request.get_tensor(port); return self.m_request.get_tensor(port);
}, },
py::arg("port"), py::arg("port"),
R"( R"(
@ -395,7 +395,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"get_tensor", "get_tensor",
[](InferRequestWrapper& self, const ov::Output<ov::Node>& port) { [](InferRequestWrapper& self, const ov::Output<ov::Node>& port) {
return self._request.get_tensor(port); return self.m_request.get_tensor(port);
}, },
py::arg("port"), py::arg("port"),
R"( R"(
@ -410,7 +410,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"get_input_tensor", "get_input_tensor",
[](InferRequestWrapper& self, size_t idx) { [](InferRequestWrapper& self, size_t idx) {
return self._request.get_input_tensor(idx); return self.m_request.get_input_tensor(idx);
}, },
py::arg("index"), py::arg("index"),
R"( R"(
@ -427,7 +427,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"get_input_tensor", "get_input_tensor",
[](InferRequestWrapper& self) { [](InferRequestWrapper& self) {
return self._request.get_input_tensor(); return self.m_request.get_input_tensor();
}, },
R"( R"(
Gets input tensor of InferRequest. Gets input tensor of InferRequest.
@ -440,7 +440,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"get_output_tensor", "get_output_tensor",
[](InferRequestWrapper& self, size_t idx) { [](InferRequestWrapper& self, size_t idx) {
return self._request.get_output_tensor(idx); return self.m_request.get_output_tensor(idx);
}, },
py::arg("index"), py::arg("index"),
R"( R"(
@ -456,7 +456,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"get_output_tensor", "get_output_tensor",
[](InferRequestWrapper& self) { [](InferRequestWrapper& self) {
return self._request.get_output_tensor(); return self.m_request.get_output_tensor();
}, },
R"( R"(
Gets output tensor of InferRequest. Gets output tensor of InferRequest.
@ -469,7 +469,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"set_tensor", "set_tensor",
[](InferRequestWrapper& self, const std::string& name, const ov::Tensor& tensor) { [](InferRequestWrapper& self, const std::string& name, const ov::Tensor& tensor) {
self._request.set_tensor(name, tensor); self.m_request.set_tensor(name, tensor);
}, },
py::arg("name"), py::arg("name"),
py::arg("tensor"), py::arg("tensor"),
@ -486,7 +486,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"set_tensor", "set_tensor",
[](InferRequestWrapper& self, const ov::Output<const ov::Node>& port, const ov::Tensor& tensor) { [](InferRequestWrapper& self, const ov::Output<const ov::Node>& port, const ov::Tensor& tensor) {
self._request.set_tensor(port, tensor); self.m_request.set_tensor(port, tensor);
}, },
py::arg("port"), py::arg("port"),
py::arg("tensor"), py::arg("tensor"),
@ -503,7 +503,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"set_tensor", "set_tensor",
[](InferRequestWrapper& self, const ov::Output<ov::Node>& port, const ov::Tensor& tensor) { [](InferRequestWrapper& self, const ov::Output<ov::Node>& port, const ov::Tensor& tensor) {
self._request.set_tensor(port, tensor); self.m_request.set_tensor(port, tensor);
}, },
py::arg("port"), py::arg("port"),
py::arg("tensor"), py::arg("tensor"),
@ -520,7 +520,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"set_input_tensor", "set_input_tensor",
[](InferRequestWrapper& self, size_t idx, const ov::Tensor& tensor) { [](InferRequestWrapper& self, size_t idx, const ov::Tensor& tensor) {
self._request.set_input_tensor(idx, tensor); self.m_request.set_input_tensor(idx, tensor);
}, },
py::arg("index"), py::arg("index"),
py::arg("tensor"), py::arg("tensor"),
@ -538,7 +538,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"set_input_tensor", "set_input_tensor",
[](InferRequestWrapper& self, const ov::Tensor& tensor) { [](InferRequestWrapper& self, const ov::Tensor& tensor) {
self._request.set_input_tensor(tensor); self.m_request.set_input_tensor(tensor);
}, },
py::arg("tensor"), py::arg("tensor"),
R"( R"(
@ -553,7 +553,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"set_output_tensor", "set_output_tensor",
[](InferRequestWrapper& self, size_t idx, const ov::Tensor& tensor) { [](InferRequestWrapper& self, size_t idx, const ov::Tensor& tensor) {
self._request.set_output_tensor(idx, tensor); self.m_request.set_output_tensor(idx, tensor);
}, },
py::arg("index"), py::arg("index"),
py::arg("tensor"), py::arg("tensor"),
@ -570,7 +570,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"set_output_tensor", "set_output_tensor",
[](InferRequestWrapper& self, const ov::Tensor& tensor) { [](InferRequestWrapper& self, const ov::Tensor& tensor) {
self._request.set_output_tensor(tensor); self.m_request.set_output_tensor(tensor);
}, },
py::arg("tensor"), py::arg("tensor"),
R"( R"(
@ -585,7 +585,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"get_profiling_info", "get_profiling_info",
[](InferRequestWrapper& self) { [](InferRequestWrapper& self) {
return self._request.get_profiling_info(); return self.m_request.get_profiling_info();
}, },
py::call_guard<py::gil_scoped_release>(), py::call_guard<py::gil_scoped_release>(),
R"( R"(
@ -602,7 +602,7 @@ void regclass_InferRequest(py::module m) {
cls.def( cls.def(
"query_state", "query_state",
[](InferRequestWrapper& self) { [](InferRequestWrapper& self) {
return self._request.query_state(); return self.m_request.query_state();
}, },
py::call_guard<py::gil_scoped_release>(), py::call_guard<py::gil_scoped_release>(),
R"( R"(
@ -617,7 +617,7 @@ void regclass_InferRequest(py::module m) {
cls.def_property_readonly( cls.def_property_readonly(
"userdata", "userdata",
[](InferRequestWrapper& self) { [](InferRequestWrapper& self) {
return self.userdata; return self.m_userdata;
}, },
R"( R"(
Gets currently held userdata. Gets currently held userdata.
@ -628,7 +628,7 @@ void regclass_InferRequest(py::module m) {
cls.def_property_readonly( cls.def_property_readonly(
"model_inputs", "model_inputs",
[](InferRequestWrapper& self) { [](InferRequestWrapper& self) {
return self._inputs; return self.m_inputs;
}, },
R"( R"(
Gets all inputs of a compiled model which was used to create this InferRequest. Gets all inputs of a compiled model which was used to create this InferRequest.
@ -639,7 +639,7 @@ void regclass_InferRequest(py::module m) {
cls.def_property_readonly( cls.def_property_readonly(
"model_outputs", "model_outputs",
[](InferRequestWrapper& self) { [](InferRequestWrapper& self) {
return self._outputs; return self.m_outputs;
}, },
R"( R"(
Gets all outputs of a compiled model which was used to create this InferRequest. Gets all outputs of a compiled model which was used to create this InferRequest.
@ -694,7 +694,7 @@ void regclass_InferRequest(py::module m) {
cls.def_property_readonly( cls.def_property_readonly(
"profiling_info", "profiling_info",
[](InferRequestWrapper& self) { [](InferRequestWrapper& self) {
return self._request.get_profiling_info(); return self.m_request.get_profiling_info();
}, },
py::call_guard<py::gil_scoped_release>(), py::call_guard<py::gil_scoped_release>(),
R"( R"(
@ -710,7 +710,7 @@ void regclass_InferRequest(py::module m) {
cls.def_property_readonly( cls.def_property_readonly(
"results", "results",
[](InferRequestWrapper& self) { [](InferRequestWrapper& self) {
return Common::outputs_to_dict(self._outputs, self._request); return Common::outputs_to_dict(self.m_outputs, self.m_request);
}, },
R"( R"(
Gets all outputs tensors of this InferRequest. Gets all outputs tensors of this InferRequest.
@ -720,8 +720,8 @@ void regclass_InferRequest(py::module m) {
)"); )");
cls.def("__repr__", [](const InferRequestWrapper& self) { cls.def("__repr__", [](const InferRequestWrapper& self) {
auto inputs_str = Common::docs::container_to_string(self._inputs, ",\n"); auto inputs_str = Common::docs::container_to_string(self.m_inputs, ",\n");
auto outputs_str = Common::docs::container_to_string(self._outputs, ",\n"); auto outputs_str = Common::docs::container_to_string(self.m_outputs, ",\n");
return "<InferRequest:\ninputs[\n" + inputs_str + "\n]\noutputs[\n" + outputs_str + "\n]>"; return "<InferRequest:\ninputs[\n" + inputs_str + "\n]\noutputs[\n" + outputs_str + "\n]>";
}); });

View File

@ -17,17 +17,37 @@ typedef std::chrono::nanoseconds ns;
class InferRequestWrapper { class InferRequestWrapper {
public: public:
InferRequestWrapper(ov::InferRequest request) // InferRequestWrapper is getting original ov::InferRequest as rvalue.
: _request(request) // Ownership of the ov::InferRequest is moved to the wrapper
// while calling upon any of the constructors, so lifetime of
// the object is managed by the wrapper which is exposed to Python.
// AsyncInferQueue uses this specifc constructor as setting callback
// for computing a latency will be done there.
InferRequestWrapper(ov::InferRequest&& request)
: InferRequestWrapper(std::move(request), {}, {}, false)
{ {
// AsyncInferQueue uses this constructor - setting callback for computing a latency will be done there
} }
InferRequestWrapper(ov::InferRequest request, const std::vector<ov::Output<const ov::Node>>& inputs, const std::vector<ov::Output<const ov::Node>>& outputs) InferRequestWrapper(
: _request(request), _inputs(inputs), _outputs(outputs) ov::InferRequest&& request,
{ const std::vector<ov::Output<const ov::Node>>& inputs,
_request.set_callback([this](std::exception_ptr exception_ptr) { const std::vector<ov::Output<const ov::Node>>& outputs,
_end_time = Time::now(); bool set_default_callback = true,
py::object userdata = py::none()
) : m_request{std::move(request)}, m_inputs{inputs}, m_outputs{outputs},
m_userdata{userdata} {
m_start_time = std::make_shared<Time::time_point>(Time::time_point{});
m_end_time = std::make_shared<Time::time_point>(Time::time_point{});
// Initialize InferRequest with default callback
if (set_default_callback) {
// Bump reference counter
auto end_time = m_end_time;
// Set standard callback which saves "end-time" for inference call
m_request.set_callback([end_time](std::exception_ptr exception_ptr) {
*end_time = Time::now();
try { try {
if (exception_ptr) { if (exception_ptr) {
std::rethrow_exception(exception_ptr); std::rethrow_exception(exception_ptr);
@ -37,38 +57,47 @@ public:
} }
}); });
} }
}
// ~InferRequestWrapper() = default; // ~InferRequestWrapper() = default;
std::vector<ov::Tensor> get_input_tensors() { std::vector<ov::Tensor> get_input_tensors() {
std::vector<ov::Tensor> tensors; return get_tensors_from(m_inputs);
for (auto&& node : _inputs) {
tensors.push_back(_request.get_tensor(node));
}
return tensors;
} }
std::vector<ov::Tensor> get_output_tensors() { std::vector<ov::Tensor> get_output_tensors() {
std::vector<ov::Tensor> tensors; return get_tensors_from(m_outputs);
for (auto&& node : _outputs) {
tensors.push_back(_request.get_tensor(node));
} }
return tensors;
}
bool user_callback_defined = false;
py::object userdata;
double get_latency() { double get_latency() {
auto execTime = std::chrono::duration_cast<ns>(_end_time - _start_time); auto execTime = std::chrono::duration_cast<ns>(*m_end_time - *m_start_time);
return static_cast<double>(execTime.count()) * 0.000001; return static_cast<double>(execTime.count()) * 0.000001;
} }
ov::InferRequest _request; // Original ov::InferRequest class that is held by this wrapper
std::vector<ov::Output<const ov::Node>> _inputs; ov::InferRequest m_request;
std::vector<ov::Output<const ov::Node>> _outputs; // Inputs and Outputs inherrited from ov::CompiledModel
std::vector<ov::Output<const ov::Node>> m_inputs;
std::vector<ov::Output<const ov::Node>> m_outputs;
// A flag which is set when a user defines a custom callback on InferRequest
bool m_user_callback_defined = false;
// Data that is passed by user from Python->C++
py::object m_userdata;
// Times of inference's start and finish
std::shared_ptr<Time::time_point> m_start_time; // proposal: change to unique_ptr
std::shared_ptr<Time::time_point> m_end_time;
Time::time_point _start_time; private:
Time::time_point _end_time; inline std::vector<ov::Tensor> get_tensors_from(const std::vector<ov::Output<const ov::Node>>& v) {
std::vector<ov::Tensor> tensors;
tensors.reserve(v.size());
for (auto&& node : v) {
tensors.push_back(m_request.get_tensor(node));
}
return tensors;
}
}; };
void regclass_InferRequest(py::module m); void regclass_InferRequest(py::module m);