[Core] Fix memory leak caused by repeatedly creating and running infer requests in separate threads (#18868)

* try to fix memory issue

Signed-off-by: HU Yuan2 <yuan2.hu@intel.com>

* save code

Signed-off-by: Hu Yuan2 <yuan2.hu@intel.com>

* fix life cycle issue

Signed-off-by: Hu Yuan2 <yuan2.hu@intel.com>

* update comment and save stream of master thread in tbb ThreadLocal

Signed-off-by: Hu Yuan2 <yuan2.hu@intel.com>

* update

Signed-off-by: Hu Yuan2 <yuan2.hu@intel.com>

* not save the main stream in tbb

Signed-off-by: Hu Yuan2 <yuan2.hu@intel.com>

* remove test code and update comment

Signed-off-by: Hu Yuan2 <yuan2.hu@intel.com>

* fix mistaken modify

Signed-off-by: Hu Yuan2 <yuan2.hu@intel.com>

* fix format issue

Signed-off-by: Hu Yuan2 <yuan2.hu@intel.com>

* add test

Signed-off-by: HU Yuan2 <yuan2.hu@intel.com>

* improve test

Signed-off-by: HU Yuan2 <yuan2.hu@intel.com>

* fix the test

Signed-off-by: Hu Yuan2 <yuan2.hu@intel.com>

* remove unused code

Signed-off-by: Hu Yuan2 <yuan2.hu@intel.com>

* update the comment of the code

Signed-off-by: Hu Yuan2 <yuan2.hu@intel.com>

* fix format issue

Signed-off-by: Hu Yuan2 <yuan2.hu@intel.com>

* revert test case

Signed-off-by: Hu Yuan2 <yuan2.hu@intel.com>

---------

Signed-off-by: HU Yuan2 <yuan2.hu@intel.com>
Signed-off-by: Hu Yuan2 <yuan2.hu@intel.com>
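
For context, a minimal sketch (not part of the commit) of the usage pattern the title describes, assuming a CPU model at a placeholder path "model.xml": each short-lived thread creates an infer request and runs inference, and before this fix every such thread left its per-thread CPU stream behind.

#include <thread>

#include <openvino/openvino.hpp>

int main() {
    ov::Core core;
    // "model.xml" is a placeholder path, not something referenced by this commit.
    auto compiled = core.compile_model("model.xml", "CPU");

    // Each iteration spawns a fresh, short-lived thread that creates an infer
    // request and runs inference. Before this fix, every such thread left its
    // per-thread Stream object behind, so memory grew with the iteration count.
    for (int i = 0; i < 1000; ++i) {
        std::thread worker([&compiled] {
            auto request = compiled.create_infer_request();
            request.infer();
        });
        worker.join();
    }
    return 0;
}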
Yuan Hu 2023-08-11 15:53:07 +08:00 committed by GitHub
parent 0a32ec0e76
commit 85609d4881

@ -1,4 +1,4 @@
// Copyright (C) 2018-2023 Intel Corporation
// SPDX-License-Identifier: Apache-2.0
//
@ -8,6 +8,7 @@
#include <memory>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <vector>
@ -21,6 +22,8 @@
namespace ov {
namespace threading {
// There may be two CPUStreamsExecutors in the same thread, so the map is keyed by the executor's address.
thread_local std::map<void*, std::shared_ptr<std::thread::id>> t_stream_count_map;
struct CPUStreamsExecutor::Impl {
struct Stream {
#if OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO
@ -310,12 +313,71 @@ struct CPUStreamsExecutor::Impl {
std::vector<int> _cpu_ids;
#endif
};
// If the thread is created by CPUStreamsExecutor, its Impl::Stream is stored in the TBB class
// enumerable_thread_specific (aliased here as ThreadLocal); for the limitations of ThreadLocal see
// https://spec.oneapi.io/versions/latest/elements/oneTBB/source/thread_local_storage/enumerable_thread_specific_cls.html
// If the thread is created by the customer, its Impl::Stream is stored in the member _stream_map and
// reference-counted through the thread_local t_stream_count_map.
// When the customer's thread is destroyed, the use count of its thread-id key drops to 1;
// a later call to local() will reuse one of these orphaned streams and release the others.
class CustomThreadLocal : public ThreadLocal<std::shared_ptr<Stream>> {
public:
CustomThreadLocal(std::function<std::shared_ptr<Stream>()> callback_construct, Impl* impl)
: ThreadLocal<std::shared_ptr<Stream>>(callback_construct),
_impl(impl) {}
std::shared_ptr<Stream> local() {
auto id = std::this_thread::get_id();
auto search = _thread_ids.find(id);
if (search != _thread_ids.end()) {
return ThreadLocal<std::shared_ptr<Stream>>::local();
}
std::lock_guard<std::mutex> guard(_stream_map_mutex);
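// the thread was created by the customer: check whether a stream is already registered for it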
for (auto& item : _stream_map) {
if (*(item.first.get()) == id) {
t_stream_count_map[(void*)this] = item.first;
return item.second;
}
}
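// no stream registered for this thread yet: reclaim streams whose owning thread has exited
// (their thread-id key is only referenced by _stream_map) and reuse one of them if possible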
std::shared_ptr<Impl::Stream> stream = nullptr;
for (auto it = _stream_map.begin(); it != _stream_map.end();) {
if (it->first.use_count() == 1) {
if (stream == nullptr) {
stream = it->second;
}
_stream_map.erase(it++);
} else {
it++;
}
}
if (stream == nullptr) {
stream = std::make_shared<Impl::Stream>(_impl);
}
auto id_ptr = std::make_shared<std::thread::id>(id);
t_stream_count_map[(void*)this] = id_ptr;
_stream_map[id_ptr] = stream;
return stream;
}
void set_thread_ids_map(std::vector<std::thread>& threads) {
for (auto& thread : threads) {
_thread_ids.insert(thread.get_id());
}
}
private:
std::set<std::thread::id> _thread_ids;
Impl* _impl;
std::map<std::shared_ptr<std::thread::id>, std::shared_ptr<Impl::Stream>> _stream_map;
std::mutex _stream_map_mutex;
};
explicit Impl(const Config& config)
: _config{config},
_streams(
[this] {
return std::make_shared<Impl::Stream>(this);
},
this) {
_exectorMgr = executor_manager();
auto numaNodes = get_available_numa_nodes();
if (_config._streams != 0) {
@ -376,6 +438,7 @@ struct CPUStreamsExecutor::Impl {
}
});
}
_streams.set_thread_ids_map(_threads);
}
void Enqueue(Task task) {
@ -425,7 +488,7 @@ struct CPUStreamsExecutor::Impl {
std::queue<Task> _taskQueue;
bool _isStopped = false;
std::vector<int> _usedNumaNodes;
CustomThreadLocal _streams;
#if (OV_THREAD == OV_THREAD_TBB || OV_THREAD == OV_THREAD_TBB_AUTO)
// stream id mapping to the core type
// stored in the reversed order (so the big cores, with the highest core_type_id value, are populated first)