Replacing use of MPI_COMM_WORLD with a variable communicator.

This commit is contained in:
Elyes Ahmed
2021-05-25 12:57:11 +02:00
committed by Atgeirr Flø Rasmussen
parent 61ef539bf5
commit f53c597f90
48 changed files with 584 additions and 420 deletions

View File

@@ -33,57 +33,57 @@ namespace
void packReservoirFailure(const ConvergenceReport::ReservoirFailure& f,
std::vector<char>& buf,
int& offset)
int& offset, MPI_Comm mpi_communicator)
{
int type = static_cast<int>(f.type());
int severity = static_cast<int>(f.severity());
int phase = f.phase();
MPI_Pack(&type, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(&severity, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(&phase, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(&type, 1, MPI_INT, buf.data(), buf.size(), &offset, mpi_communicator);
MPI_Pack(&severity, 1, MPI_INT, buf.data(), buf.size(), &offset, mpi_communicator);
MPI_Pack(&phase, 1, MPI_INT, buf.data(), buf.size(), &offset, mpi_communicator);
}
void packWellFailure(const ConvergenceReport::WellFailure& f,
std::vector<char>& buf,
int& offset)
int& offset, MPI_Comm mpi_communicator)
{
int type = static_cast<int>(f.type());
int severity = static_cast<int>(f.severity());
int phase = f.phase();
MPI_Pack(&type, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(&severity, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(&phase, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(&type, 1, MPI_INT, buf.data(), buf.size(), &offset, mpi_communicator);
MPI_Pack(&severity, 1, MPI_INT, buf.data(), buf.size(), &offset, mpi_communicator);
MPI_Pack(&phase, 1, MPI_INT, buf.data(), buf.size(), &offset, mpi_communicator);
int name_length = f.wellName().size() + 1; // Adding 1 for the null terminator.
MPI_Pack(&name_length, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(const_cast<char*>(f.wellName().c_str()), name_length, MPI_CHAR, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(&name_length, 1, MPI_INT, buf.data(), buf.size(), &offset, mpi_communicator);
MPI_Pack(const_cast<char*>(f.wellName().c_str()), name_length, MPI_CHAR, buf.data(), buf.size(), &offset, mpi_communicator);
}
void packConvergenceReport(const ConvergenceReport& local_report,
std::vector<char>& buf,
int& offset)
int& offset, MPI_Comm mpi_communicator)
{
// Pack the data.
// Status will not be packed, it is possible to deduce from the other data.
// Reservoir failures.
const auto rf = local_report.reservoirFailures();
int num_rf = rf.size();
MPI_Pack(&num_rf, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(&num_rf, 1, MPI_INT, buf.data(), buf.size(), &offset, mpi_communicator);
for (const auto& f : rf) {
packReservoirFailure(f, buf, offset);
packReservoirFailure(f, buf, offset, mpi_communicator);
}
// Well failures.
const auto wf = local_report.wellFailures();
int num_wf = wf.size();
MPI_Pack(&num_wf, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(&num_wf, 1, MPI_INT, buf.data(), buf.size(), &offset, mpi_communicator);
for (const auto& f : wf) {
packWellFailure(f, buf, offset);
packWellFailure(f, buf, offset, mpi_communicator);
}
}
int messageSize(const ConvergenceReport& local_report)
int messageSize(const ConvergenceReport& local_report, MPI_Comm mpi_communicator)
{
int int_pack_size = 0;
MPI_Pack_size(1, MPI_INT, MPI_COMM_WORLD, &int_pack_size);
MPI_Pack_size(1, MPI_INT, mpi_communicator, &int_pack_size);
const int num_rf = local_report.reservoirFailures().size();
const int num_wf = local_report.wellFailures().size();
int wellnames_length = 0;
@@ -93,33 +93,33 @@ namespace
return (2 + 3*num_rf + 4*num_wf) * int_pack_size + wellnames_length;
}
ConvergenceReport::ReservoirFailure unpackReservoirFailure(const std::vector<char>& recv_buffer, int& offset)
ConvergenceReport::ReservoirFailure unpackReservoirFailure(const std::vector<char>& recv_buffer, int& offset, MPI_Comm mpi_communicator)
{
int type = -1;
int severity = -1;
int phase = -1;
auto* data = const_cast<char*>(recv_buffer.data());
MPI_Unpack(data, recv_buffer.size(), &offset, &type, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack(data, recv_buffer.size(), &offset, &severity, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack(data, recv_buffer.size(), &offset, &phase, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack(data, recv_buffer.size(), &offset, &type, 1, MPI_INT, mpi_communicator);
MPI_Unpack(data, recv_buffer.size(), &offset, &severity, 1, MPI_INT, mpi_communicator);
MPI_Unpack(data, recv_buffer.size(), &offset, &phase, 1, MPI_INT, mpi_communicator);
return ConvergenceReport::ReservoirFailure(static_cast<ConvergenceReport::ReservoirFailure::Type>(type),
static_cast<ConvergenceReport::Severity>(severity),
phase);
}
ConvergenceReport::WellFailure unpackWellFailure(const std::vector<char>& recv_buffer, int& offset)
ConvergenceReport::WellFailure unpackWellFailure(const std::vector<char>& recv_buffer, int& offset, MPI_Comm mpi_communicator)
{
int type = -1;
int severity = -1;
int phase = -1;
auto* data = const_cast<char*>(recv_buffer.data());
MPI_Unpack(data, recv_buffer.size(), &offset, &type, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack(data, recv_buffer.size(), &offset, &severity, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack(data, recv_buffer.size(), &offset, &phase, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack(data, recv_buffer.size(), &offset, &type, 1, MPI_INT, mpi_communicator);
MPI_Unpack(data, recv_buffer.size(), &offset, &severity, 1, MPI_INT, mpi_communicator);
MPI_Unpack(data, recv_buffer.size(), &offset, &phase, 1, MPI_INT, mpi_communicator);
int name_length = -1;
MPI_Unpack(data, recv_buffer.size(), &offset, &name_length, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack(data, recv_buffer.size(), &offset, &name_length, 1, MPI_INT, mpi_communicator);
std::vector<char> namechars(name_length);
MPI_Unpack(data, recv_buffer.size(), &offset, namechars.data(), name_length, MPI_CHAR, MPI_COMM_WORLD);
MPI_Unpack(data, recv_buffer.size(), &offset, namechars.data(), name_length, MPI_CHAR, mpi_communicator);
std::string name(namechars.data());
return ConvergenceReport::WellFailure(static_cast<ConvergenceReport::WellFailure::Type>(type),
static_cast<ConvergenceReport::Severity>(severity),
@@ -127,33 +127,33 @@ namespace
name);
}
ConvergenceReport unpackSingleConvergenceReport(const std::vector<char>& recv_buffer, int& offset)
ConvergenceReport unpackSingleConvergenceReport(const std::vector<char>& recv_buffer, int& offset, MPI_Comm mpi_communicator)
{
ConvergenceReport cr;
int num_rf = -1;
auto* data = const_cast<char*>(recv_buffer.data());
MPI_Unpack(data, recv_buffer.size(), &offset, &num_rf, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack(data, recv_buffer.size(), &offset, &num_rf, 1, MPI_INT, mpi_communicator);
for (int rf = 0; rf < num_rf; ++rf) {
ConvergenceReport::ReservoirFailure f = unpackReservoirFailure(recv_buffer, offset);
ConvergenceReport::ReservoirFailure f = unpackReservoirFailure(recv_buffer, offset, mpi_communicator);
cr.setReservoirFailed(f);
}
int num_wf = -1;
MPI_Unpack(data, recv_buffer.size(), &offset, &num_wf, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack(data, recv_buffer.size(), &offset, &num_wf, 1, MPI_INT, mpi_communicator);
for (int wf = 0; wf < num_wf; ++wf) {
ConvergenceReport::WellFailure f = unpackWellFailure(recv_buffer, offset);
ConvergenceReport::WellFailure f = unpackWellFailure(recv_buffer, offset, mpi_communicator);
cr.setWellFailed(f);
}
return cr;
}
ConvergenceReport unpackConvergenceReports(const std::vector<char>& recv_buffer,
const std::vector<int>& displ)
const std::vector<int>& displ, MPI_Comm mpi_communicator)
{
ConvergenceReport cr;
const int num_processes = displ.size() - 1;
for (int process = 0; process < num_processes; ++process) {
int offset = displ[process];
cr += unpackSingleConvergenceReport(recv_buffer, offset);
cr += unpackSingleConvergenceReport(recv_buffer, offset, mpi_communicator);
assert(offset == displ[process + 1]);
}
return cr;
@@ -167,20 +167,20 @@ namespace Opm
/// Create a global convergence report combining local
/// (per-process) reports.
ConvergenceReport gatherConvergenceReport(const ConvergenceReport& local_report)
ConvergenceReport gatherConvergenceReport(const ConvergenceReport& local_report, Parallel::Communication mpi_communicator)
{
// Pack local report.
int message_size = messageSize(local_report);
int message_size = messageSize(local_report, mpi_communicator);
std::vector<char> buffer(message_size);
int offset = 0;
packConvergenceReport(local_report, buffer, offset);
packConvergenceReport(local_report, buffer, offset,mpi_communicator);
assert(offset == message_size);
// Get message sizes and create offset/displacement array for gathering.
int num_processes = -1;
MPI_Comm_size(MPI_COMM_WORLD, &num_processes);
MPI_Comm_size(mpi_communicator, &num_processes);
std::vector<int> message_sizes(num_processes);
MPI_Allgather(&message_size, 1, MPI_INT, message_sizes.data(), 1, MPI_INT, MPI_COMM_WORLD);
MPI_Allgather(&message_size, 1, MPI_INT, message_sizes.data(), 1, MPI_INT, mpi_communicator);
std::vector<int> displ(num_processes + 1, 0);
std::partial_sum(message_sizes.begin(), message_sizes.end(), displ.begin() + 1);
@@ -189,10 +189,10 @@ namespace Opm
MPI_Allgatherv(buffer.data(), buffer.size(), MPI_PACKED,
const_cast<char*>(recv_buffer.data()), message_sizes.data(),
displ.data(), MPI_PACKED,
MPI_COMM_WORLD);
mpi_communicator);
// Unpack.
ConvergenceReport global_report = unpackConvergenceReports(recv_buffer, displ);
ConvergenceReport global_report = unpackConvergenceReports(recv_buffer, displ, mpi_communicator);
return global_report;
}
@@ -202,7 +202,7 @@ namespace Opm
namespace Opm
{
ConvergenceReport gatherConvergenceReport(const ConvergenceReport& local_report)
ConvergenceReport gatherConvergenceReport(const ConvergenceReport& local_report, Parallel::Communication mpi_communicator)
{
return local_report;
}

View File

@@ -21,14 +21,24 @@
#ifndef OPM_GATHERCONVERGENCEREPORT_HEADER_INCLUDED
#define OPM_GATHERCONVERGENCEREPORT_HEADER_INCLUDED
#include <dune/common/version.hh>
#include <opm/simulators/timestepping/ConvergenceReport.hpp>
#include <dune/common/parallel/mpihelper.hh>
namespace Opm::Parallel {
#if DUNE_VERSION_NEWER(DUNE_COMMON, 2, 7)
using Communication = Dune::Communication<Dune::MPIHelper::MPICommunicator>;
#else
using Communication = Dune::CollectiveCommunication<Dune::MPIHelper::MPICommunicator>;
#endif
} // end namespace Communication
namespace Opm
{
/// Create a global convergence report combining local
/// (per-process) reports.
ConvergenceReport gatherConvergenceReport(const ConvergenceReport& local_report);
ConvergenceReport gatherConvergenceReport(const ConvergenceReport& local_report, Parallel::Communication communicator);
} // namespace Opm