set cpu affinity to numa in MULTI situation (#13407)
* change gpunum to 3
* hold threads for GPU for MULTI:GPU,CPU
* need to first check if there is a CPU in the device list
* use getNumberOfCPUCores to get CPU cores
* load GPU first
* assign the correct value to multiSContext->_devicePriorities
* load GPU first and load CPU last and set numa for CPU
* MULTI set CPU affinity to "NUMA" during load network
* Load the CPU last while maintaining the original device priority
* not using vector for CPU
* There is no user setting affinity in MULTI, and NUMA is set for the CPU
* pass key ENABLE_HYPER_THREAD to CPU plugin and merge xiaoxia PR
* set ENABLE_HYPER_THREAD to NO
* modify log
* Modify the code according to xiaoxia and wanglei comments
* Modify the code according to bell comments
This commit is contained in: parent 384a961793, commit 42b816ace7
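For context, a minimal usage sketch of the behavior this commit targets (not part of the diff; `model.xml` is a placeholder path): when an application compiles a model for MULTI with both GPU and CPU and sets no explicit affinity, the plugin now loads the GPU network first, then loads the CPU network last with affinity forced to NUMA and hyper-threading disabled, so CPU inference threads do not starve the GPU host threads.

```cpp
#include <ie_core.hpp>

int main() {
    InferenceEngine::Core core;
    auto network = core.ReadNetwork("model.xml");  // placeholder model path
    // No explicit ov::affinity is set here, so with this commit MULTI itself
    // applies NUMA affinity (and ENABLE_HYPER_THREAD=NO) to the CPU device.
    auto execNet = core.LoadNetwork(network, "MULTI:GPU,CPU", {});
    auto request = execNet.CreateInferRequest();
    return 0;
}
```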
@@ -104,6 +104,10 @@ DECLARE_CONFIG_KEY(FORCE_DISABLE_CACHE);
  */
 DECLARE_CONFIG_KEY(CONFIG_DEVICE_ID);
 
+/**
+ * @brief enable hyper thread
+ */
+DECLARE_CONFIG_KEY(ENABLE_HYPER_THREAD);
 }  // namespace PluginConfigInternalParams
 
 }  // namespace InferenceEngine
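The key above is internal (namespace PluginConfigInternalParams), so applications never set it directly; it travels between plugins in an ordinary config map. A minimal sketch of how this commit feeds it to the CPU plugin, mirroring the MULTI hunk near the end of this diff:

```cpp
// Assumes the internal-config header from this hunk is included.
std::map<std::string, std::string> cpuConfig;
// MULTI inserts this before calling LoadNetwork on the CPU device:
cpuConfig[CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD)] = CONFIG_VALUE(NO);
```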
@@ -82,7 +82,8 @@ public:
      * @return configured values
      */
     static Config MakeDefaultMultiThreaded(const Config& initial, const bool fp_intesive = true);
-    static int GetDefaultNumStreams();  // no network specifics considered (only CPU's caps);
+    static int GetDefaultNumStreams(
+        const bool enable_hyper_thread = true);  // no network specifics considered (only CPU's caps);
     static int GetHybridNumStreams(std::map<std::string, std::string>& config, const int stream_mode);
     static void UpdateHybridCustomThreads(Config& config);
 
@@ -102,6 +103,7 @@ public:
     int _threads_per_stream_big = 0;    //!< Threads per stream in big cores
     int _threads_per_stream_small = 0;  //!< Threads per stream in small cores
     int _small_core_offset = 0;         //!< Calculate small core start offset when binding cpu cores
+    bool _enable_hyper_thread = true;   //!< enable hyper thread
     enum StreamMode { DEFAULT, AGGRESSIVE, LESSAGGRESSIVE };
     enum PreferredCoreType {
         ANY,
@@ -33,15 +33,17 @@ std::vector<std::string> IStreamsExecutor::Config::SupportedKeys() const {
         CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_BIG),
         CONFIG_KEY_INTERNAL(THREADS_PER_STREAM_SMALL),
         CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET),
+        CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD),
         ov::num_streams.name(),
         ov::inference_num_threads.name(),
         ov::affinity.name(),
     };
 }
-int IStreamsExecutor::Config::GetDefaultNumStreams() {
+int IStreamsExecutor::Config::GetDefaultNumStreams(const bool enable_hyper_thread) {
     const int sockets = static_cast<int>(getAvailableNUMANodes().size());
     // bare minimum of streams (that evenly divides available number of core)
-    const int num_cores = sockets == 1 ? parallel_get_max_threads() : getNumberOfCPUCores();
+    const int num_cores = sockets == 1 ? (enable_hyper_thread ? parallel_get_max_threads() : getNumberOfCPUCores())
+                                       : getNumberOfCPUCores();
     if (0 == num_cores % 4)
         return std::max(4, num_cores / 4);
     else if (0 == num_cores % 5)
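A standalone sketch of the heuristic above with the core queries stubbed out (the real code calls parallel_get_max_threads() and getNumberOfCPUCores(); the divisor cases after %4 are elided here, and the 1-socket/20-core/40-thread machine shape is an assumption for illustration):

```cpp
#include <algorithm>
#include <cstdio>

static int defaultNumStreams(int sockets, int logicalThreads, int physCores, bool enableHyperThread) {
    // Single-socket: count logical threads only when hyper-threading is enabled.
    const int num_cores = sockets == 1 ? (enableHyperThread ? logicalThreads : physCores) : physCores;
    if (0 == num_cores % 4)
        return std::max(4, num_cores / 4);
    return 1;  // remaining divisor cases elided
}

int main() {
    std::printf("HT enabled:  %d streams\n", defaultNumStreams(1, 40, 20, true));   // 40/4 = 10
    std::printf("HT disabled: %d streams\n", defaultNumStreams(1, 40, 20, false));  // 20/4 = 5
    return 0;
}
```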
@@ -280,6 +282,14 @@ void IStreamsExecutor::Config::SetConfig(const std::string& key, const std::stri
                        << ". Expected only non negative numbers";
             }
             _small_core_offset = val_i;
+        } else if (key == CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD)) {
+            if (value == CONFIG_VALUE(YES)) {
+                _enable_hyper_thread = true;
+            } else if (value == CONFIG_VALUE(NO)) {
+                _enable_hyper_thread = false;
+            } else {
+                OPENVINO_UNREACHABLE("Unsupported enable hyper thread type");
+            }
         } else {
             IE_THROW() << "Wrong value for property key " << key;
         }
@@ -328,6 +338,8 @@ Parameter IStreamsExecutor::Config::GetConfig(const std::string& key) const {
         return {std::to_string(_threads_per_stream_small)};
     } else if (key == CONFIG_KEY_INTERNAL(SMALL_CORE_OFFSET)) {
         return {std::to_string(_small_core_offset)};
+    } else if (key == CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD)) {
+        return {_enable_hyper_thread ? CONFIG_VALUE(YES) : CONFIG_VALUE(NO)};
     } else {
         IE_THROW() << "Wrong value for property key " << key;
     }
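A hedged round-trip sketch of the two accessors above (internal dev API; includes omitted, and it assumes the headers touched in this diff are on the include path):

```cpp
InferenceEngine::IStreamsExecutor::Config cfg;
cfg.SetConfig(CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD), CONFIG_VALUE(NO));
// Reads back "NO"; any value other than YES/NO trips OPENVINO_UNREACHABLE in SetConfig.
auto ht = cfg.GetConfig(CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD)).as<std::string>();
```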
@@ -445,18 +457,19 @@ IStreamsExecutor::Config IStreamsExecutor::Config::MakeDefaultMultiThreaded(cons
                       << streamExecutorConfig._threads_per_stream_small << ")";
     }
 #endif
-    const auto hwCores = !bLatencyCase && numaNodesNum == 1
-                             // throughput case on a single-NUMA node machine uses all available cores
-                             ? parallel_get_max_threads()
-                             // in the rest of cases:
-                             //    multi-node machine
-                             //    or
-                             //    latency case, single-node yet hybrid case that uses
-                             //      all core types
-                             //      or
-                             //      big-cores only, but the #cores is "enough" (pls see the logic above)
-                             // it is usually beneficial not to use the hyper-threading (which is default)
-                             : num_cores_default;
+    const auto hwCores =
+        !bLatencyCase && numaNodesNum == 1
+            // throughput case on a single-NUMA node machine uses all available cores
+            ? (streamExecutorConfig._enable_hyper_thread ? parallel_get_max_threads() : num_cores_default)
+            // in the rest of cases:
+            //    multi-node machine
+            //    or
+            //    latency case, single-node yet hybrid case that uses
+            //      all core types
+            //      or
+            //      big-cores only, but the #cores is "enough" (pls see the logic above)
+            // it is usually beneficial not to use the hyper-threading (which is default)
+            : num_cores_default;
     const auto threads =
         streamExecutorConfig._threads ? streamExecutorConfig._threads : (envThreads ? envThreads : hwCores);
     streamExecutorConfig._threadsPerStream =
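To make the new branch concrete, an illustration under an assumed machine shape (single NUMA node, throughput case, 8 cores / 16 threads, no explicit thread counts):

```cpp
// _enable_hyper_thread == true  -> hwCores = parallel_get_max_threads()  (16)
// _enable_hyper_thread == false -> hwCores = num_cores_default            (8)
// Latency and multi-node cases take the ": num_cores_default" branch either way.
```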
@@ -122,37 +122,8 @@ void AutoSchedule::init(const ScheduleContext::Ptr& sContext) {
     _loadContext[ACTUALDEVICE].metaDevices = _autoSContext->_devicePriorities;
     if (isCumulative) {
         std::list<DeviceInformation> validDevices =
-            _autoSContext->_plugin->GetValidDevice(_autoSContext->_devicePriorities, _loadContext[ACTUALDEVICE].networkPrecision);
-        // check if device priority is enabled
-        bool enableDevicePriority =
-            std::find_if(std::begin(validDevices), std::end(validDevices), [](DeviceInformation& di) {
-                return di.devicePriority > 0;
-            }) != std::end(validDevices);
-
-        // for the case of -d "AUTO" or "AUTO: -xxx"
-        if (!enableDevicePriority) {
-            std::list<DeviceInformation>::iterator itCPUDevice;
-            int GPUNums = 0, CPUNums = 0;
-            for (auto it = validDevices.begin(); it != validDevices.end(); it++) {
-                if (it->deviceName.find("GPU") != std::string::npos) {
-                    GPUNums++;
-                }
-
-                if (it->deviceName.find("CPU") == 0) {
-                    CPUNums++;
-                    itCPUDevice = it;
-                }
-            }
-
-            // remove CPU from default candidate list for Cumulative Throughput mode
-            if (GPUNums >= 3 && CPUNums > 0 && !_autoSContext->_bindBuffer) {
-                validDevices.erase(itCPUDevice);
-                LOG_INFO_TAG("GPUNums:%d, remove CPU from default candidate list for "
-                             "CUMULATIVE_THROUGHPUT",
-                             GPUNums);
-            }
-        }
-
+            _autoSContext->_plugin->GetValidDevice(_autoSContext->_devicePriorities,
+                                                   _loadContext[ACTUALDEVICE].networkPrecision);
         std::string deviceName = "MULTI:";
         for (auto& device : validDevices) {
@@ -455,40 +455,100 @@ IExecutableNetworkInternal::Ptr MultiDeviceInferencePlugin::LoadNetworkImpl(cons
     std::mutex load_mutex;
     std::vector<Task> loads;
     std::once_flag readNetworkFlag;
+
+    auto loadInferEngTask = [&](DeviceInformation& p) {
+        auto tmpiter = fullConfig.find(CONFIG_KEY(ALLOW_AUTO_BATCHING));
+        if (tmpiter != fullConfig.end()) {
+            if (tmpiter->second == PluginConfigParams::NO) {
+                LOG_INFO_TAG("set %s=%s", tmpiter->first.c_str(), tmpiter->second.c_str());
+                multiSContext->_batchingDisabled = true;
+            }
+            p.config.insert({tmpiter->first, tmpiter->second});
+        }
+        insertPropToConfig(CONFIG_KEY(AUTO_BATCH_TIMEOUT), p.deviceName, p.config);
+        insertPropToConfig(CONFIG_KEY(CACHE_DIR), p.deviceName, p.config);
+        const auto& deviceName = p.deviceName;
+        const auto& deviceConfig = p.config;
+        SoExecutableNetworkInternal exec_net;
+        LOG_DEBUG_TAG("load network to device:%s", deviceName.c_str());
+        if (modelPath.empty()) {
+            exec_net = GetCore()->LoadNetwork(network, deviceName, deviceConfig);
+        } else if (GetCore()->DeviceSupportsImportExport(deviceName)) {
+            exec_net = GetCore()->LoadNetwork(modelPath, deviceName, deviceConfig);
+        } else {
+            std::call_once(readNetworkFlag, [&]() {
+                network = GetCore()->ReadNetwork(modelPath, std::string());
+            });
+            exec_net = GetCore()->LoadNetwork(network, deviceName, deviceConfig);
+        }
+
+        try {
+            std::string sStreamNums = "";
+            std::string sThreadNums = "";
+            if (deviceName.find("CPU") != std::string::npos) {
+                sStreamNums = exec_net->GetMetric(ov::num_streams.name()).as<std::string>();
+                sThreadNums = exec_net->GetMetric(ov::inference_num_threads.name()).as<std::string>();
+            } else if (deviceName.find("GPU") != std::string::npos) {
+                sStreamNums = exec_net->GetConfig(PluginConfigParams::KEY_GPU_THROUGHPUT_STREAMS).as<std::string>();
+                sThreadNums = exec_net->GetConfig(GPUConfigParams::KEY_GPU_MAX_NUM_THREADS).as<std::string>();
+            }
+
+            // print CPU or GPU streams num and threads num
+            if (!sStreamNums.empty() && !sThreadNums.empty()) {
+                LOG_INFO_TAG("after load network, %s streamNums:%s, %s threadNums:%s",
+                             deviceName.c_str(),
+                             sStreamNums.c_str(),
+                             deviceName.c_str(),
+                             sThreadNums.c_str());
+            }
+        } catch (...) {
+            LOG_DEBUG_TAG("deviceName:%s cannot get streamNums and threadNums from exec_net", deviceName.c_str());
+        }
+        std::unique_lock<std::mutex> lock{load_mutex};
+        executableNetworkPerDevice.insert({deviceName, exec_net});
+        multiNetworkConfig.insert(deviceConfig.begin(), deviceConfig.end());
+    };
+
+    // Check if CPU is in device list
+    auto iterCPU = std::find_if(metaDevices.begin(), metaDevices.end(), [&](DeviceInformation& d) {
+        return d.deviceName.find("CPU") != std::string::npos;
+    });
+    // Load devices other than CPU first
     for (auto& p : metaDevices) {
+        if (iterCPU != metaDevices.end() && p.deviceName == iterCPU->deviceName) {
+            continue;
+        }
         loads.push_back([&]() {
-            auto tmpiter = fullConfig.find(CONFIG_KEY(ALLOW_AUTO_BATCHING));
-            if (tmpiter != fullConfig.end()) {
-                if (tmpiter->second == PluginConfigParams::NO)
-                    multiSContext->_batchingDisabled = true;
-                p.config.insert({tmpiter->first, tmpiter->second});
-            }
-            insertPropToConfig(CONFIG_KEY(AUTO_BATCH_TIMEOUT), p.deviceName, p.config);
-            insertPropToConfig(CONFIG_KEY(CACHE_DIR), p.deviceName, p.config);
-            const auto& deviceName = p.deviceName;
-            const auto& deviceConfig = p.config;
-            SoExecutableNetworkInternal exec_net;
-            if (modelPath.empty()) {
-                exec_net = GetCore()->LoadNetwork(network, deviceName, deviceConfig);
-            } else if (GetCore()->DeviceSupportsImportExport(deviceName)) {
-                exec_net = GetCore()->LoadNetwork(modelPath, deviceName, deviceConfig);
-            } else {
-                std::call_once(readNetworkFlag, [&]() {
-                    network = GetCore()->ReadNetwork(modelPath, std::string());
-                });
-                exec_net = GetCore()->LoadNetwork(network, deviceName, deviceConfig);
-            }
-            std::unique_lock<std::mutex> lock{load_mutex};
-            executableNetworkPerDevice.insert({deviceName, exec_net});
-            multiNetworkConfig.insert(deviceConfig.begin(), deviceConfig.end());
+            loadInferEngTask(p);
         });
     }
 
     auto executor = executorManager()->getIdleCPUStreamsExecutor(
         IStreamsExecutor::Config{"MultiDeviceAsyncLoad",
                                  static_cast<int>(std::thread::hardware_concurrency()) /* max possible #streams*/,
                                  0 /*default threads per stream, workaround for ticket 62376*/,
                                  IStreamsExecutor::ThreadBindingType::NONE});
-    executor->runAndWait(loads);
+    if (loads.size() > 0) {
+        // Wait for the device to load the network
+        executor->runAndWait(loads);
+        loads.clear();
+    }
+
+    // Finally load the CPU
+    if (iterCPU != metaDevices.end()) {
+        if (!executableNetworkPerDevice.empty() && iterCPU->config.find(ov::affinity.name()) == iterCPU->config.end()) {
+            LOG_DEBUG_TAG("set affinity to NUMA and disable hyper thread for CPU");
+            // If the other devices load successfully and no user set affinity then set NUMA to CPU
+            iterCPU->config.insert({ov::affinity.name(), ov::affinity(ov::Affinity::NUMA).second.as<std::string>()});
+            iterCPU->config.insert({CONFIG_KEY_INTERNAL(ENABLE_HYPER_THREAD), CONFIG_VALUE(NO)});
+        }
+        loads.push_back([&]() {
+            loadInferEngTask(*iterCPU);
+        });
+        // Wait for CPU to load the network
+        executor->runAndWait(loads);
+    }
+
     if (executableNetworkPerDevice.empty())
         IE_THROW(NotFound) << "Failed to load network to any device "
                            << "that the " << GetName() << " device is initialized to work with";
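To summarize the control flow the hunk above introduces, here is a minimal standalone sketch with stand-in types (Device and loadOne are hypothetical; the real code uses DeviceInformation, loadInferEngTask, and an idle CPU streams executor with runAndWait):

```cpp
#include <algorithm>
#include <functional>
#include <map>
#include <string>
#include <vector>

struct Device {  // stand-in for DeviceInformation
    std::string name;
    std::map<std::string, std::string> config;
};

void loadAll(std::vector<Device>& devices, const std::function<void(Device&)>& loadOne) {
    // Find the CPU entry, if any.
    auto itCPU = std::find_if(devices.begin(), devices.end(), [](const Device& d) {
        return d.name.find("CPU") != std::string::npos;
    });
    // Load every non-CPU device first (queued on a streams executor in the real code).
    for (auto& d : devices) {
        if (itCPU != devices.end() && d.name == itCPU->name)
            continue;
        loadOne(d);
    }
    // Then load the CPU last. Simplified: the real code applies these settings only
    // when at least one other device loaded and the user did not set ov::affinity.
    if (itCPU != devices.end()) {
        itCPU->config.insert({"AFFINITY", "NUMA"});           // ov::affinity in the real code
        itCPU->config.insert({"ENABLE_HYPER_THREAD", "NO"});  // internal key in the real code
        loadOne(*itCPU);
    }
}
```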
@@ -729,7 +729,7 @@ void Engine::ApplyPerformanceHints(std::map<std::string, std::string> &config, c
             engConfig.streamExecutorConfig._threadBindingType ==
                     InferenceEngine::IStreamsExecutor::ThreadBindingType::HYBRID_AWARE
                 ? IStreamsExecutor::Config::GetHybridNumStreams(config, IStreamsExecutor::Config::StreamMode::DEFAULT)
-                : IStreamsExecutor::Config::GetDefaultNumStreams();
+                : IStreamsExecutor::Config::GetDefaultNumStreams(engConfig.streamExecutorConfig._enable_hyper_thread);
         int num_streams = default_num_streams;
         if (networkToleranceForLowCache.max_mem_tolerance == ov::MemBandwidthPressure::UNKNOWN) {
             if ((networkToleranceForLowCache.ratio_compute_convs == ov::MemBandwidthPressure::ALL)
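Tying the last two hunks together (a hedged estimate, reusing the 20-core/40-thread machine assumed in the earlier sketch): when MULTI passes ENABLE_HYPER_THREAD=NO to the CPU plugin, the THROUGHPUT hint on a non-hybrid machine now sizes its default stream count from physical cores only.

```cpp
// THROUGHPUT hint, no HYBRID_AWARE binding (assumed 1 socket, 20 cores / 40 threads):
//   before: GetDefaultNumStreams()      -> 40 logical threads -> 10 streams
//   after:  GetDefaultNumStreams(false) -> 20 physical cores  ->  5 streams
```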