Spawn slaves from master

This commit is contained in:
Håkon Hægland 2024-06-21 08:55:00 +02:00
parent 2bbf48c39c
commit 4fd225c004
14 changed files with 353 additions and 19 deletions

View File

@ -112,6 +112,8 @@ list (APPEND MAIN_SOURCE_FILES
opm/simulators/flow/RegionPhasePVAverage.cpp
opm/simulators/flow/SimulatorConvergenceOutput.cpp
opm/simulators/flow/SimulatorFullyImplicitBlackoil.cpp
opm/simulators/flow/ReservoirCouplingMaster.cpp
opm/simulators/flow/ReservoirCouplingSlave.cpp
opm/simulators/flow/SimulatorReportBanners.cpp
opm/simulators/flow/SimulatorSerializer.cpp
opm/simulators/flow/SolutionContainers.cpp

View File

@ -177,7 +177,7 @@ void FlowGenericVanguard::readDeck(const std::string& filename)
modelParams_.actionState_,
modelParams_.wtestState_,
modelParams_.eclSummaryConfig_,
nullptr, "normal", "normal", "100", false, false, false, {});
nullptr, "normal", "normal", "100", false, false, false, {}, /*slaveMode=*/false);
modelParams_.setupTime_ = setupTimer.stop();
}

View File

@ -50,8 +50,8 @@ namespace Opm::Parameters {
// Do not merge parallel output files or warn about them
struct EnableLoggingFalloutWarning { static constexpr bool value = false; };
struct OutputInterval { static constexpr int value = 1; };
struct Slave { static constexpr bool value = false; };
} // namespace Opm::Parameters
@ -100,6 +100,10 @@ namespace Opm {
Parameters::Register<Parameters::EnableLoggingFalloutWarning>
("Developer option to see whether logging was on non-root processors. "
"In that case it will be appended to the *.DBG or *.PRT files");
Parameters::Register<Parameters::Slave>
("Specify if the simulation is a slave simulation in a master-slave simulation");
Parameters::Hide<Parameters::Slave>();
Simulator::registerParameters();
// register the base parameters
registerAllParameters_<TypeTag>(/*finalizeRegistration=*/false);
@ -366,7 +370,7 @@ namespace Opm {
// Callback that will be called from runSimulatorInitOrRun_().
int runSimulatorRunCallback_()
{
SimulatorReport report = simulator_->run(*simtimer_);
SimulatorReport report = simulator_->run(*simtimer_, this->argc_, this->argv_);
runSimulatorAfterSim_(report);
return report.success.exit_status;
}
@ -374,7 +378,7 @@ namespace Opm {
// Callback that will be called from runSimulatorInitOrRun_().
int runSimulatorInitCallback_()
{
simulator_->init(*simtimer_);
simulator_->init(*simtimer_, this->argc_, this->argv_);
return EXIT_SUCCESS;
}

View File

@ -230,6 +230,7 @@ void Main::readDeck(const std::string& deckFilename,
const bool keepKeywords,
const std::size_t numThreads,
const int output_param,
const bool slaveMode,
const std::string& parameters,
std::string_view moduleVersion,
std::string_view compileTimestamp)
@ -265,7 +266,8 @@ void Main::readDeck(const std::string& deckFilename,
init_from_restart_file,
outputCout_,
keepKeywords,
outputInterval);
outputInterval,
slaveMode);
verifyValidCellGeometry(FlowGenericVanguard::comm(), *this->eclipseState_);

View File

@ -413,6 +413,7 @@ protected:
keepKeywords,
getNumThreads(),
Parameters::Get<Parameters::EclOutputInterval>(),
Parameters::Get<Parameters::Slave>(),
cmdline_params,
Opm::moduleVersion(),
Opm::compileTimestamp());
@ -697,6 +698,7 @@ private:
const bool keepKeywords,
const std::size_t numThreads,
const int output_param,
const bool slaveMode,
const std::string& parameters,
std::string_view moduleVersion,
std::string_view compileTimestamp);

View File

