Hybrid-aware P/E Core optimization (#13435)

* add HybridAware stream setting and core binding

* fix clang format issue

* unified code style, add parameter check

* correct input affinity skip NUMA, modify function name

* remove unnecessary floor

* fix ci compile issue on Mac/windows platform

* modify smoke_SetConfigAffinity test

* modify ov_core_set_and_get_property_enum test, affinity HYBRID_AWARE is changed to NUMA

* remove affinity correcting on this pr

* revert ov_core_test.cpp

* merge function by comments

* fix code style issue

Co-authored-by: Shen, Wanglei <wanglei.shen@intel.com>
This commit is contained in:
Sun Xiaoxia
2022-10-25 03:26:22 +08:00
committed by GitHub
parent 086bc00d4c
commit ec14dd3523
6 changed files with 119 additions and 25 deletions

View File

@@ -83,6 +83,7 @@ public:
*/ */
static Config MakeDefaultMultiThreaded(const Config& initial, const bool fp_intesive = true); static Config MakeDefaultMultiThreaded(const Config& initial, const bool fp_intesive = true);
static int GetDefaultNumStreams(); // no network specifics considered (only CPU's caps); static int GetDefaultNumStreams(); // no network specifics considered (only CPU's caps);
static int GetHybridNumStreams(const Config& config, const int stream_mode);
std::string _name; //!< Used by `ITT` to name executor threads std::string _name; //!< Used by `ITT` to name executor threads
int _streams = 1; //!< Number of streams. int _streams = 1; //!< Number of streams.
@@ -91,10 +92,16 @@ public:
//!< No binding by default //!< No binding by default
int _threadBindingStep = 1; //!< In case of @ref CORES binding offset type int _threadBindingStep = 1; //!< In case of @ref CORES binding offset type
//!< thread binded to cores with defined step //!< thread binded to cores with defined step
int _threadBindingOffset = 0; //!< In case of @ref CORES binding offset type thread binded to cores int _threadBindingOffset = 0; //!< In case of @ref CORES binding offset type thread binded to cores
//!< starting from offset //!< starting from offset
int _threads = 0; //!< Number of threads distributed between streams. int _threads = 0; //!< Number of threads distributed between streams.
//!< Reserved. Should not be used. //!< Reserved. Should not be used.
mutable int _big_core_streams = 0; // Number of streams in Performance-core(big core)
mutable int _small_core_streams = 0; // Number of streams in Efficient-core(small core)
mutable int _threads_per_stream_big = 0; // Threads per stream in big cores
mutable int _threads_per_stream_small = 0; // Threads per stream in small cores
mutable int _small_core_offset = 0; // Calculate small core start offset when binding cpu cores
enum StreamMode { DEFAULT, AGGRESSIVE, LESSAGGRESSIVE };
enum PreferredCoreType { enum PreferredCoreType {
ANY, ANY,
LITTLE, LITTLE,

View File

@@ -34,23 +34,27 @@ struct CPUStreamsExecutor::Impl {
int _ncpus = 0; int _ncpus = 0;
int _threadBindingStep = 0; int _threadBindingStep = 0;
int _offset = 0; int _offset = 0;
int _cpuIdxOffset = 0;
Observer(custom::task_arena& arena, Observer(custom::task_arena& arena,
CpuSet mask, CpuSet mask,
int ncpus, int ncpus,
const int streamId, const int streamId,
const int threadsPerStream, const int threadsPerStream,
const int threadBindingStep, const int threadBindingStep,
const int threadBindingOffset) const int threadBindingOffset,
const int cpuIdxOffset = 0)
: custom::task_scheduler_observer(arena), : custom::task_scheduler_observer(arena),
_mask{std::move(mask)}, _mask{std::move(mask)},
_ncpus(ncpus), _ncpus(ncpus),
_threadBindingStep(threadBindingStep), _threadBindingStep(threadBindingStep),
_offset{streamId * threadsPerStream + threadBindingOffset} {} _offset{streamId * threadsPerStream + threadBindingOffset},
_cpuIdxOffset(cpuIdxOffset) {}
void on_scheduler_entry(bool) override { void on_scheduler_entry(bool) override {
PinThreadToVacantCore(_offset + tbb::this_task_arena::current_thread_index(), PinThreadToVacantCore(_offset + tbb::this_task_arena::current_thread_index(),
_threadBindingStep, _threadBindingStep,
_ncpus, _ncpus,
_mask); _mask,
_cpuIdxOffset);
} }
void on_scheduler_exit(bool) override { void on_scheduler_exit(bool) override {
PinCurrentThreadByMask(_ncpus, _mask); PinCurrentThreadByMask(_ncpus, _mask);
@@ -103,9 +107,29 @@ struct CPUStreamsExecutor::Impl {
return p.second > streamId_wrapped; return p.second > streamId_wrapped;
}) })
->first; ->first;
_taskArena.reset(new custom::task_arena{custom::task_arena::constraints{} const auto max_concurrency = selected_core_type == 0 ? _impl->_config._threads_per_stream_small
.set_core_type(selected_core_type) : _impl->_config._threads_per_stream_big;
.set_max_concurrency(concurrency)}); const auto stream_id =
selected_core_type == 0 ? _streamId - _impl->_config._big_core_streams : _streamId;
const auto thread_binding_step = selected_core_type == 0 ? _impl->_config._threadBindingStep : 2;
// Prevent conflicts with system scheduling, so default cpu id on big core starts from 1
const auto cpu_idx_offset = selected_core_type == 0 ? _impl->_config._small_core_offset : 1;
_taskArena.reset(new custom::task_arena{max_concurrency});
CpuSet processMask;
int ncpus = 0;
std::tie(processMask, ncpus) = GetProcessMask();
if (nullptr != processMask) {
_observer.reset(new Observer{*_taskArena,
std::move(processMask),
ncpus,
stream_id,
max_concurrency,
thread_binding_step,
_impl->_config._threadBindingOffset,
cpu_idx_offset});
_observer->observe(true);
}
} }
} else if (ThreadBindingType::NUMA == _impl->_config._threadBindingType) { } else if (ThreadBindingType::NUMA == _impl->_config._threadBindingType) {
_taskArena.reset(new custom::task_arena{custom::task_arena::constraints{_numaNodeId, concurrency}}); _taskArena.reset(new custom::task_arena{custom::task_arena::constraints{_numaNodeId, concurrency}});
@@ -198,17 +222,13 @@ struct CPUStreamsExecutor::Impl {
#if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO) #if (IE_THREAD == IE_THREAD_TBB || IE_THREAD == IE_THREAD_TBB_AUTO)
if (ThreadBindingType::HYBRID_AWARE == config._threadBindingType) { if (ThreadBindingType::HYBRID_AWARE == config._threadBindingType) {
const auto core_types = custom::info::core_types(); const auto core_types = custom::info::core_types();
const int threadsPerStream =
(0 == config._threadsPerStream) ? std::thread::hardware_concurrency() : config._threadsPerStream;
int sum = 0; int sum = 0;
// reversed order, so BIG cores are first // reversed order, so BIG cores are first
for (auto iter = core_types.rbegin(); iter < core_types.rend(); iter++) { for (auto iter = core_types.rbegin(); iter < core_types.rend(); iter++) {
const auto& type = *iter; const auto& type = *iter;
// calculating the #streams per core type // calculating the #streams per core type
const int num_streams_for_core_type = const int num_streams_for_core_type =
std::max(1, type == 0 ? std::max(1, config._small_core_streams) : std::max(1, config._big_core_streams);
custom::info::default_concurrency(custom::task_arena::constraints{}.set_core_type(type)) /
threadsPerStream);
sum += num_streams_for_core_type; sum += num_streams_for_core_type;
// prefix sum, so the core type for a given stream id will be deduced just as a upper_bound // prefix sum, so the core type for a given stream id will be deduced just as a upper_bound
// (notice that the map keeps the elements in the descending order, so the big cores are populated // (notice that the map keeps the elements in the descending order, so the big cores are populated

View File

@@ -46,6 +46,53 @@ int IStreamsExecutor::Config::GetDefaultNumStreams() {
return 1; return 1;
} }
int IStreamsExecutor::Config::GetHybridNumStreams(const Config& config, const int stream_mode) {
    // Partitions streams/threads between Performance (big) and Efficient (small)
    // cores on hybrid CPUs according to `stream_mode` (DEFAULT / AGGRESSIVE /
    // LESSAGGRESSIVE), storing the partition into the mutable scheduling fields
    // of `config`. Returns the total number of streams across both core types.
    // Throws if `stream_mode` is not a known StreamMode value.
    const int num_phy_cores = getNumberOfCPUCores();
    const int num_big_cores = getNumberOfCPUCores(true);
    const int num_small_cores = num_phy_cores - num_big_cores;
    if (stream_mode == DEFAULT) {
        // bare minimum of streams (that evenly divides available number of cores)
        if (0 == num_big_cores % 4) {
            config._big_core_streams = std::max(4, num_big_cores / 4);
        } else if (0 == num_big_cores % 5) {
            config._big_core_streams = std::max(5, num_big_cores / 5);
        } else if (0 == num_big_cores % 3) {
            config._big_core_streams = std::max(3, num_big_cores / 3);
        } else {  // if user disables some cores say in BIOS, so we got weird #cores which is not easy to divide
            config._big_core_streams = 1;
        }
        config._threads_per_stream_big = num_big_cores / config._big_core_streams;
        // E-cores are assumed to deliver roughly half the per-core throughput of
        // P-cores, hence twice the threads per small-core stream
        config._threads_per_stream_small = config._threads_per_stream_big * 2;
        if (num_small_cores == 0) {
            config._big_core_streams = num_big_cores / config._threads_per_stream_big;
            config._threads_per_stream_small = 0;
        } else if (num_small_cores < config._threads_per_stream_small) {
            config._small_core_streams = 1;
            config._threads_per_stream_small = num_small_cores;
            // std::max guards the divisor below against 0 when there is a single small core
            config._threads_per_stream_big = std::max(1, config._threads_per_stream_small / 2);
            config._big_core_streams = num_big_cores / config._threads_per_stream_big;
        } else {
            config._small_core_streams = num_small_cores / config._threads_per_stream_small;
        }
    } else if (stream_mode == AGGRESSIVE) {
        // one stream per big core, one stream per pair of small cores
        config._big_core_streams = num_big_cores;
        config._small_core_streams = num_small_cores / 2;
        config._threads_per_stream_big = num_big_cores / config._big_core_streams;
        // guard on the stream count (not the core count): with exactly one small
        // core the stream count is 0 and the original division would crash
        config._threads_per_stream_small =
            config._small_core_streams == 0 ? 0 : num_small_cores / config._small_core_streams;
    } else if (stream_mode == LESSAGGRESSIVE) {
        // one stream per pair of big cores, one stream per quad of small cores
        config._big_core_streams = std::max(1, num_big_cores / 2);  // never 0, even with a single big core
        config._small_core_streams = num_small_cores / 4;
        // same zero-stream guard as above (1..3 small cores yield 0 streams)
        config._threads_per_stream_big = num_big_cores / config._big_core_streams;
        config._threads_per_stream_small =
            config._small_core_streams == 0 ? 0 : num_small_cores / config._small_core_streams;
    } else {
        IE_THROW() << "Wrong stream mode to get num of streams: " << stream_mode;
    }
    // Logical CPU id where the small cores start; assumes each big core exposes
    // two logical CPUs (hyper-threading) — TODO confirm on non-HT hybrid parts
    config._small_core_offset = num_small_cores == 0 ? 0 : num_big_cores * 2;
    return config._big_core_streams + config._small_core_streams;
}
void IStreamsExecutor::Config::SetConfig(const std::string& key, const std::string& value) { void IStreamsExecutor::Config::SetConfig(const std::string& key, const std::string& value) {
if (key == CONFIG_KEY(CPU_BIND_THREAD)) { if (key == CONFIG_KEY(CPU_BIND_THREAD)) {
if (value == CONFIG_VALUE(YES) || value == CONFIG_VALUE(NUMA)) { if (value == CONFIG_VALUE(YES) || value == CONFIG_VALUE(NUMA)) {

View File

@@ -46,14 +46,14 @@ bool PinCurrentThreadByMask(int ncores, const CpuSet& procMask) {
return 0 == sched_setaffinity(0, CPU_ALLOC_SIZE(ncores), procMask.get()); return 0 == sched_setaffinity(0, CPU_ALLOC_SIZE(ncores), procMask.get());
} }
bool PinThreadToVacantCore(int thrIdx, int hyperthreads, int ncores, const CpuSet& procMask) { bool PinThreadToVacantCore(int thrIdx, int hyperthreads, int ncores, const CpuSet& procMask, int cpuIdxOffset) {
if (procMask == nullptr) if (procMask == nullptr)
return false; return false;
const size_t size = CPU_ALLOC_SIZE(ncores); const size_t size = CPU_ALLOC_SIZE(ncores);
const int num_cpus = CPU_COUNT_S(size, procMask.get()); const int num_cpus = CPU_COUNT_S(size, procMask.get());
thrIdx %= num_cpus; // To limit unique number in [; num_cpus-1] range thrIdx %= num_cpus; // To limit unique number in [; num_cpus-1] range
// Place threads with specified step // Place threads with specified step
int cpu_idx = 0; int cpu_idx = cpuIdxOffset;
for (int i = 0, offset = 0; i < thrIdx; ++i) { for (int i = 0, offset = 0; i < thrIdx; ++i) {
cpu_idx += hyperthreads; cpu_idx += hyperthreads;
if (cpu_idx >= num_cpus) if (cpu_idx >= num_cpus)
@@ -61,8 +61,8 @@ bool PinThreadToVacantCore(int thrIdx, int hyperthreads, int ncores, const CpuSe
} }
// Find index of 'cpu_idx'-th bit that equals to 1 // Find index of 'cpu_idx'-th bit that equals to 1
int mapped_idx = -1; int mapped_idx = cpuIdxOffset - 1;
while (cpu_idx >= 0) { while (cpu_idx >= cpuIdxOffset) {
mapped_idx++; mapped_idx++;
if (CPU_ISSET_S(mapped_idx, size, procMask.get())) if (CPU_ISSET_S(mapped_idx, size, procMask.get()))
--cpu_idx; --cpu_idx;
@@ -104,7 +104,7 @@ std::tuple<CpuSet, int> GetProcessMask() {
} }
void ReleaseProcessMask(cpu_set_t*) {} void ReleaseProcessMask(cpu_set_t*) {}
bool PinThreadToVacantCore(int thrIdx, int hyperthreads, int ncores, const CpuSet& procMask) { bool PinThreadToVacantCore(int thrIdx, int hyperthreads, int ncores, const CpuSet& procMask, int cpuIdxOffset) {
return false; return false;
} }
bool PinCurrentThreadByMask(int ncores, const CpuSet& procMask) { bool PinCurrentThreadByMask(int ncores, const CpuSet& procMask) {

View File

@@ -64,7 +64,7 @@ std::tuple<CpuSet, int> GetProcessMask();
* @param[in] processMask The process mask * @param[in] processMask The process mask
* @return `True` in case of success, `false` otherwise * @return `True` in case of success, `false` otherwise
*/ */
bool PinThreadToVacantCore(int thrIdx, int hyperThreads, int ncores, const CpuSet& processMask); bool PinThreadToVacantCore(int thrIdx, int hyperThreads, int ncores, const CpuSet& processMask, int cpuIdxOffset = 0);
/** /**
* @brief Pins thread to a spare core in the round-robin scheme, while respecting the given process mask. * @brief Pins thread to a spare core in the round-robin scheme, while respecting the given process mask.

View File

@@ -730,20 +730,40 @@ void Engine::ApplyPerformanceHints(std::map<std::string, std::string> &config, c
// less aggressive // less aggressive
const auto num_streams_less_aggressive = num_cores / 2; const auto num_streams_less_aggressive = num_cores / 2;
// default #streams value (most conservative) // default #streams value (most conservative)
const auto default_num_streams = IStreamsExecutor::Config::GetDefaultNumStreams(); const auto default_num_streams =
engConfig.streamExecutorConfig._threadBindingType ==
InferenceEngine::IStreamsExecutor::ThreadBindingType::HYBRID_AWARE
? IStreamsExecutor::Config::GetHybridNumStreams(engConfig.streamExecutorConfig,
IStreamsExecutor::Config::StreamMode::DEFAULT)
: IStreamsExecutor::Config::GetDefaultNumStreams();
int num_streams = default_num_streams; int num_streams = default_num_streams;
if (networkToleranceForLowCache.max_mem_tolerance == ov::MemBandwidthPressure::UNKNOWN) { if (networkToleranceForLowCache.max_mem_tolerance == ov::MemBandwidthPressure::UNKNOWN) {
if ((networkToleranceForLowCache.ratio_compute_convs == ov::MemBandwidthPressure::ALL) if ((networkToleranceForLowCache.ratio_compute_convs == ov::MemBandwidthPressure::ALL)
|| (networkToleranceForLowCache.ratio_compute_deconvs == ov::MemBandwidthPressure::ALL)) { || (networkToleranceForLowCache.ratio_compute_deconvs == ov::MemBandwidthPressure::ALL)) {
// all relevant layers (convs, etc) are compute-limited, the most aggressive val for #streams // all relevant layers (convs, etc) are compute-limited, the most aggressive val for #streams
num_streams = num_cores; num_streams = engConfig.streamExecutorConfig._threadBindingType ==
InferenceEngine::IStreamsExecutor::ThreadBindingType::HYBRID_AWARE
? IStreamsExecutor::Config::GetHybridNumStreams(
engConfig.streamExecutorConfig,
IStreamsExecutor::Config::StreamMode::AGGRESSIVE)
: num_cores;
} // otherwise (no recognized layers) falling back to the default value } // otherwise (no recognized layers) falling back to the default value
} else if (networkToleranceForLowCache.max_mem_tolerance > memThresholdAssumeLimitedForISA) { } else if (networkToleranceForLowCache.max_mem_tolerance > memThresholdAssumeLimitedForISA) {
// network is below the ISA-specific threshold // network is below the ISA-specific threshold
num_streams = num_cores; num_streams = engConfig.streamExecutorConfig._threadBindingType ==
InferenceEngine::IStreamsExecutor::ThreadBindingType::HYBRID_AWARE
? IStreamsExecutor::Config::GetHybridNumStreams(
engConfig.streamExecutorConfig,
IStreamsExecutor::Config::StreamMode::AGGRESSIVE)
: num_cores;
} else if (networkToleranceForLowCache.max_mem_tolerance > ov::MemBandwidthPressure::LIMITED) { } else if (networkToleranceForLowCache.max_mem_tolerance > ov::MemBandwidthPressure::LIMITED) {
// network is below general threshold // network is below general threshold
num_streams = std::max(default_num_streams, num_streams_less_aggressive); num_streams = engConfig.streamExecutorConfig._threadBindingType ==
InferenceEngine::IStreamsExecutor::ThreadBindingType::HYBRID_AWARE
? IStreamsExecutor::Config::GetHybridNumStreams(
engConfig.streamExecutorConfig,
IStreamsExecutor::Config::StreamMode::LESSAGGRESSIVE)
: std::max(default_num_streams, num_streams_less_aggressive);
} }
auto num_requests = config.find(CONFIG_KEY(PERFORMANCE_HINT_NUM_REQUESTS)); auto num_requests = config.find(CONFIG_KEY(PERFORMANCE_HINT_NUM_REQUESTS));
if (num_requests != config.end()) { // arrived with config to the LoadNetwork (and thus higher pri) if (num_requests != config.end()) { // arrived with config to the LoadNetwork (and thus higher pri)