From 118dfdf0419cce65e7bc90ef69af16f0d928bd13 Mon Sep 17 00:00:00 2001
From: Markus Blatt
Date: Wed, 19 Jul 2023 14:05:19 +0200
Subject: [PATCH] Move MPI process check to *-cpp file.

---
 opm/simulators/flow/FlowMainEbos.cpp | 78 ++++++++++++++++++++++++++
 opm/simulators/flow/FlowMainEbos.hpp | 83 +---------------------------
 2 files changed, 81 insertions(+), 80 deletions(-)

diff --git a/opm/simulators/flow/FlowMainEbos.cpp b/opm/simulators/flow/FlowMainEbos.cpp
index 3d6b0d132..64838d819 100644
--- a/opm/simulators/flow/FlowMainEbos.cpp
+++ b/opm/simulators/flow/FlowMainEbos.cpp
@@ -77,6 +77,84 @@ void handleExtraConvergenceOutput(SimulatorReport& report,
         report.fullReports(os);
     }
 }
+void checkAllMPIProcesses()
+{
+#if HAVE_MPI
+    const auto& comm = EclGenericVanguard::comm();
+    if (comm.size() > 1)
+    {
+        // we try to prevent the abort here.
+        // For that we need a signal that each process is here.
+        // Each process sends a message to rank 0.
+        const int tag = 357912;
+        if (comm.rank() == 0)
+        {
+            // wait for a message from all processes.
+            std::vector<MPI_Request> requests(comm.size() - 1, MPI_REQUEST_NULL);
+            std::vector<int> data(comm.size()-1);
+
+            for(decltype(comm.size()) i = 1; i < comm.size(); ++i)
+            {
+                if (auto error = MPI_Irecv(data.data() + i, 1, MPI_INT, i, tag, comm, requests.data() + i - 1);
+                    error != MPI_SUCCESS) {
+                    OpmLog::error(fmt::format("Error: Could not set up MPI receive (error code : {})", error));
+                    MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
+                }
+            }
+            std::size_t msgs = comm.size() - 1;
+            for(std::size_t tries = 0; msgs >0 && tries < 3; ++tries)
+            {
+                sleep(3);
+                int flag, idx;
+                for(auto left_msgs = msgs; left_msgs > 0; --left_msgs)
+                {
+                    if( auto error = MPI_Testany(comm.size()-1, requests.data(), &idx, &flag, MPI_STATUS_IGNORE);
+                        error != MPI_SUCCESS) {
+                        OpmLog::error(fmt::format("Error: Could not test for MPI message (error code : {})", error));
+                        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
+                    }
+                    if (flag)
+                    {
+                        --msgs;
+                    }
+                }
+            }
+            if (msgs) {
+                // seems like some processes are stuck. Abort just to be save
+                MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
+            }
+        }
+        else
+        {
+            int data= 3;
+            MPI_Request request = MPI_REQUEST_NULL;
+            if (auto error = MPI_Isend(&data, 1, MPI_INT, 0, tag, comm, &request);
+                error != MPI_SUCCESS) {
+                OpmLog::error(fmt::format("Error: Could send MPI message (error code : {})", error));
+                MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
+            }
+            bool completed = false;
+            for(std::size_t tries = 0; !completed && tries < 3; tries++)
+            {
+                sleep(3);
+                int flag;
+                if( auto error = MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
+                    error != MPI_SUCCESS) {
+                    OpmLog::error(fmt::format("Error: Could not test for MPI message (error code : {})", error));
+                    MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
+                }
+                if (flag)
+                {
+                    completed = true;
+                }
+            }
+            if (!completed) {
+                MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
+            }
+        }
+    }
+#endif
+}
 
 } // namespace detail
 } // namespace Opm
