From b0b540aeafd9aa2c33d0dc15119d9350d5fa3783 Mon Sep 17 00:00:00 2001
From: Evgenya Stepyreva
Date: Fri, 26 May 2023 22:09:35 +0400
Subject: [PATCH] Auto-Batch: fix data race (#17752)

* Auto-Batch clone model which is being changed
* Comments adressed
* Style
* The fix
* Final change
---
 src/inference/src/check_network_batchable.cpp | 28 ++++++++++++--
 src/inference/src/check_network_batchable.hpp | 14 +++++--
 src/inference/src/dev/core_impl.cpp           | 33 ++++++++--------
 src/inference/src/dev/core_impl.hpp           |  2 +-
 src/inference/tests/unit/core.cpp             | 38 +++++++++++++++++++
 5 files changed, 92 insertions(+), 23 deletions(-)

diff --git a/src/inference/src/check_network_batchable.cpp b/src/inference/src/check_network_batchable.cpp
index 5e60a58d7ce..207549356f9 100644
--- a/src/inference/src/check_network_batchable.cpp
+++ b/src/inference/src/check_network_batchable.cpp
@@ -11,6 +11,24 @@
 namespace ov {
 namespace details {
+namespace {
+bool model_has_suitable_do(const std::shared_ptr<const ov::Model>& model) {
+    bool bDetectionOutput = false;
+    for (auto& result_node : model->get_results()) {
+        auto do_node = result_node->input_value(0).get_node_shared_ptr();
+        std::shared_ptr<ov::Node> convert_node;
+        if (ov::is_type<ov::op::v0::Convert>(do_node)) {  // cases with do->convert->result
+            convert_node = do_node;
+            do_node = convert_node->get_input_node_shared_ptr(0);
+        }
+        auto detectionOutputBase = std::dynamic_pointer_cast<ov::op::util::DetectionOutputBase>(do_node);
+        if (detectionOutputBase) {
+            bDetectionOutput = true;
+        }
+    }
+    return bDetectionOutput;
+}
+}  // namespace
 
 NetworkBatchAbility is_model_batchable(const std::shared_ptr<const ov::Model>& model,
                                        const std::string& deviceNameWithoutBatch,
@@ -48,11 +66,16 @@ NetworkBatchAbility is_model_batchable(const std::shared_ptr<const ov::Model>& m
     if (!any_batched_inputs)
         return NetworkBatchAbility::NO;
 
+    return model_has_suitable_do(model) ? NetworkBatchAbility::WITH_HETERO : NetworkBatchAbility::AS_IS;
+}
+
+std::shared_ptr<const ov::Model> apply_batch_affinity(const std::shared_ptr<const ov::Model>& model_,
+                                                      const std::string& deviceNameWithoutBatch) {
+    auto model = model_->clone();
     for (auto&& node : model->get_ops())
         node->get_rt_info()["affinity"] = "BATCH";  // default affinity (ignored if HETERO is not triggered)
     // have to execute the DetectionOutput separately (without batching)
     // as this layer does mix-in the values from the different inputs (batch id)
-    bool bDetectionOutput = false;
     for (auto& result_node : model->get_results()) {
         auto do_node = result_node->input_value(0).get_node_shared_ptr();
         std::shared_ptr<ov::Node> convert_node;
         if (ov::is_type<ov::op::v0::Convert>(do_node)) {  // cases with do->convert->result
@@ -68,10 +91,9 @@ NetworkBatchAbility is_model_batchable(const std::shared_ptr<const ov::Model>& m
             do_node->get_rt_info()["affinity"] = deviceNameWithoutBatch;
             if (convert_node)
                 convert_node->get_rt_info()["affinity"] = deviceNameWithoutBatch;
-            bDetectionOutput = true;
         }
     }
-    return bDetectionOutput ? NetworkBatchAbility::WITH_HETERO : NetworkBatchAbility::AS_IS;
+    return model;
 }
 
 }  // namespace details
diff --git a/src/inference/src/check_network_batchable.hpp b/src/inference/src/check_network_batchable.hpp
index ce7181a5079..cf21cfa1636 100644
--- a/src/inference/src/check_network_batchable.hpp
+++ b/src/inference/src/check_network_batchable.hpp
@@ -10,14 +10,22 @@
 namespace ov {
 namespace details {
 /**
- * @brief Checks if the input network is batch-able (e.g. no dynamic inputs, inputs has the batch dimension, etc)
- * @param function A ngraph function to check for automatic-batching applicability
- * @return An enum value indicating whether the network can be safely batched (with HETERO or as is) or not
+ * @brief Checks if the input model is batch-able (e.g. no dynamic inputs, inputs has the batch dimension, etc)
+ * @param model A model to check for automatic-batching applicability
+ * @return An enum value indicating whether the model can be safely batched (with HETERO or as is) or not
  */
 enum class NetworkBatchAbility : uint32_t { NO = 0, AS_IS, WITH_HETERO };
 NetworkBatchAbility is_model_batchable(const std::shared_ptr<const ov::Model>& model,
                                        const std::string& deviceNoBatch,
                                        bool strictly_track_dims);
+/**
+ * @brief Sets BATCH affinity for all the nodes except DetectionOutput
+ * @param model_ A model to set affinity to
+ * @param deviceNameWithoutBatch Device name to set for DetectionOutput node if any
+ * @return A copy of the model with set affinity
+ */
+std::shared_ptr<const ov::Model> apply_batch_affinity(const std::shared_ptr<const ov::Model>& model_,
+                                                      const std::string& deviceNameWithoutBatch);
 }  // namespace details
 }  // namespace ov
diff --git a/src/inference/src/dev/core_impl.cpp b/src/inference/src/dev/core_impl.cpp
index b9dda935678..c8a8bdd5c2f 100644
--- a/src/inference/src/dev/core_impl.cpp
+++ b/src/inference/src/dev/core_impl.cpp
@@ -536,14 +536,14 @@ ov::Plugin ov::CoreImpl::get_plugin(const std::string& pluginName) const {
     }
 }
 
-ov::SoPtr<ov::ICompiledModel> ov::CoreImpl::compile_model(const std::shared_ptr<const ov::Model>& model,
+ov::SoPtr<ov::ICompiledModel> ov::CoreImpl::compile_model(const std::shared_ptr<const ov::Model>& model_,
                                                           const std::string& device_name,
                                                           const ov::AnyMap& config) const {
     OV_ITT_SCOPE(FIRST_INFERENCE, ie::itt::domains::IE_LT, "Core::compile_model::model");
     std::string deviceName = device_name;
     ov::AnyMap config_with_batch = config;
     // if auto-batching is applicable, the below function will patch the device name and config accordingly:
-    apply_auto_batching(model, deviceName, config_with_batch);
+    auto model = apply_auto_batching(model_, deviceName, config_with_batch);
 
     auto parsed = parseDeviceNameIntoConfig(deviceName, config_with_batch);
     auto plugin = get_plugin(parsed._deviceName);
@@ -562,7 +562,7 @@ ov::SoPtr<ov::ICompiledModel> ov::CoreImpl::compile_model(const std::shared_ptr<
     return res;
 }
 
-ov::SoPtr<ov::ICompiledModel> ov::CoreImpl::compile_model(const std::shared_ptr<const ov::Model>& model,
+ov::SoPtr<ov::ICompiledModel> ov::CoreImpl::compile_model(const std::shared_ptr<const ov::Model>& model_,
                                                           const ov::RemoteContext& context,
                                                           const ov::AnyMap& config) const {
     OV_ITT_SCOPE(FIRST_INFERENCE, ie::itt::domains::IE_LT, "Core::compile_model::RemoteContext");
@@ -572,7 +572,7 @@ ov::SoPtr<ov::ICompiledModel> ov::CoreImpl::compile_model(const std::shared_ptr<
     std::string deviceName = context.get_device_name();
     ov::AnyMap config_with_batch = config;
     // if auto-batching is applicable, the below function will patch the device name and config accordingly:
-    apply_auto_batching(model, deviceName, config_with_batch);
+    auto model = apply_auto_batching(model_, deviceName, config_with_batch);
 
     auto parsed = parseDeviceNameIntoConfig(deviceName, config_with_batch);
     auto plugin = get_plugin(parsed._deviceName);
@@ -734,13 +734,13 @@ ov::AnyMap ov::CoreImpl::get_supported_property(const std::string& full_device_n
     // Considerations:
     // 1. in case of virtual devices all the magic will happen on the level when
     //    virtual device calls ICore::get_supported_property for real HW devices
-    //    so, for now we can returns user properties almost as is without any
+    //    so, for now we can return user properties almost as is without any
     //    filtering / flattening
     // 2. The only exception here: while common properties like ov::num::streams or
     //    ov::hint::performance_mode are shared across all the devices, the
     //    ov::device::priority cannot be shared, because it's specific for current virtual
     //    plugin. So, we need to remove ov::device::priorities from the list, because it's
-    //    supposed to be set for current virtual plugin and cannot be propogated down
+    //    supposed to be set for current virtual plugin and cannot be propagated down
     ov::AnyMap return_properties = user_properties;
     auto device_priorities_it = return_properties.find(ov::device::priorities.name());
     if (device_priorities_it != return_properties.end()) {
@@ -806,9 +806,9 @@ ov::RemoteContext ov::CoreImpl::get_default_context(const std::string& device_na
     return get_plugin(parsed._deviceName).get_default_context(parsed._config);
 }
 
-void ov::CoreImpl::apply_auto_batching(const std::shared_ptr<const ov::Model>& model,
-                                       std::string& deviceName,
-                                       ov::AnyMap& config) const {
+std::shared_ptr<const ov::Model> ov::CoreImpl::apply_auto_batching(const std::shared_ptr<const ov::Model>& model,
+                                                                   std::string& deviceName,
+                                                                   ov::AnyMap& config) const {
     std::string deviceNameWithBatchSize, deviceNameWithoutBatch;
     // fully strict dims tracking by default (Auto-Batching is enabled implicitly)
     bool strictly_check_dims = true;
@@ -816,7 +816,7 @@ void ov::CoreImpl::apply_auto_batching(const std::shared_ptr<const ov::Model>& m
         // explicitly enabled Auto-Batching
         auto pos = deviceName.find_first_of(":");
         if (pos == std::string::npos)
-            return;  // BATCH device is already configured via the config
+            return model;  // BATCH device is already configured via the config
         deviceNameWithBatchSize = deviceName.substr(pos + 1);
         deviceNameWithoutBatch = ov::DeviceIDParser::get_batch_device(deviceNameWithBatchSize);
         // when user sets the BATCH device explicitly, we may check the dims less strictly
@@ -827,7 +827,7 @@ void ov::CoreImpl::apply_auto_batching(const std::shared_ptr<const ov::Model>& m
         try {
             get_plugin("BATCH");
         } catch (const std::runtime_error&) {
-            return;
+            return model;
         }
 
         // check whether the Auto-Batching is disabled explicitly
@@ -835,12 +835,12 @@ void ov::CoreImpl::apply_auto_batching(const std::shared_ptr<const ov::Model>& m
         if (batch_mode != config.end()) {
            const auto disabled = batch_mode->second.as<std::string>() == CONFIG_VALUE(NO);
            // virtual plugins like AUTO/MULTI will need the config
-           // e.g to deduce the #requests correctly
+           // e.g. to deduce the #requests correctly
            // otherwise, no need for this config key in the rest of loading
            if (!is_virtual_device(deviceName))
                config.erase(batch_mode);
            if (disabled)
-               return;
+               return model;
         }
 
         // check whether if the Auto-Batching is applicable to the device
@@ -851,7 +851,7 @@ void ov::CoreImpl::apply_auto_batching(const std::shared_ptr<const ov::Model>& m
                            .as<std::vector<std::string>>();
         auto it = std::find(metrics.begin(), metrics.end(), METRIC_KEY(OPTIMAL_BATCH_SIZE));
         if (metrics.end() == it)
-            return;
+            return model;
 
         // if applicable, the Auto-Batching is implicitly enabled via the performance hints
         bool bTputInPlg =
         const auto& excl = config.find(CONFIG_KEY(EXCLUSIVE_ASYNC_REQUESTS));
         bool bExclReqsEnabled = (excl != config.end() && excl->second.as<std::string>() == CONFIG_VALUE(YES));
         if (bExclReqsEnabled || (!bTputInPlg && !bTputInLoadCfg))
-            return;
+            return model;
     }
     auto batchConfig = deviceNameWithBatchSize.empty() ? deviceNameWithoutBatch : deviceNameWithBatchSize;
     auto res = ov::details::is_model_batchable(model, deviceNameWithoutBatch, strictly_check_dims);
     switch (res) {
     case ov::details::NetworkBatchAbility::NO:
-        return;
+        return model;
     case ov::details::NetworkBatchAbility::AS_IS:
         deviceName = "BATCH:" + batchConfig;
         break;
@@ -876,6 +876,7 @@ void ov::CoreImpl::apply_auto_batching(const std::shared_ptr<const ov::Model>& m
         config[CONFIG_KEY(AUTO_BATCH_DEVICE_CONFIG)] = batchConfig;
         break;
     }
+    return ov::details::apply_batch_affinity(model, deviceNameWithoutBatch);
 }
 
 void ov::CoreImpl::set_property(const std::string& device_name, const AnyMap& properties) {
diff --git a/src/inference/src/dev/core_impl.hpp b/src/inference/src/dev/core_impl.hpp
index e59aceb4a26..d68380a3b6f 100644
--- a/src/inference/src/dev/core_impl.hpp
+++ b/src/inference/src/dev/core_impl.hpp
@@ -218,7 +218,7 @@ public:
      */
     void register_plugins_in_registry(const std::string& xml_config_file, const bool& by_abs_path = false);
 
-    void apply_auto_batching(const std::shared_ptr<const ov::Model>& model,
+    std::shared_ptr<const ov::Model> apply_auto_batching(const std::shared_ptr<const ov::Model>& model,
                              std::string& deviceName,
                              ov::AnyMap& config) const;
 
diff --git a/src/inference/tests/unit/core.cpp b/src/inference/tests/unit/core.cpp
index 673661cbf18..5ebff30ec7e 100644
--- a/src/inference/tests/unit/core.cpp
+++ b/src/inference/tests/unit/core.cpp
@@ -7,11 +7,13 @@
 #include
 #include
+#include <thread>
 
 #include "common_test_utils/file_utils.hpp"
 #include "common_test_utils/test_assertions.hpp"
 #include "dev/core_impl.hpp"
 #include "file_utils.h"
+#include "openvino/op/relu.hpp"
 #include "openvino/util/file_util.hpp"
 
 using namespace testing;
@@ -381,3 +383,39 @@ TEST(CoreTests_parse_device_config, get_device_config) {
                       ov::AnyMap{ov::device::priorities("MULTI,DEVICE"),
                                  ov::device::properties(ov::AnyMap{{"MULTI", ov::AnyMap{ov::device::priorities("DEVICE")}}})});
 }
+
+class ApplyAutoBatchThreading : public testing::Test {
+public:
+    static void runParallel(std::function<void(void)> func,
+                            const unsigned int iterations = 50,
+                            const unsigned int threadsNum = 24) {
+        std::vector<std::thread> threads(threadsNum);
+        for (auto& thread : threads) {
+            thread = std::thread([&]() {
+                for (unsigned int i = 0; i < iterations; ++i) {
+                    func();
+                }
+            });
+        }
+        for (auto& thread : threads) {
+            if (thread.joinable())
+                thread.join();
+        }
+    }
+};
+
+// Tested function: apply_auto_batch
+TEST_F(ApplyAutoBatchThreading, ApplyAutoBatch) {
+    ov::CoreImpl core(true);
+    auto input = std::make_shared<ov::op::v0::Parameter>(ov::element::f32, ov::PartialShape{1, 2, 3, 4});
+    ov::Output<ov::Node> intermediate = input->output(0);
+    for (size_t i = 0; i < 100; ++i)
+        intermediate = std::make_shared<ov::op::v0::Relu>(input)->output(0);
+    auto output = std::make_shared<ov::op::v0::Result>(intermediate);
+    auto model = std::make_shared<ov::Model>(ov::ResultVector{output}, ov::ParameterVector{input});
+    std::string device = "GPU";
+    ov::AnyMap config;
+    runParallel([&]() {
+        core.apply_auto_batching(model, device, config);
+    });
+}
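
The race this patch removes comes from compile_model() annotating the caller's ov::Model in place: before the change, the auto-batching path wrote "affinity" entries into each node's rt_info, so two threads compiling the same ov::Model instance could write those entries concurrently. The fix clones the model first and annotates only the private copy, which apply_batch_affinity() hands back to the rest of the compile pipeline. Below is a minimal sketch of that clone-before-mutate pattern; it assumes only the OpenVINO calls already visible in the diff (ov::Model::clone, get_ops, get_rt_info), and the helper name tag_for_batching is hypothetical, not part of the patch.

#include <memory>
#include <string>

#include "openvino/core/model.hpp"

// Clone-before-mutate: annotate a private copy so the shared input model is never written to.
std::shared_ptr<const ov::Model> tag_for_batching(const std::shared_ptr<const ov::Model>& shared_model,
                                                  const std::string& device_without_batch) {
    auto local_copy = shared_model->clone();  // deep copy; safe to modify from this thread
    for (auto&& node : local_copy->get_ops())
        node->get_rt_info()["affinity"] = "BATCH";  // hints land on the clone only
    (void)device_without_batch;  // the real helper would additionally re-assign DetectionOutput nodes to this device
    return local_copy;           // callers compile the annotated copy, as compile_model() now does
}

Returning the annotated clone, rather than mutating the argument, also keeps the early-exit paths cheap: whenever auto-batching is not applied, apply_auto_batching() simply returns the original shared_ptr untouched (the repeated "return model;" lines in the diff), so no copy is made in the common case.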