mirror of
https://github.com/OPM/opm-simulators.git
synced 2025-02-25 18:55:30 -06:00
Merge pull request #586 from atgeirr/gather-exception-messages-initialization
Add and use gatherStrings() for exception messages.
This commit is contained in:
commit
d2f7f5a672
211
opm/models/parallel/mpiutil.hh
Normal file
211
opm/models/parallel/mpiutil.hh
Normal file
@ -0,0 +1,211 @@
|
||||
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
|
||||
// vi: set et ts=4 sw=4 sts=4:
|
||||
/*
|
||||
This file is part of the Open Porous Media project (OPM).
|
||||
|
||||
OPM is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 2 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
OPM is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with OPM. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
Consult the COPYING file in the top-level source directory of this
|
||||
module for the precise wording of the license and the list of
|
||||
copyright holders.
|
||||
*/
|
||||
/*!
|
||||
* \file
|
||||
 * \brief Helpers for packing data into MPI buffers and for gathering strings from all ranks.
|
||||
*/
|
||||
#ifndef OPM_MATERIAL_MPIUTIL_HH
|
||||
#define OPM_MATERIAL_MPIUTIL_HH
|
||||
|
||||
#include <dune/common/parallel/mpitraits.hh>
|
||||
|
||||
#include <cassert>
|
||||
#include <numeric>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
|
||||
#if HAVE_MPI
|
||||
|
||||
#include <mpi.h>
|
||||
|
||||
|
||||
|
||||
namespace mpiutil_details
|
||||
{
|
||||
|
||||
template <typename T>
|
||||
int packSize()
|
||||
{
|
||||
int pack_size;
|
||||
MPI_Pack_size(1, Dune::MPITraits<T>::getType(), MPI_COMM_WORLD, &pack_size);
|
||||
return pack_size;
|
||||
}
|
||||
|
||||
// -------- Packer --------
|
||||
template <typename T>
|
||||
struct Packer
|
||||
{
|
||||
static int size(const T&)
|
||||
{
|
||||
return packSize<T>();
|
||||
}
|
||||
|
||||
static void pack(const T& content, std::vector<char>& buf, int& offset)
|
||||
{
|
||||
MPI_Pack(&content, 1, Dune::MPITraits<T>::getType(), buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
|
||||
}
|
||||
|
||||
static T unpack(const std::vector<char>& recv_buffer, int& offset)
|
||||
{
|
||||
T content;
|
||||
auto* data = const_cast<char*>(recv_buffer.data());
|
||||
MPI_Unpack(data, recv_buffer.size(), &offset, &content, 1, Dune::MPITraits<T>::getType(), MPI_COMM_WORLD);
|
||||
return content;
|
||||
}
|
||||
};
|
||||
|
||||
// -------- Packer, string specialization --------
|
||||
template <>
|
||||
struct Packer<std::string>
|
||||
{
|
||||
static int size(const std::string& content)
|
||||
{
|
||||
return packSize<unsigned int>() + content.size()*packSize<char>();
|
||||
}
|
||||
|
||||
static void pack(const std::string& content, std::vector<char>& buf, int& offset)
|
||||
{
|
||||
unsigned int size = content.size();
|
||||
Packer<unsigned int>::pack(size, buf, offset);
|
||||
if (size > 0) {
|
||||
MPI_Pack(const_cast<char*>(content.c_str()), size, MPI_CHAR, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
|
||||
}
|
||||
}
|
||||
|
||||
static std::string unpack(const std::vector<char>& recv_buffer, int& offset)
|
||||
{
|
||||
unsigned int size = Packer<unsigned int>::unpack(recv_buffer, offset);
|
||||
std::string text;
|
||||
if (size > 0) {
|
||||
auto* data = const_cast<char*>(recv_buffer.data());
|
||||
std::vector<char> chars(size);
|
||||
MPI_Unpack(data, recv_buffer.size(), &offset, chars.data(), size, MPI_CHAR, MPI_COMM_WORLD);
|
||||
text = std::string(chars.data(), size);
|
||||
}
|
||||
return text;
|
||||
}
|
||||
};
|
||||
|
||||
// -------- Packer, vector partial specialization --------
|
||||
template <typename T>
|
||||
struct Packer<std::vector<T>>
|
||||
{
|
||||
static int size(const std::string& content)
|
||||
{
|
||||
int sz = 0;
|
||||
sz += packSize<unsigned int>();
|
||||
for (const T& elem : content) {
|
||||
sz += Packer<T>::size(elem);
|
||||
}
|
||||
return sz;
|
||||
}
|
||||
|
||||
static void pack(const std::vector<T>& content, std::vector<char>& buf, int& offset)
|
||||
{
|
||||
unsigned int size = content.size();
|
||||
Packer<unsigned int>::pack(size, buf, offset);
|
||||
for (const T& elem : content) {
|
||||
Packer<T>::pack(elem);
|
||||
}
|
||||
}
|
||||
|
||||
static std::vector<T> unpack(const std::vector<char>& recv_buffer, int& offset)
|
||||
{
|
||||
unsigned int size = Packer<T>::unpack(recv_buffer, offset);
|
||||
std::vector<T> content;
|
||||
content.reserve(size);
|
||||
for (unsigned int i = 0; i < size; ++i) {
|
||||
content.push_back(Packer<T>::unpack(recv_buffer, offset));
|
||||
}
|
||||
return content;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
} // namespace mpiutil_details
|
||||
|
||||
|
||||
namespace Opm
|
||||
{
|
||||
|
||||
/// From each rank, gather its string (if not empty) into a vector.
|
||||
inline std::vector<std::string> gatherStrings(const std::string& local_string)
|
||||
{
|
||||
using StringPacker = mpiutil_details::Packer<std::string>;
|
||||
|
||||
// Pack local messages.
|
||||
const int message_size = StringPacker::size(local_string);
|
||||
std::vector<char> buffer(message_size);
|
||||
int offset = 0;
|
||||
StringPacker::pack(local_string, buffer, offset);
|
||||
assert(offset == message_size);
|
||||
|
||||
// Get message sizes and create offset/displacement array for gathering.
|
||||
int num_processes = -1;
|
||||
MPI_Comm_size(MPI_COMM_WORLD, &num_processes);
|
||||
std::vector<int> message_sizes(num_processes);
|
||||
MPI_Allgather(&message_size, 1, MPI_INT, message_sizes.data(), 1, MPI_INT, MPI_COMM_WORLD);
|
||||
std::vector<int> displ(num_processes + 1, 0);
|
||||
std::partial_sum(message_sizes.begin(), message_sizes.end(), displ.begin() + 1);
|
||||
|
||||
// Gather.
|
||||
std::vector<char> recv_buffer(displ.back());
|
||||
MPI_Allgatherv(buffer.data(), buffer.size(), MPI_PACKED,
|
||||
const_cast<char*>(recv_buffer.data()), message_sizes.data(),
|
||||
displ.data(), MPI_PACKED,
|
||||
MPI_COMM_WORLD);
|
||||
|
||||
// Unpack and return.
|
||||
std::vector<std::string> ret;
|
||||
for (int process = 0; process < num_processes; ++process) {
|
||||
offset = displ[process];
|
||||
std::string s = StringPacker::unpack(recv_buffer, offset);
|
||||
if (!s.empty()) {
|
||||
ret.push_back(s);
|
||||
}
|
||||
assert(offset == displ[process + 1]);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace Opm
|
||||
|
||||
#else // HAVE_MPI
|
||||
|
||||
namespace Opm
|
||||
{
|
||||
/// Serial fallback used when HAVE_MPI is not set: the result holds
/// \p local_string if it is non-empty, and is empty otherwise.
inline std::vector<std::string> gatherStrings(const std::string& local_string)
{
    std::vector<std::string> gathered;
    if (!local_string.empty()) {
        gathered.push_back(local_string);
    }
    return gathered;
}
|
||||
} // namespace Opm
|
||||
|
||||
#endif // HAVE_MPI
|
||||
|
||||
#endif // OPM_MATERIAL_MPIUTIL_HH
|
||||
|
@ -34,6 +34,7 @@
|
||||
#include <opm/models/utils/propertysystem.hh>
|
||||
#include <opm/models/utils/timer.hh>
|
||||
#include <opm/models/utils/timerguard.hh>
|
||||
#include <opm/models/parallel/mpiutil.hh>
|
||||
|
||||
#include <dune/common/version.hh>
|
||||
#include <dune/common/parallel/mpihelper.hh>
|
||||
@ -147,16 +148,24 @@ public:
|
||||
std::cout << "Allocating the simulation vanguard\n" << std::flush;
|
||||
|
||||
int exceptionThrown = 0;
|
||||
std::string what;
|
||||
try
|
||||
{ vanguard_.reset(new Vanguard(*this)); }
|
||||
catch (const std::exception& e) {
|
||||
exceptionThrown = 1;
|
||||
what = e.what();
|
||||
if (comm.size() > 1) {
|
||||
what += " (on rank " + std::to_string(comm.rank()) + ")";
|
||||
}
|
||||
if (verbose_)
|
||||
std::cerr << "Rank " << comm.rank() << " threw an exception: " << e.what() << std::endl;
|
||||
}
|
||||
|
||||
if (comm.max(exceptionThrown))
|
||||
throw std::runtime_error("Allocating the simulation vanguard failed.");
|
||||
if (comm.max(exceptionThrown)) {
|
||||
auto all_what = gatherStrings(what);
|
||||
assert(!all_what.empty());
|
||||
throw std::runtime_error("Allocating the simulation vanguard failed: " + all_what.front());
|
||||
}
|
||||
|
||||
if (verbose_)
|
||||
std::cout << "Distributing the vanguard's data\n" << std::flush;
|
||||
@ -165,12 +174,19 @@ public:
|
||||
{ vanguard_->loadBalance(); }
|
||||
catch (const std::exception& e) {
|
||||
exceptionThrown = 1;
|
||||
what = e.what();
|
||||
if (comm.size() > 1) {
|
||||
what += " (on rank " + std::to_string(comm.rank()) + ")";
|
||||
}
|
||||
if (verbose_)
|
||||
std::cerr << "Rank " << comm.rank() << " threw an exception: " << e.what() << std::endl;
|
||||
}
|
||||
|
||||
if (comm.max(exceptionThrown))
|
||||
throw std::runtime_error("Could not distribute the vanguard data.");
|
||||
if (comm.max(exceptionThrown)) {
|
||||
auto all_what = gatherStrings(what);
|
||||
assert(!all_what.empty());
|
||||
throw std::runtime_error("Could not distribute the vanguard data: " + all_what.front());
|
||||
}
|
||||
|
||||
if (verbose_)
|
||||
std::cout << "Allocating the model\n" << std::flush;
|
||||
@ -187,12 +203,19 @@ public:
|
||||
{ model_->finishInit(); }
|
||||
catch (const std::exception& e) {
|
||||
exceptionThrown = 1;
|
||||
what = e.what();
|
||||
if (comm.size() > 1) {
|
||||
what += " (on rank " + std::to_string(comm.rank()) + ")";
|
||||
}
|
||||
if (verbose_)
|
||||
std::cerr << "Rank " << comm.rank() << " threw an exception: " << e.what() << std::endl;
|
||||
}
|
||||
|
||||
if (comm.max(exceptionThrown))
|
||||
throw std::runtime_error("Could not initialize the model.");
|
||||
if (comm.max(exceptionThrown)) {
|
||||
auto all_what = gatherStrings(what);
|
||||
assert(!all_what.empty());
|
||||
throw std::runtime_error("Could not initialize the model: " + all_what.front());
|
||||
}
|
||||
|
||||
if (verbose_)
|
||||
std::cout << "Initializing the problem\n" << std::flush;
|
||||
@ -201,12 +224,19 @@ public:
|
||||
{ problem_->finishInit(); }
|
||||
catch (const std::exception& e) {
|
||||
exceptionThrown = 1;
|
||||
what = e.what();
|
||||
if (comm.size() > 1) {
|
||||
what += " (on rank " + std::to_string(comm.rank()) + ")";
|
||||
}
|
||||
if (verbose_)
|
||||
std::cerr << "Rank " << comm.rank() << " threw an exception: " << e.what() << std::endl;
|
||||
}
|
||||
|
||||
if (comm.max(exceptionThrown))
|
||||
throw std::runtime_error("Could not initialize the problem.");
|
||||
if (comm.max(exceptionThrown)) {
|
||||
auto all_what = gatherStrings(what);
|
||||
assert(!all_what.empty());
|
||||
throw std::runtime_error("Could not initialize the problem: " + all_what.front());
|
||||
}
|
||||
|
||||
setupTimer_.stop();
|
||||
|
||||
|
103
tests/models/test_mpiutil.cpp
Normal file
103
tests/models/test_mpiutil.cpp
Normal file
@ -0,0 +1,103 @@
|
||||
/*
|
||||
Copyright 2020 Equinor ASA.
|
||||
|
||||
This file is part of the Open Porous Media project (OPM).
|
||||
|
||||
OPM is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
OPM is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with OPM. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <config.h>
|
||||
|
||||
#include <opm/models/parallel/mpiutil.hh>
|
||||
#include <dune/common/parallel/mpihelper.hh>
|
||||
|
||||
#include <cassert>
|
||||
|
||||
#if HAVE_MPI
|
||||
/// Exception type thrown by MPI_err_handler(): carries the MPI error
/// message together with the MPI error code.
struct MPIError
{
    std::string errorstring;
    int errorcode;

    MPIError(std::string s, int e)
        : errorstring(std::move(s))
        , errorcode(e)
    {
    }
};
|
||||
|
||||
/// MPI error handler that prints the message for \p err_code to std::cerr
/// and then throws an MPIError holding the message and the code.
void MPI_err_handler(MPI_Comm*, int* err_code, ...)
{
    std::vector<char> err_string(MPI_MAX_ERROR_STRING);
    // Initialized so that the string construction below is well defined
    // even if MPI_Error_string() does not set the length.
    int err_length = 0;
    MPI_Error_string(*err_code, err_string.data(), &err_length);
    const std::string s(err_string.data(), err_length);
    std::cerr << "An MPI Error occurred:" << std::endl << s << std::endl;
    throw MPIError(s, *err_code);
}
|
||||
#endif
|
||||
|
||||
bool noStrings(int, int)
|
||||
{
|
||||
std::string empty;
|
||||
auto res = Opm::gatherStrings(empty);
|
||||
assert(res.empty());
|
||||
return true;
|
||||
}
|
||||
|
||||
bool oddRankStrings(int size, int rank)
|
||||
{
|
||||
std::string what = (rank % 2 == 1) ? "An error on rank " + std::to_string(rank) : std::string();
|
||||
auto res = Opm::gatherStrings(what);
|
||||
assert(int(res.size()) == size/2);
|
||||
for (int i = 0; i < size/2; ++i) {
|
||||
assert(res[i] == "An error on rank " + std::to_string(2*i + 1));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool allRankStrings(int size, int rank)
|
||||
{
|
||||
std::string what = "An error on rank " + std::to_string(rank);
|
||||
auto res = Opm::gatherStrings(what);
|
||||
assert(int(res.size()) == size);
|
||||
for (int i = 0; i < size; ++i) {
|
||||
assert(res[i] == "An error on rank " + std::to_string(i));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
int testMain(int size, int rank)
|
||||
{
|
||||
bool ok = noStrings(size, rank);
|
||||
ok = ok && oddRankStrings(size, rank);
|
||||
ok = ok && allRankStrings(size, rank);
|
||||
if (ok) {
|
||||
return EXIT_SUCCESS;
|
||||
} else {
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
const auto& mpiHelper = Dune::MPIHelper::instance(argc, argv);
|
||||
int mpiSize = mpiHelper.size();
|
||||
int mpiRank = mpiHelper.rank();
|
||||
#if HAVE_MPI
|
||||
// register a throwing error handler to allow for
|
||||
// debugging with "catch throw" in gdb
|
||||
MPI_Errhandler handler;
|
||||
MPI_Comm_create_errhandler(MPI_err_handler, &handler);
|
||||
MPI_Comm_set_errhandler(MPI_COMM_WORLD, handler);
|
||||
#endif
|
||||
return testMain(mpiSize, mpiRank);
|
||||
}
|
@ -109,6 +109,21 @@ case "$TEST_TYPE" in
|
||||
exit 0
|
||||
;;
|
||||
|
||||
"--parallel-program="*)
|
||||
NUM_PROCS="${TEST_TYPE/--parallel-program=/}"
|
||||
|
||||
echo "executing \"mpirun -np \"$NUM_PROCS\" $TEST_BINARY $TEST_ARGS\""
|
||||
mpirun -np "$NUM_PROCS" "$TEST_BINARY" $TEST_ARGS | tee "test-$RND.log"
|
||||
RET="${PIPESTATUS[0]}"
|
||||
if test "$RET" != "0"; then
|
||||
echo "Executing the binary failed!"
|
||||
rm "test-$RND.log"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
exit 0
|
||||
;;
|
||||
|
||||
"--parallel-simulation="*)
|
||||
NUM_PROCS="${TEST_TYPE/--parallel-simulation=/}"
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user