@ -0,0 +1,97 @@
/*
Copyright 2024 Equinor AS
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
*/
#include <config.h>
#include <opm/simulators/flow/ReservoirCouplingMaster.hpp>
#include <opm/input/eclipse/Schedule/ResCoup/ReservoirCouplingInfo.hpp>
#include <opm/input/eclipse/Schedule/ResCoup/MasterGroup.hpp>
#include <opm/input/eclipse/Schedule/ResCoup/Slaves.hpp>
#include <opm/common/ErrorMacros.hpp>
#include <opm/simulators/utils/ParallelCommunication.hpp>
#include <opm/input/eclipse/Schedule/Schedule.hpp>
#include <filesystem>
#include <vector>
#include <fmt/format.h>
namespace Opm {
ReservoirCouplingMaster::ReservoirCouplingMaster(
const Parallel::Communication &comm,
const Schedule &schedule
) :
comm_{comm},
schedule_{schedule}
{ }
// NOTE: This functions is executed for all ranks, but only rank 0 will spawn
// the slave processes
void ReservoirCouplingMaster::spawnSlaveProcesses([[maybe_unused]]int argc, char **argv) {
const auto& rescoup = this->schedule_[0].rescoup();
char *flow_program_name = argv[0];
for (const auto& [slave_name, slave] : rescoup.slaves()) {
auto master_slave_comm = MPI_Comm_Ptr(new MPI_Comm(MPI_COMM_NULL));
// TODO: Here we should extract the relevant options from the master argv
// and forward them to the slave process, for now we just pass the deck file name
const auto& data_file_name = slave.dataFilename();
const auto& directory_path = slave.directoryPath();
// Concatenate the directory path and the data file name to get the full path
std::filesystem::path dir_path(directory_path);
std::filesystem::path data_file(data_file_name);
std::filesystem::path full_path = dir_path / data_file;
std::vector<char*> slave_argv(3);
slave_argv[0] = flow_program_name;
slave_argv[1] = const_cast<char*>(full_path.c_str());
slave_argv[2] = nullptr;
auto num_procs = slave.numprocs();
std::vector<int> errcodes(num_procs);
MPI_Info info;
MPI_Info_create(&info);
const std::string log_file = fmt::format("{}.log", slave_name);
// TODO: We need to decide how to handle the output from the slave processes
// For now we just redirect the output to a log file with the name of the slave
MPI_Info_set(info, "output", log_file.c_str());
MPI_Info_set(info, "error", log_file.c_str());
int spawn_result = MPI_Comm_spawn(
slave_argv[0],
slave_argv.data(),
/*maxprocs=*/num_procs,
/*info=*/info,
/*root=*/0, // Rank 0 spawns the slave processes
/*comm=*/this->comm_,
/*intercomm=*/master_slave_comm.get(),
/*array_of_errcodes=*/errcodes.data()
);
MPI_Info_free(&info);
if (spawn_result != MPI_SUCCESS || (*master_slave_comm == MPI_COMM_NULL)) {
OPM_THROW(std::runtime_error, "Failed to spawn slave process");
}
this->master_slave_comm_.push_back(std::move(master_slave_comm));
this->slave_names_.push_back(slave_name);
}
}
} // namespace Opm

View File

