TypeRelaxed<>::clone_with_new_inputs thread safety fix (#16881)
* TypeRelaxed<>::clone_with_new_inputs thread safety fix
* Style
* Make TypeRelaxed<BaseOp>::clone_with_new_inputs copy the node the same way as the copy ctor of ov::Node does
* Removed the mutex field from intel_cpu::GraphContext
* Removed everything related to the has_type_relaxed_ops field from the snippets subgraph
* Cloning test
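Background for the fix: TypeRelaxed<BaseOp>::validate_and_infer_types() temporarily rewrites the element types of the node's input source outputs, and those outputs are shared with the original graph, so concurrent clone_with_new_inputs() calls from several streams could race on that transient state. Instead of serializing clones with a mutex, the node is now first cloned against private placeholder Parameters, so the temporary type rewriting only touches objects local to the clone. Below is a minimal sketch of that recipe using the public ov::Node API, assuming OpenVINO dev headers; the free function clone_detached is an illustrative name, not an OpenVINO API:

    // Sketch: clone a node without touching state shared with the source graph.
    #include <memory>

    #include "openvino/core/node.hpp"
    #include "openvino/op/parameter.hpp"

    std::shared_ptr<ov::Node> clone_detached(const std::shared_ptr<ov::Node>& node,
                                             const ov::OutputVector& new_args) {
        // 1. Build one placeholder Parameter per input, matching element type and shape.
        ov::OutputVector placeholders;
        for (size_t i = 0; i < node->get_input_size(); ++i) {
            placeholders.push_back(std::make_shared<ov::op::v0::Parameter>(
                node->get_input_element_type(i), node->get_input_partial_shape(i)));
        }
        // 2. Clone against the placeholders: any temporary type rewriting during
        //    validation happens on objects no other thread can observe.
        auto clone = node->clone_with_new_inputs(placeholders);
        // 3. Re-point the clone's inputs at the real arguments and re-validate.
        for (size_t i = 0; i < clone->get_input_size(); ++i) {
            clone->input(i).replace_source_output(new_args[i]);
        }
        clone->validate_and_infer_types();
        return clone;
    }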
This commit is contained in:
parent
83cc2277b4
commit
b452dab8f0
@@ -99,7 +99,6 @@ public:
     size_t get_virtual_port_count() const { return m_virtual_port_count; }
     bool is_buffer_needed() const { return m_buffer_needed; }
     bool is_quantized() const { return config.m_is_quantized; }
-    bool has_type_relaxed_ops() const { return config.m_has_type_relaxed_ops; }
     bool has_domain_sensitive_ops() const { return config.m_has_domain_sensitive_ops; }
     snippets::Schedule generate(const BlockedShapeVector& output_shapes,
                                 const BlockedShapeVector& input_shapes,
@@ -169,9 +168,6 @@ private:
 public:
     // True if Subgraph contains FakeQuantize -> FQ decomposition should be called
     bool m_is_quantized = false;
-    // True if Subgraph contains TypeRelaxed nodes -> for several streams in tp mode we should copy body using mutexes
-    // because TypeRelaxed::copy_with_new_inputs() isn't save-thread method
-    bool m_has_type_relaxed_ops = false;
     // True if body has operations that don't support plugin-side domain optimizations
     // (e.g. Transpose, Softmax, MatMul in general doesn't support dimensions collapsing)
     bool m_has_domain_sensitive_ops = false;
@@ -60,8 +60,6 @@ void snippets::op::Subgraph::init_config() {
     for (const auto& op : ops) {
         config.m_is_quantized = config.m_is_quantized ||
                                 ov::is_type<ov::op::v0::FakeQuantize>(op);
-        config.m_has_type_relaxed_ops = config.m_has_type_relaxed_ops ||
-                                        std::dynamic_pointer_cast<ov::op::TypeRelaxedBase>(op);
         config.m_has_domain_sensitive_ops = config.m_has_domain_sensitive_ops ||
                                             ov::is_type<ov::op::v1::Transpose>(op) ||
                                             ov::is_type<ov::op::v1::Softmax>(op) ||
@@ -232,7 +232,6 @@ public:
     bool visit_attributes(AttributeVisitor& visitor) override;
 
 private:
-    mutable std::mutex type_relax_mutex;
     void init() {
         validate_and_infer_types();
     }
@@ -351,14 +350,28 @@ void TypeRelaxed<BaseOp>::validate_and_infer_types() {
 
 template <typename BaseOp>
 std::shared_ptr<Node> TypeRelaxed<BaseOp>::clone_with_new_inputs(const OutputVector& new_args) const {
-    std::lock_guard<std::mutex> lock(type_relax_mutex);
-    // copy then modify inputs
+    // thread safety: we protect inputs source output objects -- clone original op with fake parameters
+    OutputVector fake_new_inputs;
+    for (size_t i = 0; i < BaseOp::get_input_size(); ++i) {
+        auto origin_input_type = get_origin_input_type(i);
+        if (origin_input_type == element::undefined)
+            origin_input_type = BaseOp::get_input_element_type(i);
+        fake_new_inputs.push_back(
+            std::make_shared<v0::Parameter>(origin_input_type, BaseOp::get_input_partial_shape(i)));
+    }
+    auto base_op = BaseOp::clone_with_new_inputs(fake_new_inputs);
+    // since originally TypeRelaxed was copying everything from the original node, we continue doing the same
+    auto curr_base_op = BaseOp::shared_from_this();
+    base_op->add_node_control_dependents(curr_base_op);
+    base_op->add_node_control_dependencies(curr_base_op);
+    base_op->set_friendly_name(BaseOp::get_friendly_name());
+    base_op->get_rt_info() = {curr_base_op->get_rt_info()};
+
     std::shared_ptr<Node> new_node =
-        std::make_shared<TypeRelaxed<BaseOp>>((BaseOp&)(*this), m_input_data_types, m_output_data_types);
+        std::make_shared<TypeRelaxed<BaseOp>>((BaseOp&)(*base_op), m_input_data_types, m_output_data_types);
     for (size_t i = 0; i < new_node->get_input_size(); ++i) {
         new_node->input(i).replace_source_output(new_args[i]);
     }
 
     new_node->validate_and_infer_types();
     return new_node;
 }
@@ -320,10 +320,10 @@ public:
     void clear_control_dependents();
 
     /// This node absorbs the control dependencies of source_node
-    void add_node_control_dependencies(std::shared_ptr<Node> source_node);
+    void add_node_control_dependencies(const std::shared_ptr<const Node>& source_node);
 
     /// This node becomes a dependent of every node dependent on source_node
-    void add_node_control_dependents(std::shared_ptr<Node> source_node);
+    void add_node_control_dependents(const std::shared_ptr<const Node>& source_node);
 
     /// This node's control dependencies are replaced by replacement
     void transfer_control_dependents(std::shared_ptr<Node> replacement);
@@ -325,13 +325,13 @@ void ov::Node::add_control_dependency(std::shared_ptr<Node> node) {
     });
 }
 
-void ov::Node::add_node_control_dependencies(std::shared_ptr<Node> source_node) {
+void ov::Node::add_node_control_dependencies(const std::shared_ptr<const Node>& source_node) {
     for (auto& node : source_node->get_control_dependencies()) {
         add_control_dependency(node);
     }
 }
 
-void ov::Node::add_node_control_dependents(std::shared_ptr<Node> source_node) {
+void ov::Node::add_node_control_dependents(const std::shared_ptr<const Node>& source_node) {
     for (Node* node : source_node->get_control_dependents()) {
         node->add_control_dependency(shared_from_this());
     }
src/core/tests/type_relaxed_copy.cpp (new file, 53 lines)
@@ -0,0 +1,53 @@
+// Copyright (C) 2023 Intel Corporation
+// SPDX-License-Identifier: Apache-2.0
+//
+
+#include <gtest/gtest.h>
+
+#include <thread>
+#include <vector>
+
+#include "ie_common.h"
+#include "ngraph_functions/builders.hpp"
+#include "ov_ops/type_relaxed.hpp"
+
+using namespace ov;
+
+class TypeRelaxedThreading : public testing::Test {
+public:
+    static void runParallel(std::function<void(void)> func,
+                            const unsigned int iterations = 100,
+                            const unsigned int threadsNum = 24) {
+        std::vector<std::thread> threads(threadsNum);
+        for (auto& thread : threads) {
+            thread = std::thread([&]() {
+                for (unsigned int i = 0; i < iterations; ++i) {
+                    func();
+                }
+            });
+        }
+        for (auto& thread : threads) {
+            if (thread.joinable())
+                thread.join();
+        }
+    }
+};
+
+TEST_F(TypeRelaxedThreading, TypeRelaxedCloning) {
+    auto inp1_f32 = std::make_shared<op::v0::Parameter>(element::f32, PartialShape{-1, -1, -1, -1});
+    auto inp2_f32 = std::make_shared<op::v0::Parameter>(element::f32, PartialShape{-1, -1, -1, -1});
+
+    auto inp1 = std::make_shared<op::v0::Parameter>(element::i8, PartialShape{-1, -1, -1, -1});
+    auto inp2 = std::make_shared<op::v0::Parameter>(element::i8, PartialShape{-1, -1, -1, -1});
+
+    auto matMulRelaxed = std::make_shared<ov::op::TypeRelaxed<ngraph::opset3::MatMul>>(
+        *as_type_ptr<ngraph::opset3::MatMul>(ngraph::builder::makeMatMul(inp1_f32, inp2_f32, false, false)),
+        element::f32);
+    auto matMul = matMulRelaxed->clone_with_new_inputs({inp1, inp2});
+
+    runParallel([&]() {
+        auto inp3 = std::make_shared<op::v0::Parameter>(element::i8, PartialShape{-1, -1, -1, -1});
+        auto inp4 = std::make_shared<op::v0::Parameter>(element::i8, PartialShape{-1, -1, -1, -1});
+        auto copied_matMul = matMulRelaxed->clone_with_new_inputs({inp3, inp4});
+    });
+}
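A note on this test: on an unfixed build it would not necessarily trip an assertion, since the race is on transient element-type state rather than on the result; running it under ThreadSanitizer (-fsanitize=thread) is a reasonable way to actually observe the data race there.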
@@ -192,7 +192,7 @@ ExecNetwork::GraphGuard::Lock ExecNetwork::GetGraph() const {
                                        (_cfg.lpTransformsMode == Config::On) &&
                                        ngraph::pass::low_precision::LowPrecision::isFunctionQuantized(_network.getFunction());
 
-                ctx = std::make_shared<GraphContext>(_cfg, extensionManager, weightsCache, _mutex, isQuantizedFlag);
+                ctx = std::make_shared<GraphContext>(_cfg, extensionManager, weightsCache, isQuantizedFlag);
             }
             graphLock._graph.CreateGraph(_network, ctx);
         } catch (...) {
@@ -21,12 +21,10 @@ public:
     GraphContext(const Config& config,
                  ExtensionManager::Ptr extensionManager,
                  WeightsSharing::Ptr w_cache,
-                 std::shared_ptr<std::mutex> sharedMutex,
                  bool isGraphQuantized)
         : config(config),
          extensionManager(extensionManager),
          weightsCache(w_cache),
-          sharedMutex(sharedMutex),
          isGraphQuantizedFlag(isGraphQuantized) {
        rtParamsCache = std::make_shared<MultiCache>(config.rtCacheCapacity);
        rtScratchPad = std::make_shared<DnnlScratchPad>(eng);
@@ -44,9 +42,6 @@ public:
         return weightsCache;
     }
 
-    std::shared_ptr<std::mutex> getSharedMutex() const {
-        return sharedMutex;
-    }
 
     MultiCachePtr getParamsCache() const {
         return rtParamsCache;
@@ -69,7 +64,6 @@ private:
 
     ExtensionManager::Ptr extensionManager;
     WeightsSharing::Ptr weightsCache;         // per NUMA node caches for sharing weights data
-    std::shared_ptr<std::mutex> sharedMutex;  // mutex for protection of type-relaxed Op in clone_model()
 
     MultiCachePtr rtParamsCache;     // primitive cache
    DnnlScratchPadPtr rtScratchPad;  // scratch pad
@@ -89,14 +89,7 @@ void Snippet::copy_snippet() {
         auto new_input = std::make_shared<ngraph::opset1::Parameter>(input.get_element_type(), input.get_partial_shape());
         subgraph_node_inputs.push_back(new_input);
     }
-    std::shared_ptr<ov::Model> new_body = nullptr;
-    // Ticket[79554]: TypeRelaxed ops aren't thread safe so we use mutex to avoid collision in throughput mode
-    if (original_snippet->has_type_relaxed_ops()) {
-        std::lock_guard<std::mutex> lock(*context->getSharedMutex());
-        new_body = original_snippet->body_ptr()->clone();
-    } else {
-        new_body = original_snippet->body_ptr()->clone();
-    }
+    std::shared_ptr<ov::Model> new_body = original_snippet->body_ptr()->clone();
     snippet = std::make_shared<ngraph::snippets::op::Subgraph>(subgraph_node_inputs, new_body);
     ngraph::copy_runtime_info(original_snippet, snippet);
     snippet->set_friendly_name(original_snippet->get_friendly_name());
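With the shared mutex gone from GraphContext, per-stream body cloning needs no external locking even when the body contains TypeRelaxed ops. A minimal sketch of the pattern this enables, assuming OpenVINO dev headers; clone_per_stream and n_streams are illustrative names, not plugin API:

    // Sketch: each stream clones the body concurrently, relying on the
    // now thread-safe TypeRelaxed<>::clone_with_new_inputs underneath.
    #include <memory>
    #include <thread>
    #include <vector>

    #include "openvino/core/model.hpp"

    std::vector<std::shared_ptr<ov::Model>> clone_per_stream(
            const std::shared_ptr<ov::Model>& body, size_t n_streams) {
        std::vector<std::shared_ptr<ov::Model>> bodies(n_streams);
        std::vector<std::thread> streams;
        for (size_t s = 0; s < n_streams; ++s) {
            streams.emplace_back([&, s] {
                bodies[s] = body->clone();  // no shared mutex needed anymore
            });
        }
        for (auto& t : streams)
            t.join();
        return bodies;
    }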
@@ -720,7 +720,7 @@ QueryNetworkResult Engine::QueryNetwork(const CNNNetwork& network, const std::ma
     }
 
     auto context =
-        std::make_shared<GraphContext>(conf, extensionManager, fake_w_cache, std::make_shared<std::mutex>(), false);
+        std::make_shared<GraphContext>(conf, extensionManager, fake_w_cache, false);
 
     auto supported = GetSupportedNodes(model,
                                        [&](std::shared_ptr<ov::Model>& model) {
@@ -113,7 +113,6 @@ public:
     auto context = std::make_shared<GraphContext>(conf,
                                                   nullptr,
                                                   std::make_shared<WeightsSharing>(),
-                                                  std::make_shared<std::mutex>(),
                                                   false);
     const dnnl::engine cpuEngine = context->getEngine();
 