Revise Convergence Report Collection Procedure

This commit switches the parallel implementation of the function
Opm::gatherConvergenceReport() over to the general serialisation
framework (classes Opm::Serializer<> and Opm::Mpi::Packer).  In
particular, we add serializeOp() functions to each of the types

  - ConvergenceReport
  - ConvergenceReport::ReservoirFailure
  - ConvergenceReport::ReservoirConvergenceMetric
  - ConvergenceReport::WellFailure

and defer the job of converting the objects between in-memory and
byte stream representations to Opm::Serializer<>.  The new
special-purpose class CollectConvReports inherits from the latter and uses
its pack() and unpack() member functions, along with its internal
m_buffer data member, to distribute each rank's convergence report
object to all ranks.  We add this feature here, in a very narrowly
scoped use case, to enable testing and experimentation before we
consider adding this distribution mechanism as a general feature in
Opm::MpiSerializer.
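
(Illustration, not part of the commit: the following minimal, self-contained
C++ sketch shows the serializeOp()/serializer calling convention this commit
adopts.  The ByteSerializer and Metric names are hypothetical stand-ins; the
real framework routes these calls through Opm::Serializer<> and
Opm::Mpi::Packer to obtain MPI-portable byte streams.)

#include <cstddef>
#include <iostream>
#include <vector>

// Hypothetical stand-in for the packing side of Opm::Serializer<>: visits
// every member named by an object's serializeOp() and appends that member's
// raw bytes to an internal buffer.  Handles trivially copyable members only;
// the real packer also knows about strings, vectors, and nested objects.
class ByteSerializer
{
public:
    template <typename T>
    void operator()(const T& value)
    {
        const auto* first = reinterpret_cast<const char*>(&value);
        this->buffer_.insert(this->buffer_.end(), first, first + sizeof(T));
    }

    std::size_t size() const { return this->buffer_.size(); }

private:
    std::vector<char> buffer_{};
};

// Hypothetical value type following the same pattern as
// ConvergenceReport::ReservoirConvergenceMetric below: the type lists its
// data members once in serializeOp(), and the serializer decides how to
// encode them.
struct Metric
{
    int phase { -1 };
    double value { 0.0 };

    template <typename Serializer>
    void serializeOp(Serializer& serializer)
    {
        serializer(this->phase);
        serializer(this->value);
    }
};

int main()
{
    Metric m { 2, 0.125 };

    ByteSerializer s{};
    m.serializeOp(s);

    std::cout << "Packed " << s.size() << " bytes\n";
}
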
Bård Skaflestad 2024-08-07 17:47:11 +02:00
parent 68cc5d917e
commit 0b40277e01
4 changed files with 250 additions and 353 deletions

@@ -1,6 +1,6 @@
/*
Copyright 2018 SINTEF Digital, Mathematics and Cybernetics.
Copyright 2018 Equinor.
Copyright 2018, 2024 Equinor.
This file is part of the Open Porous Media project (OPM).
@@ -62,6 +62,10 @@ namespace Opm
public:
enum struct Type { Invalid, MassBalance, Cnv };
// Default constructor needed for object serialisation. Don't
// use this for anything else.
ReservoirFailure() = default;
ReservoirFailure(Type t, Severity s, int phase)
: type_(t), severity_(s), phase_(phase)
{}
@@ -70,15 +74,29 @@ namespace Opm
Severity severity() const { return severity_; }
int phase() const { return phase_; }
template <typename Serializer>
void serializeOp(Serializer& serializer)
{
serializer(this->type_);
serializer(this->severity_);
serializer(this->phase_);
}
private:
Type type_;
Severity severity_;
int phase_;
// Note to maintainers: If you change this list of data members,
// then please update serializeOp() accordingly.
Type type_ { Type::Invalid };
Severity severity_ { Severity::None };
int phase_ { -1 };
};
class ReservoirConvergenceMetric
{
public:
// Default constructor needed for object serialisation. Don't
// use this for anything else.
ReservoirConvergenceMetric() = default;
ReservoirConvergenceMetric(ReservoirFailure::Type t, int phase, double value)
: type_(t), phase_(phase), value_(value)
{}
@@ -87,10 +105,20 @@ namespace Opm
int phase() const { return phase_; }
double value() const { return value_; }
template <typename Serializer>
void serializeOp(Serializer& serializer)
{
serializer(this->type_);
serializer(this->phase_);
serializer(this->value_);
}
private:
ReservoirFailure::Type type_;
int phase_;
double value_;
// Note to maintainers: If you change this list of data members,
// then please update serializeOp() accordingly.
ReservoirFailure::Type type_ { ReservoirFailure::Type::Invalid };
int phase_ { -1 };
double value_ { 0.0 };
};
class WellFailure
@@ -107,6 +135,10 @@ namespace Opm
WrongFlowDirection,
};
// Default constructor needed for object serialisation. Don't
// use this for anything else.
WellFailure() = default;
WellFailure(Type t, Severity s, int phase, const std::string& well_name)
: type_(t), severity_(s), phase_(phase), well_name_(well_name)
{}
@@ -116,11 +148,22 @@ namespace Opm
int phase() const { return phase_; }
const std::string& wellName() const { return well_name_; }
template <typename Serializer>
void serializeOp(Serializer& serializer)
{
serializer(this->type_);
serializer(this->severity_);
serializer(this->phase_);
serializer(this->well_name_);
}
private:
Type type_;
Severity severity_;
int phase_;
std::string well_name_;
// Note to maintainers: If you change this list of data members,
// then please update serializeOp() accordingly.
Type type_ { Type::Invalid };
Severity severity_ { Severity::None };
int phase_ { -1 };
std::string well_name_ {};
};
// ----------- Mutating member functions -----------
@@ -264,8 +307,23 @@ namespace Opm
return s;
}
template <typename Serializer>
void serializeOp(Serializer& serializer)
{
serializer(this->reportTime_);
serializer(this->status_);
serializer(this->res_failures_);
serializer(this->well_failures_);
serializer(this->res_convergence_);
serializer(this->wellGroupTargetsViolated_);
serializer(this->cnvPvSplit_);
serializer(this->eligiblePoreVolume_);
}
private:
// ----------- Member variables -----------
// Note to maintainers: If you change this list of data members,
// then please update serializeOp() accordingly.
double reportTime_;
Status status_;
std::vector<ReservoirFailure> res_failures_;

@@ -1,5 +1,5 @@
/*
Copyright 2018, 2022 Equinor ASA.
Copyright 2018, 2022, 2024 Equinor ASA.
Copyright 2018 SINTEF Digital, Mathematics and Cybernetics.
This file is part of the Open Porous Media project (OPM).
@@ -22,384 +22,154 @@
#include <opm/simulators/timestepping/gatherConvergenceReport.hpp>
#include <algorithm>
#include <tuple>
#include <utility>
#include <vector>
#include <opm/simulators/utils/ParallelCommunication.hpp>
#if HAVE_MPI
#include <mpi.h>
#include <opm/common/utility/Serializer.hpp>
namespace
{
#include <opm/simulators/utils/MPIPacker.hpp>
using Opm::ConvergenceReport;
#include <opm/grid/common/CommunicationUtils.hpp>
void packReservoirFailure(const ConvergenceReport::ReservoirFailure& f,
std::vector<char>& buf, int& offset,
MPI_Comm mpi_communicator)
#include <algorithm>
#include <tuple>
#include <vector>
namespace {
/// Special purpose utility to collect each rank's local convergence
/// report object and distribute those to all ranks.
///
/// This particular feature could arguably be a part of the
/// Opm::MpiSerializer class, but we create it here first as a narrowly
/// scoped testing ground.
///
/// Inherits from Opm::Serializer<> in order to use that type's pack()
/// and unpack() member functions along with its internal 'm_buffer'
/// data member.
class CollectConvReports : private Opm::Serializer<Opm::Mpi::Packer>
{
auto pack = [&buf, &offset, mpi_communicator]
(const auto* ptr, const int size, const auto type)
{
MPI_Pack(ptr, size, type,
buf.data(), buf.size(), &offset,
mpi_communicator);
};
public:
/// Constructor.
///
/// \param[in] packer Packing object to convert in-memory objects to
/// a byte representation. Lifetime must exceed that of the
/// CollectConvReports object.
///
/// \param[in] comm MPI communicator. Should be the same as the
/// internal communicator in the \p packer object.
explicit CollectConvReports(const Opm::Mpi::Packer& packer,
const Opm::Parallel::Communication comm)
: Opm::Serializer<Opm::Mpi::Packer> { packer }
, comm_ { comm }
{}
const int type = static_cast<int>(f.type());
const int severity = static_cast<int>(f.severity());
const int phase = f.phase();
/// Collect local convergence reports from each MPI rank.
///
/// \param[in] report Local convergence report from the current MPI
/// rank.
///
/// \return Collection of local convergence reports from all MPI
/// ranks in the current communicator. Report objects returned in
/// rank order.
std::vector<Opm::ConvergenceReport>
operator()(const Opm::ConvergenceReport& report);
pack(&type, 1, MPI_INT);
pack(&severity, 1, MPI_INT);
pack(&phase, 1, MPI_INT);
}
private:
/// MPI communicator.
Opm::Parallel::Communication comm_;
void packReservoirConvergenceMetric(const ConvergenceReport::ReservoirConvergenceMetric& m,
std::vector<char>& buf, int& offset,
MPI_Comm mpi_communicator)
/// Byte representation of local convergence report objects from all
/// ranks.
///
/// Only valid during a call to operator().
std::vector<char> rankBuffers_{};
/// Start pointers into rankBuffers_ for each rank's local
/// convergence report object.
std::vector<int> rankStart_{};
/// Reconstitute a convergence report from byte stream representation.
///
/// \param[in] rank MPI rank for which to reconstitute the local
/// convergence report object.
///
/// \param[in,out] report \p rank's local convergence report object.
/// On entry, a default constructed object usable as a destination
/// for deserialisation. On exit, a fully populated convergence
/// report object filled from the byte stream of \p rank.
void deserialise(const std::vector<int>::size_type rank,
Opm::ConvergenceReport& report);
};
std::vector<Opm::ConvergenceReport>
CollectConvReports::operator()(const Opm::ConvergenceReport& report)
{
auto pack = [&buf, &offset, mpi_communicator]
(const auto* ptr, const int size, const auto type)
{
MPI_Pack(ptr, size, type,
buf.data(), buf.size(), &offset,
mpi_communicator);
};
auto rankReports = std::vector<Opm::ConvergenceReport>(this->comm_.size());
const int type = static_cast<int>(m.type());
const int phase = m.phase();
const double value = m.value();
this->pack(report);
pack(&type, 1, MPI_INT);
pack(&phase, 1, MPI_INT);
pack(&value, 1, MPI_DOUBLE);
}
std::tie(this->rankBuffers_, this->rankStart_) =
Opm::allGatherv(this->m_buffer, this->comm_);
void packWellFailure(const ConvergenceReport::WellFailure& f,
std::vector<char>& buf, int& offset,
MPI_Comm mpi_communicator)
{
auto pack = [&buf, &offset, mpi_communicator]
(const auto* ptr, const int size, const auto type)
{
MPI_Pack(ptr, size, type,
buf.data(), buf.size(), &offset,
mpi_communicator);
};
const int type = static_cast<int>(f.type());
const int severity = static_cast<int>(f.severity());
const int phase = f.phase();
pack(&type, 1, MPI_INT);
pack(&severity, 1, MPI_INT);
pack(&phase, 1, MPI_INT);
// Add one for the null terminator.
const int name_length = f.wellName().size() + 1;
pack(&name_length, 1, MPI_INT);
pack(f.wellName().c_str(), name_length, MPI_CHAR);
}
void packConvergenceReport(const ConvergenceReport& local_report,
std::vector<char>& buf, int& offset,
MPI_Comm mpi_communicator)
{
auto pack = [&buf, &offset, mpi_communicator]
(const auto* ptr, const int size, const auto type)
{
MPI_Pack(ptr, size, type,
buf.data(), buf.size(), &offset,
mpi_communicator);
};
// Pack the data.
// Status will not be packed, it is possible to deduce from the other data.
const auto reportTime = local_report.reportTime();
pack(&reportTime, 1, MPI_DOUBLE);
// CNV pore-volume split
{
const auto& cnvPvSplit = local_report.cnvPvSplit();
const auto eligiblePoreVolume = local_report.eligiblePoreVolume();
const auto num_cnv_pv = static_cast<int>(cnvPvSplit.first.size());
pack(&eligiblePoreVolume , 1 , MPI_DOUBLE);
pack(&num_cnv_pv , 1 , MPI_INT);
pack(cnvPvSplit.first .data(), num_cnv_pv, MPI_DOUBLE);
pack(cnvPvSplit.second.data(), num_cnv_pv, MPI_INT);
auto rank = std::vector<int>::size_type{0};
for (auto& rankReport : rankReports) {
this->deserialise(rank++, rankReport);
}
// Reservoir failures.
{
const auto rf = local_report.reservoirFailures();
const int num_rf = rf.size();
pack(&num_rf, 1, MPI_INT);
for (const auto& f : rf) {
packReservoirFailure(f, buf, offset, mpi_communicator);
}
}
// Reservoir convergence metrics.
{
const auto rm = local_report.reservoirConvergence();
const int num_rm = rm.size();
pack(&num_rm, 1, MPI_INT);
for (const auto& m : rm) {
packReservoirConvergenceMetric(m, buf, offset, mpi_communicator);
}
}
// Well failures.
{
const auto wf = local_report.wellFailures();
const int num_wf = wf.size();
pack(&num_wf, 1, MPI_INT);
for (const auto& f : wf) {
packWellFailure(f, buf, offset, mpi_communicator);
}
}
return rankReports;
}
int messageSize(const ConvergenceReport& local_report, MPI_Comm mpi_communicator)
void CollectConvReports::deserialise(const std::vector<int>::size_type rank,
Opm::ConvergenceReport& report)
{
int int_pack_size = 0;
MPI_Pack_size(1, MPI_INT, mpi_communicator, &int_pack_size);
auto begin = this->rankBuffers_.begin() + this->rankStart_[rank + 0];
auto end = this->rankBuffers_.begin() + this->rankStart_[rank + 1];
int double_pack_size = 0;
MPI_Pack_size(1, MPI_DOUBLE, mpi_communicator, &double_pack_size);
this->m_buffer.assign(begin, end);
this->m_packSize = std::distance(begin, end);
int char_pack_size = 0;
MPI_Pack_size(1, MPI_CHAR, mpi_communicator, &char_pack_size);
const int num_cnv_pv = local_report.cnvPvSplit().first.size();
const int num_rf = local_report.reservoirFailures().size();
const int num_rm = local_report.reservoirConvergence().size();
const int num_wf = local_report.wellFailures().size();
int wellnames_length = 0;
for (const auto& f : local_report.wellFailures()) {
// Add one for the null terminator.
wellnames_length += f.wellName().size() + 1;
}
return (3 + 1 + num_cnv_pv + 3*num_rf + 2*num_rm + 4*num_wf)*int_pack_size
+ (1 + 1 + num_cnv_pv + 1*num_rm)*double_pack_size
+ wellnames_length*char_pack_size;
this->unpack(report);
}
ConvergenceReport::ReservoirFailure
unpackReservoirFailure(const std::vector<char>& recv_buffer, int& offset, MPI_Comm mpi_communicator)
{
auto unpackInt = [data = recv_buffer.data(),
size = static_cast<int>(recv_buffer.size()),
&offset, mpi_communicator]()
{
auto x = -1;
MPI_Unpack(data, size, &offset, &x, 1, MPI_INT, mpi_communicator);
return x;
};
const auto type = unpackInt();
const auto severity = unpackInt();
const auto phase = unpackInt();
return {
static_cast<ConvergenceReport::ReservoirFailure::Type>(type),
static_cast<ConvergenceReport::Severity>(severity),
phase
};
}
ConvergenceReport::ReservoirConvergenceMetric
unpackReservoirConvergenceMetric(const std::vector<char>& recv_buffer, int& offset, MPI_Comm mpi_communicator)
{
auto unpack = [data = recv_buffer.data(),
size = static_cast<int>(recv_buffer.size()),
&offset, mpi_communicator](const auto type, auto x)
{
MPI_Unpack(data, size, &offset, &x, 1, type, mpi_communicator);
return x;
};
const auto type = unpack(MPI_INT, -1);
const auto phase = unpack(MPI_INT, -1);
const auto value = unpack(MPI_DOUBLE, -1.0);
return { static_cast<ConvergenceReport::ReservoirFailure::Type>(type), phase, value };
}
ConvergenceReport::WellFailure
unpackWellFailure(const std::vector<char>& recv_buffer, int& offset, MPI_Comm mpi_communicator)
{
auto unpackInt = [data = recv_buffer.data(),
size = static_cast<int>(recv_buffer.size()),
&offset, mpi_communicator]()
{
auto x = -1;
MPI_Unpack(data, size, &offset, &x, 1, MPI_INT, mpi_communicator);
return x;
};
const auto type = unpackInt();
const auto severity = unpackInt();
const auto phase = unpackInt();
const auto name_length = unpackInt();
std::vector<char> namechars(name_length);
MPI_Unpack(recv_buffer.data(), recv_buffer.size(), &offset,
namechars.data(), name_length,
MPI_CHAR, mpi_communicator);
return {
static_cast<ConvergenceReport::WellFailure::Type>(type),
static_cast<ConvergenceReport::Severity>(severity),
phase,
const_cast<const std::vector<char>&>(namechars).data()
};
}
ConvergenceReport
unpackSingleConvergenceReport(const std::vector<char>& recv_buffer, int& offset, MPI_Comm mpi_communicator)
{
auto unpack = [data = recv_buffer.data(),
size = static_cast<int>(recv_buffer.size()),
&offset, mpi_communicator](const auto type, auto x)
{
MPI_Unpack(data, size, &offset, &x, 1, type, mpi_communicator);
return x;
};
auto cr = ConvergenceReport { unpack(MPI_DOUBLE, 0.0) };
{
const auto eligiblePoreVolume = unpack(MPI_DOUBLE, 0.0);
const auto num_cnv_pv = unpack(MPI_INT, -1);
auto cnvPvSplit = ConvergenceReport::CnvPvSplit {
std::piecewise_construct,
std::forward_as_tuple(num_cnv_pv),
std::forward_as_tuple(num_cnv_pv)
};
const auto* data = recv_buffer.data();
MPI_Unpack(data, recv_buffer.size(), &offset,
cnvPvSplit.first.data(), num_cnv_pv,
MPI_DOUBLE, mpi_communicator);
MPI_Unpack(data, recv_buffer.size(), &offset,
cnvPvSplit.second.data(), num_cnv_pv,
MPI_DOUBLE, mpi_communicator);
cr.setCnvPoreVolSplit(cnvPvSplit, eligiblePoreVolume);
}
{
const auto num_rf = unpack(MPI_INT, -1);
for (int rf = 0; rf < num_rf; ++rf) {
cr.setReservoirFailed(unpackReservoirFailure(recv_buffer, offset, mpi_communicator));
}
}
{
const auto num_rm = unpack(MPI_INT, -1);
for (int rm = 0; rm < num_rm; ++rm) {
cr.setReservoirConvergenceMetric(unpackReservoirConvergenceMetric(recv_buffer, offset, mpi_communicator));
}
}
{
const auto num_wf = unpack(MPI_INT, -1);
for (int wf = 0; wf < num_wf; ++wf) {
cr.setWellFailed(unpackWellFailure(recv_buffer, offset, mpi_communicator));
}
}
return cr;
}
ConvergenceReport
unpackConvergenceReports(const std::vector<char>& recv_buffer,
const std::vector<int>& displ,
MPI_Comm mpi_communicator)
{
ConvergenceReport cr;
const int num_processes = displ.size() - 1;
for (int process = 0; process < num_processes; ++process) {
int offset = displ[process];
cr += unpackSingleConvergenceReport(recv_buffer, offset, mpi_communicator);
assert(offset == displ[process + 1]);
}
return cr;
}
} // anonymous namespace
} // Anonymous namespace
namespace Opm
{
/// Create a global convergence report combining local
/// (per-process) reports.
ConvergenceReport gatherConvergenceReport(const ConvergenceReport& local_report,
Parallel::Communication mpi_communicator)
/// Create a global convergence report combining local (per-process)
/// reports.
ConvergenceReport
gatherConvergenceReport(const ConvergenceReport& local_report,
Parallel::Communication mpi_communicator)
{
// Pack local report.
const int message_size = messageSize(local_report, mpi_communicator);
std::vector<char> buffer(message_size);
{
int offset = 0;
packConvergenceReport(local_report, buffer, offset, mpi_communicator);
assert(offset == message_size);
if (mpi_communicator.size() == 1) {
// Sequential run, no communication needed.
return local_report;
}
// Get message sizes and create offset/displacement array for gathering.
int num_processes = -1;
MPI_Comm_size(mpi_communicator, &num_processes);
// Multi-process run (common case). Need object distribution.
auto combinedReport = ConvergenceReport {};
std::vector<int> message_sizes(num_processes);
MPI_Allgather(&message_size, 1, MPI_INT, message_sizes.data(), 1, MPI_INT, mpi_communicator);
const auto packer = Mpi::Packer { mpi_communicator };
std::vector<int> displ(num_processes + 1, 0);
std::partial_sum(message_sizes.begin(), message_sizes.end(), displ.begin() + 1);
const auto rankReports =
CollectConvReports { packer, mpi_communicator }(local_report);
// Gather.
std::vector<char> recv_buffer(displ.back());
MPI_Allgatherv(buffer.data(), buffer.size(), MPI_PACKED,
recv_buffer.data(), message_sizes.data(),
displ.data(), MPI_PACKED,
mpi_communicator);
for (const auto& rankReport : rankReports) {
combinedReport += rankReport;
}
// Unpack.
return unpackConvergenceReports(recv_buffer, displ, mpi_communicator);
return combinedReport;
}
} // namespace Opm
#else // HAVE_MPI
#else // !HAVE_MPI
namespace Opm
{
ConvergenceReport gatherConvergenceReport(const ConvergenceReport& local_report,
[[maybe_unused]] Parallel::Communication mpi_communicator)
ConvergenceReport
gatherConvergenceReport(const ConvergenceReport& local_report,
[[maybe_unused]] Parallel::Communication mpi_communicator)
{
return local_report;
}

@@ -30,9 +30,10 @@ namespace Opm
/// Create a global convergence report combining local
/// (per-process) reports.
ConvergenceReport gatherConvergenceReport(const ConvergenceReport& local_report, Parallel::Communication communicator);
ConvergenceReport
gatherConvergenceReport(const ConvergenceReport& local_report,
Parallel::Communication communicator);
} // namespace Opm
#endif // OPM_GATHERCONVERGENCEREPORT_HEADER_INCLUDED

@@ -106,6 +106,74 @@ BOOST_AUTO_TEST_CASE(EvenHaveFailure)
}
}
namespace {
class NProc_Is_Not
{
public:
explicit NProc_Is_Not(const int rejectNP)
: rejectNP_ { rejectNP }
{}
boost::test_tools::assertion_result
operator()(boost::unit_test::test_unit_id) const
{
auto comm = Opm::Parallel::Communication {
Dune::MPIHelper::getCommunicator()
};
if (comm.size() != this->rejectNP_) {
return true;
}
boost::test_tools::assertion_result response(false);
response.message() << "Number of MPI processes ("
<< comm.size()
<< ") matches rejected case.";
return response;
}
private:
int rejectNP_{};
};
} // Anonymous namespace
BOOST_AUTO_TEST_CASE(CNV_PV_SPLIT, * boost::unit_test::precondition(NProc_Is_Not{1}))
{
const auto cc = Dune::MPIHelper::getCommunication();
auto loc_rpt = Opm::ConvergenceReport {};
if (cc.rank() != 0) {
// All ranks are supposed to have the *same* pore-volume split of
// the CNV metrics, except if the pv split is empty.
const auto pvSplit = Opm::ConvergenceReport::CnvPvSplit {
{ 0.75, 0.2, 0.05, },
{ 1234, 56 , 7 , }
};
loc_rpt.setCnvPoreVolSplit(pvSplit, 9.876e5);
}
const auto cr = gatherConvergenceReport(loc_rpt, cc);
const auto& [pvFrac, cellCnt] = cr.cnvPvSplit();
BOOST_REQUIRE_EQUAL(pvFrac.size(), std::size_t{3});
BOOST_REQUIRE_EQUAL(cellCnt.size(), std::size_t{3});
BOOST_CHECK_CLOSE(cr.eligiblePoreVolume(), 9.876e5, 1.0e-8);
BOOST_CHECK_CLOSE(pvFrac[0], 0.75, 1.0e-8);
BOOST_CHECK_CLOSE(pvFrac[1], 0.20, 1.0e-8);
BOOST_CHECK_CLOSE(pvFrac[2], 0.05, 1.0e-8);
BOOST_CHECK_EQUAL(cellCnt[0], 1234);
BOOST_CHECK_EQUAL(cellCnt[1], 56);
BOOST_CHECK_EQUAL(cellCnt[2], 7);
}
int main(int argc, char** argv)
{
Dune::MPIHelper::instance(argc, argv);