From 84a0994ec598aec8dc4c1622d40666a6e7af8c0d Mon Sep 17 00:00:00 2001 From: Yuan Hu Date: Tue, 24 Oct 2023 13:59:08 +0800 Subject: [PATCH] [core] fix memory leak issue imported by #18868 (#19832) * try to fix memory leak issue cpustreamer is released, but there are still thread id in t_stream_count_map * fix threadlocal affect all threads Signed-off-by: HU Yuan2 * add comment for local() function to avoid mistaken modification in the future Signed-off-by: HU Yuan2 * use custom stread id Signed-off-by: HU Yuan2 * fix review comments Signed-off-by: HU Yuan2 * fix format issue Signed-off-by: HU Yuan2 * create shared_ptr before assert Signed-off-by: HU Yuan2 --------- Signed-off-by: HU Yuan2 --- .../dev/threading/cpu_streams_executor.cpp | 63 ++++++++++++++++--- 1 file changed, 54 insertions(+), 9 deletions(-) diff --git a/src/inference/src/dev/threading/cpu_streams_executor.cpp b/src/inference/src/dev/threading/cpu_streams_executor.cpp index e61893e132d..691a3951615 100644 --- a/src/inference/src/dev/threading/cpu_streams_executor.cpp +++ b/src/inference/src/dev/threading/cpu_streams_executor.cpp @@ -4,6 +4,7 @@ #include "openvino/runtime/threading/cpu_streams_executor.hpp" +#include #include #include #include @@ -22,8 +23,6 @@ namespace ov { namespace threading { -// maybe there are two CPUStreamsExecutors in the same thread. -thread_local std::map> t_stream_count_map; struct CPUStreamsExecutor::Impl { struct Stream { #if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO @@ -335,12 +334,58 @@ struct CPUStreamsExecutor::Impl { // will be counted by thread_local t_stream_count_map. // when the customer's thread is destoryed, the stream's count will became 1, // Call local() will reuse one of them, and release others. + // it's only a workaround for ticket CVS-111490, please be carefully when need to modify + // CustomeThreadLocal::local(), especially like operations that will affect the count of + // CustomThreadLocal::ThreadId class CustomThreadLocal : public ThreadLocal> { + class ThreadTracker { + public: + explicit ThreadTracker(const std::thread::id& id) + : _id(id), + _count_ptr(std::make_shared(1)) {} + ~ThreadTracker() { + _count_ptr->fetch_sub(1); + } + std::shared_ptr fetch() { + auto new_ptr = std::shared_ptr(new ThreadTracker(*this)); + auto pre_valule = new_ptr.get()->_count_ptr->fetch_add(1); + OPENVINO_ASSERT(pre_valule == 1, "this value must be 1, please check code CustomThreadLocal::local()"); + return new_ptr; + } + const std::thread::id& get_id() const { + return _id; + } + int count() const { + return *(_count_ptr.get()); + } + + private: + // disable all copy and move semantics, user only can use fetch() + // to create a new instance with a shared count num; + ThreadTracker(ThreadTracker const&) = default; + ThreadTracker(ThreadTracker&&) = delete; + ThreadTracker& operator=(ThreadTracker const&) = delete; + ThreadTracker& operator=(ThreadTracker&&) = delete; + std::thread::id _id; + std::shared_ptr _count_ptr; + }; + public: CustomThreadLocal(std::function()> callback_construct, Impl* impl) : ThreadLocal>(callback_construct), _impl(impl) {} std::shared_ptr local() { + // maybe there are two CPUStreamsExecutors in the same thread. + static thread_local std::map> t_stream_count_map; + // fix the memory leak issue that CPUStreamsExecutor is already released, + // but still exists CustomThreadLocal::ThreadTracker in t_stream_count_map + for (auto it = t_stream_count_map.begin(); it != t_stream_count_map.end();) { + if (this != it->first && it->second->count() == 1) { + t_stream_count_map.erase(it++); + } else { + it++; + } + } auto id = std::this_thread::get_id(); auto search = _thread_ids.find(id); if (search != _thread_ids.end()) { @@ -348,14 +393,13 @@ struct CPUStreamsExecutor::Impl { } std::lock_guard guard(_stream_map_mutex); for (auto& item : _stream_map) { - if (*(item.first.get()) == id) { - t_stream_count_map[(void*)this] = item.first; + if (item.first->get_id() == id) { return item.second; } } std::shared_ptr stream = nullptr; for (auto it = _stream_map.begin(); it != _stream_map.end();) { - if (it->first.use_count() == 1) { + if (it->first->count() == 1) { if (stream == nullptr) { stream = it->second; } @@ -367,9 +411,10 @@ struct CPUStreamsExecutor::Impl { if (stream == nullptr) { stream = std::make_shared(_impl); } - auto id_ptr = std::make_shared(id); - t_stream_count_map[(void*)this] = id_ptr; - _stream_map[id_ptr] = stream; + auto tracker_ptr = std::make_shared(id); + t_stream_count_map[(void*)this] = tracker_ptr; + auto new_tracker_ptr = tracker_ptr->fetch(); + _stream_map[new_tracker_ptr] = stream; return stream; } @@ -382,7 +427,7 @@ struct CPUStreamsExecutor::Impl { private: std::set _thread_ids; Impl* _impl; - std::map, std::shared_ptr> _stream_map; + std::map, std::shared_ptr> _stream_map; std::mutex _stream_map_mutex; };