@ -0,0 +1,61 @@
/*
Copyright 2024 Equinor AS
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef OPM_RESERVOIR_COUPLING_MASTER_HPP
#define OPM_RESERVOIR_COUPLING_MASTER_HPP
#include <opm/simulators/utils/ParallelCommunication.hpp>
#include <opm/input/eclipse/Schedule/Schedule.hpp>
#include <opm/common/OpmLog/OpmLog.hpp>
#include <mpi.h>
#include <vector>
namespace Opm {
class ReservoirCouplingMaster {
public:
ReservoirCouplingMaster(const Parallel::Communication &comm, const Schedule &schedule);
// Custom deleter for MPI_Comm
struct MPI_Comm_Deleter {
void operator()(MPI_Comm* comm) const {
if (*comm != MPI_COMM_NULL) {
MPI_Comm_free(comm);
}
delete comm;
}
};
using MPI_Comm_Ptr = std::unique_ptr<MPI_Comm, MPI_Comm_Deleter>;
void spawnSlaveProcesses(int argc, char **argv);
private:
const Parallel::Communication &comm_;
const Schedule& schedule_;
// MPI communicators for the slave processes
std::vector<MPI_Comm_Ptr> master_slave_comm_;
std::vector<std::string> slave_names_;
};
} // namespace Opm
#endif // OPM_RESERVOIR_COUPLING_MASTER_HPP

View File

@ -0,0 +1,52 @@
/*
Copyright 2024 Equinor AS
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
*/
#include <config.h>
#include <opm/simulators/flow/ReservoirCouplingSlave.hpp>
#include <opm/input/eclipse/Schedule/ResCoup/ReservoirCouplingInfo.hpp>
#include <opm/input/eclipse/Schedule/ResCoup/MasterGroup.hpp>
#include <opm/input/eclipse/Schedule/ResCoup/Slaves.hpp>
#include <opm/common/ErrorMacros.hpp>
#include <opm/simulators/utils/ParallelCommunication.hpp>
#include <vector>
namespace Opm {
ReservoirCouplingSlave::ReservoirCouplingSlave(
const Parallel::Communication &comm,
const Schedule &schedule
) :
comm_{comm},
schedule_{schedule}
{ }
void ReservoirCouplingSlave::sendSimulationStartDateToMasterProcess() {
// TODO: Implement this function next
//this->slave_master_comm_ = MPI_Comm_Ptr(new MPI_Comm(MPI_COMM_NULL));
//MPI_Comm_get_parent(this->slave_master_comm_.get());
//if (*(this->slave_master_comm_) == MPI_COMM_NULL) {
// OPM_THROW(std::runtime_error, "Slave process is not spawned by a master process");
//}
OpmLog::info("Sent simulation start date to master process");
}
} // namespace Opm

View File