diff --git a/opm/simulators/flow/FlowMainEbos.hpp b/opm/simulators/flow/FlowMainEbos.hpp
index 1787d0203..4734e33b5 100644
--- a/opm/simulators/flow/FlowMainEbos.hpp
+++ b/opm/simulators/flow/FlowMainEbos.hpp
@@ -75,6 +75,8 @@ struct OutputInterval {
 namespace Opm {
 namespace detail {
 
+void checkAllMPIProcesses();
+
 void mergeParallelLogFiles(std::string_view output_dir,
                            std::string_view deck_filename,
                            bool enableLoggingFalloutWarning);
@@ -320,85 +322,6 @@ void handleExtraConvergenceOutput(SimulatorReport& report,
             return simtimer_.get();
         }
 
-        static void checkAllMPIProcesses()
-        {
-#if HAVE_MPI
-            const auto& comm = EclGenericVanguard::comm();
-            if (comm.size() > 1)
-            {
-                // we try to prevent the abort here.
-                // For that we need a signal that each process is here.
-                // Each process sends a message to rank 0.
-                const int tag = 357912;
-                if (comm.rank() == 0)
-                {
-                    // wait for a message from all processes.
-                    std::vector<MPI_Request> requests(comm.size() - 1, MPI_REQUEST_NULL);
-                    std::vector<int> data(comm.size()-1);
-
-                    for(decltype(comm.size()) i = 1; i < comm.size(); ++i)
-                    {
-                        if (auto error = MPI_Irecv(data.data() + i, 1, MPI_INT, i, tag, comm, requests.data() + i - 1);
-                            error != MPI_SUCCESS) {
-                            OpmLog::error(fmt::format("Error: Could not set up MPI receive (error code : {})", error));
-                            MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
-                        }
-                    }
-                    std::size_t msgs = comm.size() - 1;
-                    for(std::size_t tries = 0; msgs >0 && tries < 3; ++tries)
-                    {
-                        sleep(3);
-                        int flag, idx;
-                        for(auto left_msgs = msgs; left_msgs > 0; --left_msgs)
-                        {
-                            if( auto error = MPI_Testany(comm.size()-1, requests.data(), &idx, &flag, MPI_STATUS_IGNORE);
-                                error != MPI_SUCCESS) {
-                                OpmLog::error(fmt::format("Error: Could not test for MPI message (error code : {})", error));
-                                MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
-                            }
-                            if (flag)
-                            {
-                                --msgs;
-                            }
-                        }
-                    }
-                    if (msgs) {
-                        // seems like some processes are stuck. Abort just to be save
-                        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
-                    }
-                }
-                else
-                {
-                    int data= 3;
-                    MPI_Request request = MPI_REQUEST_NULL;
-                    if (auto error = MPI_Isend(&data, 1, MPI_INT, 0, tag, comm, &request);
-                        error != MPI_SUCCESS) {
-                        OpmLog::error(fmt::format("Error: Could send MPI message (error code : {})", error));
-                        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
-                    }
-                    bool completed = false;
-                    for(std::size_t tries = 0; !completed && tries < 3; tries++)
-                    {
-                        sleep(3);
-                        int flag;
-                        if( auto error = MPI_Test(&request, &flag, MPI_STATUS_IGNORE);
-                            error != MPI_SUCCESS) {
-                            OpmLog::error(fmt::format("Error: Could not test for MPI message (error code : {})", error));
-                            MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
-                        }
-                        if (flag)
-                        {
-                            completed = true;
-                        }
-                    }
-                    if (!completed) {
-                        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
-                    }
-                }
-            }
-#endif
-        }
-
     private:
     // called by execute() or executeInitStep()
     int execute_(int (FlowMainEbos::* runOrInitFunc)(), bool cleanup)
@@ -417,7 +340,7 @@ void handleExtraConvergenceOutput(SimulatorReport& report,
                     std::cout << message.str() << "\n";
                 }
             }
-            this->checkAllMPIProcesses();
+            detail::checkAllMPIProcesses();
             return EXIT_FAILURE;
         };