[core] fix memory leak issue imported by #18868 (#19832)

* try to fix memory leak issue

cpustreamer is released, but there are still thread id in t_stream_count_map

* fix threadlocal affect all threads

Signed-off-by: HU Yuan2 <yuan2.hu@intel.com>

* add comment for local() function to avoid mistaken modification
in the future

Signed-off-by: HU Yuan2 <yuan2.hu@intel.com>

* use custom stread id

Signed-off-by: HU Yuan2 <yuan2.hu@intel.com>

* fix review comments

Signed-off-by: HU Yuan2 <yuan2.hu@intel.com>

* fix format issue

Signed-off-by: HU Yuan2 <yuan2.hu@intel.com>

* create shared_ptr before assert

Signed-off-by: HU Yuan2 <yuan2.hu@intel.com>

---------

Signed-off-by: HU Yuan2 <yuan2.hu@intel.com>
This commit is contained in:
Yuan Hu 2023-10-24 13:59:08 +08:00 committed by GitHub
parent afda7ad70f
commit 84a0994ec5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -4,6 +4,7 @@
#include "openvino/runtime/threading/cpu_streams_executor.hpp" #include "openvino/runtime/threading/cpu_streams_executor.hpp"
#include <atomic>
#include <condition_variable> #include <condition_variable>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
@ -22,8 +23,6 @@
namespace ov { namespace ov {
namespace threading { namespace threading {
// maybe there are two CPUStreamsExecutors in the same thread.
thread_local std::map<void*, std::shared_ptr<std::thread::id>> t_stream_count_map;
struct CPUStreamsExecutor::Impl { struct CPUStreamsExecutor::Impl {
struct Stream { struct Stream {
#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO #if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO
@ -335,12 +334,58 @@ struct CPUStreamsExecutor::Impl {
// will be counted by thread_local t_stream_count_map. // will be counted by thread_local t_stream_count_map.
// when the customer's thread is destoryed, the stream's count will became 1, // when the customer's thread is destoryed, the stream's count will became 1,
// Call local() will reuse one of them, and release others. // Call local() will reuse one of them, and release others.
// it's only a workaround for ticket CVS-111490, please be carefully when need to modify
// CustomeThreadLocal::local(), especially like operations that will affect the count of
// CustomThreadLocal::ThreadId
class CustomThreadLocal : public ThreadLocal<std::shared_ptr<Stream>> { class CustomThreadLocal : public ThreadLocal<std::shared_ptr<Stream>> {
class ThreadTracker {
public:
explicit ThreadTracker(const std::thread::id& id)
: _id(id),
_count_ptr(std::make_shared<std::atomic_int>(1)) {}
~ThreadTracker() {
_count_ptr->fetch_sub(1);
}
std::shared_ptr<ThreadTracker> fetch() {
auto new_ptr = std::shared_ptr<ThreadTracker>(new ThreadTracker(*this));
auto pre_valule = new_ptr.get()->_count_ptr->fetch_add(1);
OPENVINO_ASSERT(pre_valule == 1, "this value must be 1, please check code CustomThreadLocal::local()");
return new_ptr;
}
const std::thread::id& get_id() const {
return _id;
}
int count() const {
return *(_count_ptr.get());
}
private:
// disable all copy and move semantics, user only can use fetch()
// to create a new instance with a shared count num;
ThreadTracker(ThreadTracker const&) = default;
ThreadTracker(ThreadTracker&&) = delete;
ThreadTracker& operator=(ThreadTracker const&) = delete;
ThreadTracker& operator=(ThreadTracker&&) = delete;
std::thread::id _id;
std::shared_ptr<std::atomic_int> _count_ptr;
};
public: public:
CustomThreadLocal(std::function<std::shared_ptr<Stream>()> callback_construct, Impl* impl) CustomThreadLocal(std::function<std::shared_ptr<Stream>()> callback_construct, Impl* impl)
: ThreadLocal<std::shared_ptr<Stream>>(callback_construct), : ThreadLocal<std::shared_ptr<Stream>>(callback_construct),
_impl(impl) {} _impl(impl) {}
std::shared_ptr<Stream> local() { std::shared_ptr<Stream> local() {
// maybe there are two CPUStreamsExecutors in the same thread.
static thread_local std::map<void*, std::shared_ptr<CustomThreadLocal::ThreadTracker>> t_stream_count_map;
// fix the memory leak issue that CPUStreamsExecutor is already released,
// but still exists CustomThreadLocal::ThreadTracker in t_stream_count_map
for (auto it = t_stream_count_map.begin(); it != t_stream_count_map.end();) {
if (this != it->first && it->second->count() == 1) {
t_stream_count_map.erase(it++);
} else {
it++;
}
}
auto id = std::this_thread::get_id(); auto id = std::this_thread::get_id();
auto search = _thread_ids.find(id); auto search = _thread_ids.find(id);
if (search != _thread_ids.end()) { if (search != _thread_ids.end()) {
@ -348,14 +393,13 @@ struct CPUStreamsExecutor::Impl {
} }
std::lock_guard<std::mutex> guard(_stream_map_mutex); std::lock_guard<std::mutex> guard(_stream_map_mutex);
for (auto& item : _stream_map) { for (auto& item : _stream_map) {
if (*(item.first.get()) == id) { if (item.first->get_id() == id) {
t_stream_count_map[(void*)this] = item.first;
return item.second; return item.second;
} }
} }
std::shared_ptr<Impl::Stream> stream = nullptr; std::shared_ptr<Impl::Stream> stream = nullptr;
for (auto it = _stream_map.begin(); it != _stream_map.end();) { for (auto it = _stream_map.begin(); it != _stream_map.end();) {
if (it->first.use_count() == 1) { if (it->first->count() == 1) {
if (stream == nullptr) { if (stream == nullptr) {
stream = it->second; stream = it->second;
} }
@ -367,9 +411,10 @@ struct CPUStreamsExecutor::Impl {
if (stream == nullptr) { if (stream == nullptr) {
stream = std::make_shared<Impl::Stream>(_impl); stream = std::make_shared<Impl::Stream>(_impl);
} }
auto id_ptr = std::make_shared<std::thread::id>(id); auto tracker_ptr = std::make_shared<CustomThreadLocal::ThreadTracker>(id);
t_stream_count_map[(void*)this] = id_ptr; t_stream_count_map[(void*)this] = tracker_ptr;
_stream_map[id_ptr] = stream; auto new_tracker_ptr = tracker_ptr->fetch();
_stream_map[new_tracker_ptr] = stream;
return stream; return stream;
} }
@ -382,7 +427,7 @@ struct CPUStreamsExecutor::Impl {
private: private:
std::set<std::thread::id> _thread_ids; std::set<std::thread::id> _thread_ids;
Impl* _impl; Impl* _impl;
std::map<std::shared_ptr<std::thread::id>, std::shared_ptr<Impl::Stream>> _stream_map; std::map<std::shared_ptr<CustomThreadLocal::ThreadTracker>, std::shared_ptr<Impl::Stream>> _stream_map;
std::mutex _stream_map_mutex; std::mutex _stream_map_mutex;
}; };