Move MPI process check to *-cpp file.

This commit is contained in:
Markus Blatt 2023-07-19 14:05:19 +02:00
parent 859e00254e
commit 118dfdf041
2 changed files with 81 additions and 80 deletions

View File

@ -77,6 +77,84 @@ void handleExtraConvergenceOutput(SimulatorReport& report,
report.fullReports(os);
}
}
void checkAllMPIProcesses()
{
#if HAVE_MPI
const auto& comm = EclGenericVanguard::comm();
if (comm.size() > 1)
{
// we try to prevent the abort here.
// For that we need a signal that each process is here.
// Each process sends a message to rank 0.
const int tag = 357912;
if (comm.rank() == 0)
{
// wait for a message from all processes.
std::vector<MPI_Request> requests(comm.size() - 1, MPI_REQUEST_NULL);
std::vector<int> data(comm.size()-1);
for(decltype(comm.size()) i = 1; i < comm.size(); ++i)
{
if (auto error = MPI_Irecv(data.data() + i, 1, MPI_INT, i, tag, comm, requests.data() + i - 1);
error != MPI_SUCCESS) {
OpmLog::error(fmt::format("Error: Could not set up MPI receive (error code : {})", error));
MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
}
}
std::size_t msgs = comm.size() - 1;
for(std::size_t tries = 0; msgs >0 && tries < 3; ++tries)
{
sleep(3);
int flag, idx;
for(auto left_msgs = msgs; left_msgs > 0; --left_msgs)
{
if( auto error = MPI_Testany(comm.size()-1, requests.data(), &idx, &flag, MPI_STATUS_IGNORE);
error != MPI_SUCCESS) {
OpmLog::error(fmt::format("Error: Could not test for MPI message (error code : {})", error));
MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
}
if (flag)
{
--msgs;
}
}
}
if (msgs) {
// seems like some processes are stuck. Abort just to be save
MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
}
}
else
{
int data= 3;
MPI_Request request = MPI_REQUEST_NULL;
if (auto error = MPI_Isend(&data, 1, MPI_INT, 0, tag, comm, &request);
error != MPI_SUCCESS) {
OpmLog::error(fmt::format("Error: Could send MPI message (error code : {})", error));
MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
}
bool completed = false;
for(std::size_t tries = 0; !completed && tries < 3; tries++)
{
sleep(3);
int flag;
if( auto error = MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
error != MPI_SUCCESS) {
OpmLog::error(fmt::format("Error: Could not test for MPI message (error code : {})", error));
MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
}
if (flag)
{
completed = true;
}
}
if (!completed) {
MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
}
}
}
#endif
}
} // namespace detail
} // namespace Opm

View File

@ -75,6 +75,8 @@ struct OutputInterval<TypeTag, TTag::EclFlowProblem> {
namespace Opm {
namespace detail {
void checkAllMPIProcesses();
void mergeParallelLogFiles(std::string_view output_dir,
std::string_view deck_filename,
bool enableLoggingFalloutWarning);
@ -320,85 +322,6 @@ void handleExtraConvergenceOutput(SimulatorReport& report,
return simtimer_.get();
}
static void checkAllMPIProcesses()
{
#if HAVE_MPI
const auto& comm = EclGenericVanguard::comm();
if (comm.size() > 1)
{
// we try to prevent the abort here.
// For that we need a signal that each process is here.
// Each process sends a message to rank 0.
const int tag = 357912;
if (comm.rank() == 0)
{
// wait for a message from all processes.
std::vector<MPI_Request> requests(comm.size() - 1, MPI_REQUEST_NULL);
std::vector<int> data(comm.size()-1);
for(decltype(comm.size()) i = 1; i < comm.size(); ++i)
{
if (auto error = MPI_Irecv(data.data() + i, 1, MPI_INT, i, tag, comm, requests.data() + i - 1);
error != MPI_SUCCESS) {
OpmLog::error(fmt::format("Error: Could not set up MPI receive (error code : {})", error));
MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
}
}
std::size_t msgs = comm.size() - 1;
for(std::size_t tries = 0; msgs >0 && tries < 3; ++tries)
{
sleep(3);
int flag, idx;
for(auto left_msgs = msgs; left_msgs > 0; --left_msgs)
{
if( auto error = MPI_Testany(comm.size()-1, requests.data(), &idx, &flag, MPI_STATUS_IGNORE);
error != MPI_SUCCESS) {
OpmLog::error(fmt::format("Error: Could not test for MPI message (error code : {})", error));
MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
}
if (flag)
{
--msgs;
}
}
}
if (msgs) {
// seems like some processes are stuck. Abort just to be save
MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
}
}
else
{
int data= 3;
MPI_Request request = MPI_REQUEST_NULL;
if (auto error = MPI_Isend(&data, 1, MPI_INT, 0, tag, comm, &request);
error != MPI_SUCCESS) {
OpmLog::error(fmt::format("Error: Could send MPI message (error code : {})", error));
MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
}
bool completed = false;
for(std::size_t tries = 0; !completed && tries < 3; tries++)
{
sleep(3);
int flag;
if( auto error = MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
error != MPI_SUCCESS) {
OpmLog::error(fmt::format("Error: Could not test for MPI message (error code : {})", error));
MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
}
if (flag)
{
completed = true;
}
}
if (!completed) {
MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
}
}
}
#endif
}
private:
// called by execute() or executeInitStep()
int execute_(int (FlowMainEbos::* runOrInitFunc)(), bool cleanup)
@ -417,7 +340,7 @@ void handleExtraConvergenceOutput(SimulatorReport& report,
std::cout << message.str() << "\n";
}
}
this->checkAllMPIProcesses();
detail::checkAllMPIProcesses();
return EXIT_FAILURE;
};