Created gatherConvergenceReport() function and test.

Atgeirr Flø Rasmussen 2018-10-25 14:54:16 +02:00
parent 48b0024855
commit 566f6813dd
4 changed files with 324 additions and 2 deletions

View File

@@ -132,6 +132,7 @@ list (APPEND TEST_SOURCE_FILES
 tests/test_block.cpp
 tests/test_boprops_ad.cpp
 tests/test_convergencereport.cpp
+tests/test_gatherconvergencereport.cpp
 tests/test_graphcoloring.cpp
 tests/test_rateconverter.cpp
 tests/test_span.cpp

View File

@@ -0,0 +1,202 @@
/*
Copyright 2018 SINTEF Digital, Mathematics and Cybernetics.
Copyright 2018 Equinor.
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef OPM_GATHERCONVERGENCEREPORT_HEADER_INCLUDED
#define OPM_GATHERCONVERGENCEREPORT_HEADER_INCLUDED
#include <opm/simulators/timestepping/ConvergenceReport.hpp>
#include <cassert>
#include <numeric>
#include <string>
#include <vector>
#if HAVE_MPI
#include <mpi.h>
namespace Opm
{
void packReservoirFailure(const ConvergenceReport::ReservoirFailure& f,
std::vector<char>& buf,
int& offset)
{
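// Encode the enum values as plain ints so they can be packed as MPI_INT.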
int type = static_cast<int>(f.type());
int severity = static_cast<int>(f.severity());
int phase = f.phase();
MPI_Pack(&type, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(&severity, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(&phase, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
}
void packWellFailure(const ConvergenceReport::WellFailure& f,
std::vector<char>& buf,
int& offset)
{
int type = static_cast<int>(f.type());
int severity = static_cast<int>(f.severity());
int phase = f.phase();
MPI_Pack(&type, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(&severity, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(&phase, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
int name_length = f.wellName().size() + 1; // Adding 1 for the null terminator.
MPI_Pack(&name_length, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(f.wellName().c_str(), name_length, MPI_CHAR, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
}
void packConvergenceReport(const ConvergenceReport& local_report,
std::vector<char>& buf,
int& offset)
{
// Pack the data.
// Status is not packed; it can be deduced from the other data.
// Reservoir failures.
const auto& rf = local_report.reservoirFailures();
int num_rf = rf.size();
MPI_Pack(&num_rf, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
for (const auto& f : rf) {
packReservoirFailure(f, buf, offset);
}
// Well failures.
const auto& wf = local_report.wellFailures();
int num_wf = wf.size();
MPI_Pack(&num_wf, 1, MPI_INT, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
for (const auto& f : wf) {
packWellFailure(f, buf, offset);
}
}
int messageSize(const ConvergenceReport& local_report)
{
int int_pack_size = 0;
MPI_Pack_size(1, MPI_INT, MPI_COMM_WORLD, &int_pack_size);
const int num_rf = local_report.reservoirFailures().size();
const int num_wf = local_report.wellFailures().size();
int wellnames_length = 0;
for (const auto& f : local_report.wellFailures()) {
wellnames_length += (f.wellName().size() + 1);
}
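// Layout: two counters (num_rf, num_wf), three ints per reservoir failure,
// four ints per well failure (type, severity, phase, name length),
// plus the characters of all well names (including null terminators).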
return (2 + 3*num_rf + 4*num_wf) * int_pack_size + wellnames_length;
}
ConvergenceReport::ReservoirFailure unpackReservoirFailure(const std::vector<char>& recv_buffer, int& offset)
{
int type = -1;
int severity = -1;
int phase = -1;
MPI_Unpack(recv_buffer.data(), recv_buffer.size(), &offset, &type, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack(recv_buffer.data(), recv_buffer.size(), &offset, &severity, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack(recv_buffer.data(), recv_buffer.size(), &offset, &phase, 1, MPI_INT, MPI_COMM_WORLD);
return ConvergenceReport::ReservoirFailure(static_cast<ConvergenceReport::ReservoirFailure::Type>(type),
static_cast<ConvergenceReport::Severity>(severity),
phase);
}
ConvergenceReport::WellFailure unpackWellFailure(const std::vector<char>& recv_buffer, int& offset)
{
int type = -1;
int severity = -1;
int phase = -1;
MPI_Unpack(recv_buffer.data(), recv_buffer.size(), &offset, &type, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack(recv_buffer.data(), recv_buffer.size(), &offset, &severity, 1, MPI_INT, MPI_COMM_WORLD);
MPI_Unpack(recv_buffer.data(), recv_buffer.size(), &offset, &phase, 1, MPI_INT, MPI_COMM_WORLD);
int name_length = -1;
MPI_Unpack(recv_buffer.data(), recv_buffer.size(), &offset, &name_length, 1, MPI_INT, MPI_COMM_WORLD);
std::vector<char> namechars(name_length);
MPI_Unpack(recv_buffer.data(), recv_buffer.size(), &offset, namechars.data(), name_length, MPI_CHAR, MPI_COMM_WORLD);
std::string name(namechars.data());
return ConvergenceReport::WellFailure(static_cast<ConvergenceReport::WellFailure::Type>(type),
static_cast<ConvergenceReport::Severity>(severity),
phase,
name);
}
ConvergenceReport unpackSingleConvergenceReport(const std::vector<char>& recv_buffer, int& offset)
{
ConvergenceReport cr;
int num_rf = -1;
MPI_Unpack(recv_buffer.data(), recv_buffer.size(), &offset, &num_rf, 1, MPI_INT, MPI_COMM_WORLD);
for (int rf = 0; rf < num_rf; ++rf) {
ConvergenceReport::ReservoirFailure f = unpackReservoirFailure(recv_buffer, offset);
cr.setReservoirFailed(f);
}
int num_wf = -1;
MPI_Unpack(recv_buffer.data(), recv_buffer.size(), &offset, &num_wf, 1, MPI_INT, MPI_COMM_WORLD);
for (int wf = 0; wf < num_wf; ++wf) {
ConvergenceReport::WellFailure f = unpackWellFailure(recv_buffer, offset);
cr.setWellFailed(f);
}
return cr;
}
ConvergenceReport unpackConvergenceReports(const std::vector<char>& recv_buffer,
const std::vector<int>& displ)
{
ConvergenceReport cr;
const int num_processes = displ.size() - 1;
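// Each process' packed report starts at its displacement in the gathered buffer.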
for (int process = 0; process < num_processes; ++process) {
int offset = displ[process];
cr += unpackSingleConvergenceReport(recv_buffer, offset);
assert(offset == displ[process + 1]);
}
return cr;
}
/// Create a global convergence report combining local
/// (per-process) reports.
ConvergenceReport gatherConvergenceReport(const ConvergenceReport& local_report)
{
// Pack local report.
const int message_size = messageSize(local_report);
std::vector<char> buffer(message_size);
int offset = 0;
packConvergenceReport(local_report, buffer, offset);
assert(offset == message_size);
// Get message sizes and create offset/displacement array for gathering.
int num_processes = -1;
MPI_Comm_size(MPI_COMM_WORLD, &num_processes);
std::vector<int> message_sizes(num_processes);
MPI_Allgather(&message_size, 1, MPI_INT, message_sizes.data(), 1, MPI_INT, MPI_COMM_WORLD);
std::vector<int> displ(num_processes + 1, 0);
std::partial_sum(message_sizes.begin(), message_sizes.end(), displ.begin() + 1);
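// displ has num_processes + 1 entries; displ.back() is the total gathered size.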
// Gather.
std::vector<char> recv_buffer(displ.back());
MPI_Allgatherv(buffer.data(), buffer.size(), MPI_PACKED,
recv_buffer.data(), message_sizes.data(), displ.data(), MPI_PACKED,
MPI_COMM_WORLD);
// Unpack.
ConvergenceReport global_report = unpackConvergenceReports(recv_buffer, displ);
return global_report;
}
} // namespace Opm
#else // HAVE_MPI
namespace Opm
{
ConvergenceReport gatherConvergenceReport(const ConvergenceReport& local_report)
{
return local_report;
}
} // namespace Opm
#endif // HAVE_MPI
#endif // OPM_GATHERCONVERGENCEREPORT_HEADER_INCLUDED
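
A minimal usage sketch (not part of this commit, and assuming an OPM/Dune build environment): each rank fills its local ConvergenceReport and calls gatherConvergenceReport(); the MPI packing above stays internal. The well name and failure values below are made up for illustration.

#include <opm/simulators/timestepping/gatherConvergenceReport.hpp>
#include <dune/common/parallel/mpihelper.hh>
#include <iostream>

int main(int argc, char** argv)
{
    // Initialize MPI (a no-op for serial builds).
    Dune::MPIHelper::instance(argc, argv);
    using CR = Opm::ConvergenceReport;
    CR local;
    // Record an example well failure on this rank (values are illustrative only).
    local.setWellFailed({CR::WellFailure::Type::ControlBHP, CR::Severity::Normal, -1, "PROD1"});
    // Every rank receives the same combined report.
    CR global = Opm::gatherConvergenceReport(local);
    std::cout << "Total well failures: " << global.wellFailures().size() << std::endl;
    return 0;
}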

View File

@@ -0,0 +1,119 @@
/*
Copyright 2018 SINTEF Digital, Mathematics and Cybernetics.
Copyright 2018 Equinor.
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
*/
#include <config.h>
#define BOOST_TEST_MODULE TestGatherConvergenceReport
#define BOOST_TEST_NO_MAIN
#include <boost/test/unit_test.hpp>
#include <opm/simulators/timestepping/gatherConvergenceReport.hpp>
#include <dune/common/parallel/mpihelper.hh>
#include <iostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
#if HAVE_MPI
struct MPIError
{
MPIError(std::string s, int e) : errorstring(std::move(s)), errorcode(e){}
std::string errorstring;
int errorcode;
};
void MPI_err_handler(MPI_Comm*, int* err_code, ...)
{
std::vector<char> err_string(MPI_MAX_ERROR_STRING);
int err_length;
MPI_Error_string(*err_code, err_string.data(), &err_length);
std::string s(err_string.data(), err_length);
std::cerr << "An MPI error occurred:" << std::endl << s << std::endl;
throw MPIError(s, *err_code);
}
#endif
bool
init_unit_test_func()
{
return true;
}
bool operator==(const Opm::ConvergenceReport::WellFailure& wf1,
const Opm::ConvergenceReport::WellFailure& wf2)
{
return wf1.type() == wf2.type()
&& wf1.severity() == wf2.severity()
&& wf1.phase() == wf2.phase()
&& wf1.wellName() == wf2.wellName();
}
BOOST_AUTO_TEST_CASE(AllHaveFailure)
{
auto cc = Dune::MPIHelper::getCollectiveCommunication();
std::ostringstream name;
name << "WellRank" << cc.rank() << std::flush;
using CR = Opm::ConvergenceReport;
CR cr;
cr.setWellFailed({CR::WellFailure::Type::ControlBHP, CR::Severity::Normal, -1, name.str()});
CR global_cr = gatherConvergenceReport(cr);
BOOST_CHECK(global_cr.wellFailures().size() == std::size_t(cc.size()));
BOOST_CHECK(global_cr.wellFailures()[cc.rank()] == cr.wellFailures()[0]);
// Extra output for debugging.
if (cc.rank() == 0) {
for (const auto& wf : global_cr.wellFailures()) {
std::cout << "Well name of failure: " << wf.wellName() << std::endl;
}
}
}
BOOST_AUTO_TEST_CASE(EvenHaveFailure)
{
auto cc = Dune::MPIHelper::getCollectiveCommunication();
using CR = Opm::ConvergenceReport;
CR cr;
if (cc.rank() % 2 == 0) {
std::ostringstream name;
name << "WellRank" << cc.rank() << std::flush;
cr.setWellFailed({CR::WellFailure::Type::ControlBHP, CR::Severity::Normal, -1, name.str()});
}
CR global_cr = gatherConvergenceReport(cr);
BOOST_CHECK(global_cr.wellFailures().size() == std::size_t((cc.size() + 1) / 2));
if (cc.rank() % 2 == 0) {
BOOST_CHECK(global_cr.wellFailures()[cc.rank()/2] == cr.wellFailures()[0]);
}
// Extra output for debugging.
if (cc.rank() == 0) {
for (const auto& wf : global_cr.wellFailures()) {
std::cout << "Well name of failure, should be only even: " << wf.wellName() << std::endl;
}
}
}
int main(int argc, char** argv)
{
Dune::MPIHelper::instance(argc, argv);
#if HAVE_MPI
// register a throwing error handler to allow for
// debugging with "catch throw" in gdb
MPI_Errhandler handler;
MPI_Comm_create_errhandler(MPI_err_handler, &handler);
MPI_Comm_set_errhandler(MPI_COMM_WORLD, handler);
#endif
return boost::unit_test::unit_test_main(&init_unit_test_func, argc, argv);
}

View File

@@ -57,6 +57,6 @@ int main(int argc, char** argv)
     MPI_Comm_create_errhandler(MPI_err_handler, &handler);
     MPI_Comm_set_errhandler(MPI_COMM_WORLD, handler);
 #endif
-    boost::unit_test::unit_test_main(&init_unit_test_func,
-                                     argc, argv);
+    return boost::unit_test::unit_test_main(&init_unit_test_func,
+                                            argc, argv);
 }