@ -0,0 +1,50 @@
/*
Copyright 2024 Equinor AS
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef OPM_RESERVOIR_COUPLING_SLAVE_HPP
#define OPM_RESERVOIR_COUPLING_SLAVE_HPP
#include <opm/simulators/flow/ReservoirCouplingMaster.hpp>
#include <opm/input/eclipse/Schedule/Schedule.hpp>
#include <opm/simulators/utils/ParallelCommunication.hpp>
#include <opm/common/OpmLog/OpmLog.hpp>
#include <mpi.h>
#include <vector>
namespace Opm {
class ReservoirCouplingSlave {
public:
using MPI_Comm_Ptr = ReservoirCouplingMaster::MPI_Comm_Ptr;
ReservoirCouplingSlave(const Parallel::Communication &comm, const Schedule &schedule);
void sendSimulationStartDateToMasterProcess();
private:
const Parallel::Communication &comm_;
const Schedule& schedule_;
// MPI parent communicator for a slave process
MPI_Comm_Ptr slave_master_comm_{nullptr};
};
} // namespace Opm
#endif // OPM_RESERVOIR_COUPLING_SLAVE_HPP

View File

@ -58,6 +58,10 @@ void registerSimulatorParameters()
("FileName for .OPMRST file used to load serialized state. "
"If empty, CASENAME.OPMRST is used.");
Parameters::Hide<Parameters::LoadFile>();
Parameters::Register<Parameters::Slave>
("Specify if the simulation is a slave simulation in a master-slave simulation");
Parameters::Hide<Parameters::Slave>();
}
} // namespace Opm::detail

View File

@ -24,6 +24,9 @@
#include <opm/common/ErrorMacros.hpp>
#include <opm/input/eclipse/Schedule/ResCoup/ReservoirCouplingInfo.hpp>
#include <opm/input/eclipse/Schedule/ResCoup/MasterGroup.hpp>
#include <opm/input/eclipse/Schedule/ResCoup/Slaves.hpp>
#include <opm/input/eclipse/Units/UnitSystem.hpp>
#include <opm/grid/utility/StopWatch.hpp>
@ -35,6 +38,8 @@
#include <opm/simulators/flow/ExtraConvergenceOutputThread.hpp>
#include <opm/simulators/flow/NonlinearSolver.hpp>
#include <opm/simulators/flow/SimulatorConvergenceOutput.hpp>
#include <opm/simulators/flow/ReservoirCouplingMaster.hpp>
#include <opm/simulators/flow/ReservoirCouplingSlave.hpp>
#include <opm/simulators/flow/SimulatorReportBanners.hpp>
#include <opm/simulators/flow/SimulatorSerializer.hpp>
#include <opm/simulators/timestepping/AdaptiveTimeStepping.hpp>
@ -63,6 +68,7 @@ struct SaveStep { static constexpr auto* value = ""; };
struct SaveFile { static constexpr auto* value = ""; };
struct LoadFile { static constexpr auto* value = ""; };
struct LoadStep { static constexpr int value = -1; };
struct Slave { static constexpr bool value = false; };
} // namespace Opm::Parameters
@ -78,6 +84,8 @@ namespace Opm {
template<class TypeTag>
class SimulatorFullyImplicitBlackoil : private SerializableSim
{
protected:
struct MPI_Comm_Deleter;
public:
using Simulator = GetPropType<TypeTag, Properties::Simulator>;
using Grid = GetPropType<TypeTag, Properties::Grid>;
@ -171,9 +179,9 @@ public:
/// \param[in,out] timer governs the requested reporting timesteps
/// \param[in,out] state state of reservoir: pressure, fluxes
/// \return simulation report, with timing data
SimulatorReport run(SimulatorTimer& timer)
SimulatorReport run(SimulatorTimer& timer, int argc, char** argv)
{
init(timer);
init(timer, argc, argv);
// Make cache up to date. No need for updating it in elementCtx.
// NB! Need to be at the correct step in case of restart
simulator_.setEpisodeIndex(timer.currentStepNum());
@ -188,8 +196,32 @@ public:
return finalize();
}
void init(SimulatorTimer &timer)
// NOTE: The argc and argv will be used when launching a slave process
void init(SimulatorTimer &timer, int argc, char** argv)
{
auto slave_mode = Parameters::get<TypeTag, Properties::Slave>();
if (slave_mode) {
this->reservoirCouplingSlave_ =
std::make_unique<ReservoirCouplingSlave>(
FlowGenericVanguard::comm(),
this->schedule()
);
this->reservoirCouplingSlave_->sendSimulationStartDateToMasterProcess();
}
else {
// For now, we require that SLAVES and GRUPMAST are defined at the first
// schedule step, so it is enough to check the first step. See the
// keyword handlers in opm-common for more information.
auto master_mode = this->schedule()[0].rescoup().masterMode();
if (master_mode) {
this->reservoirCouplingMaster_ =
std::make_unique<ReservoirCouplingMaster>(
FlowGenericVanguard::comm(),
this->schedule()
);
this->reservoirCouplingMaster_->spawnSlaveProcesses(argc, argv);
}
}
simulator_.setEpisodeIndex(-1);
// Create timers and file for writing timing info.
@ -555,6 +587,10 @@ protected:
SimulatorConvergenceOutput convergence_output_;
bool slaveMode_{false};
std::unique_ptr<ReservoirCouplingMaster> reservoirCouplingMaster_{nullptr};
std::unique_ptr<ReservoirCouplingSlave> reservoirCouplingSlave_{nullptr};
SimulatorSerializer serializer_;
};

View File

@ -242,9 +242,7 @@ const KeywordValidation::UnsupportedKeywords& unsupportedKeywords()
{"GRAVDRB", {true, std::nullopt}},
{"GRAVDRM", {true, std::nullopt}},
{"GRDREACH", {true, std::nullopt}},
{"GRUPMAST", {true, std::nullopt}},
{"GRUPRIG", {true, std::nullopt}},
{"GRUPSLAV", {true, std::nullopt}},
{"GRUPTARG", {true, std::nullopt}},
{"GSATINJE", {true, std::nullopt}},
{"GSEPCOND", {true, std::nullopt}},

View File

@ -54,7 +54,11 @@
#include <opm/input/eclipse/Schedule/Action/State.hpp>
#include <opm/input/eclipse/Schedule/ArrayDimChecker.hpp>
#include <opm/input/eclipse/Schedule/ResCoup/ReservoirCouplingInfo.hpp>
#include <opm/input/eclipse/Schedule/ResCoup/MasterGroup.hpp>
#include <opm/input/eclipse/Schedule/ResCoup/Slaves.hpp>
#include <opm/input/eclipse/Schedule/Schedule.hpp>
#include <opm/input/eclipse/Schedule/UDQ/UDQConfig.hpp>
#include <opm/input/eclipse/Schedule/UDQ/UDQState.hpp>
#include <opm/input/eclipse/Schedule/Well/WellTestState.hpp>
@ -151,7 +155,7 @@ namespace {
if (schedule == nullptr) {
schedule = std::make_shared<Opm::Schedule>
(deck, eclipseState, parseContext, errorGuard,
std::move(python), lowActionParsingStrictness, /*slave_mode=*/false,
std::move(python), lowActionParsingStrictness, /*slaveMode=*/false,
keepKeywords, outputInterval, init_state);
}
@ -180,14 +184,14 @@ namespace {
std::unique_ptr<Opm::UDQState>& udqState,
std::unique_ptr<Opm::Action::State>& actionState,
std::unique_ptr<Opm::WellTestState>& wtestState,
Opm::ErrorGuard& errorGuard)
Opm::ErrorGuard& errorGuard,
const bool slaveMode)
{
if (schedule == nullptr) {
schedule = std::make_shared<Opm::Schedule>
(deck, eclipseState, parseContext,
errorGuard, std::move(python), lowActionParsingStrictness, keepKeywords);
errorGuard, std::move(python), lowActionParsingStrictness, slaveMode, keepKeywords);
}
udqState = std::make_unique<Opm::UDQState>
((*schedule)[0].udq().params().undefinedValue());
@ -233,6 +237,24 @@ namespace {
#endif
}
void inconsistentScheduleError(const std::string& message)
{
OPM_THROW(std::logic_error,
fmt::format("Inconsistent SCHEDULE section: {}", message));
}
void checkScheduleKeywordConsistency(const Opm::Schedule& schedule)
{
const auto& final_state = schedule.back();
const auto& rescoup = final_state.rescoup();
if (rescoup.slaveCount() > 0 && rescoup.masterGroupCount() == 0) {
inconsistentScheduleError("SLAVES keyword without GRUPMAST keyword");
}
if (rescoup.slaveCount() == 0 && rescoup.masterGroupCount() > 0) {
inconsistentScheduleError("GRUPMAST keyword without SLAVES keyword");
}
}
void readOnIORank(Opm::Parallel::Communication comm,
const std::string& deckFilename,
const Opm::ParseContext* parseContext,
@ -249,7 +271,8 @@ namespace {
const bool lowActionParsingStrictness,
const bool keepKeywords,
const std::optional<int>& outputInterval,
Opm::ErrorGuard& errorGuard)
Opm::ErrorGuard& errorGuard,
const bool slaveMode)
{
OPM_TIMEBLOCK(readDeck);
if (((schedule == nullptr) || (summaryConfig == nullptr)) &&
@ -282,9 +305,10 @@ namespace {
lowActionParsingStrictness, keepKeywords,
std::move(python),
schedule, udqState, actionState, wtestState,
errorGuard);
errorGuard, slaveMode);
}
checkScheduleKeywordConsistency(*schedule);
eclipseState->appendAqufluxSchedule(schedule->getAquiferFluxSchedule());
if (Opm::OpmLog::hasBackend("STDOUT_LOGGER")) {
@ -539,7 +563,8 @@ void Opm::readDeck(Opm::Parallel::Communication comm,
const bool initFromRestart,
const bool checkDeck,
const bool keepKeywords,
const std::optional<int>& outputInterval)
const std::optional<int>& outputInterval,
const bool slaveMode)
{
auto errorGuard = std::make_unique<ErrorGuard>();
@ -573,7 +598,7 @@ void Opm::readDeck(Opm::Parallel::Communication comm,
eclipseState, schedule, udqState, actionState, wtestState,
summaryConfig, std::move(python), initFromRestart,
checkDeck, treatCriticalAsNonCritical, lowActionParsingStrictness,
keepKeywords, outputInterval, *errorGuard);
keepKeywords, outputInterval, *errorGuard, slaveMode);
// Update schedule so that re-parsing after actions use same strictness
assert(schedule);

View File

@ -99,7 +99,8 @@ void readDeck(Parallel::Communication comm,
bool initFromRestart,
bool checkDeck,
bool keepKeywords,
const std::optional<int>& outputInterval);
const std::optional<int>& outputInterval,
bool slaveMode);
void verifyValidCellGeometry(Parallel::Communication comm,
const EclipseState& eclipseState);