added: utility to broadcast multiple variables in one operation

This commit is contained in:
Arne Morten Kvarving 2021-11-09 11:48:06 +01:00
parent ad44564f27
commit 2120cbf043
3 changed files with 153 additions and 0 deletions

View File

@ -342,6 +342,18 @@ opm_add_test(test_parallelwellinfo_mpi
NO_COMPILE
)
opm_add_test(test_broadcast
DEPENDS "opmsimulators"
LIBRARIES opmsimulators ${Boost_UNIT_TEST_FRAMEWORK_LIBRARY}
SOURCES
tests/test_broadcast.cpp
CONDITION
MPI_FOUND AND Boost_UNIT_TEST_FRAMEWORK_FOUND
DRIVER_ARGS
-n 4
-b ${PROJECT_BINARY_DIR}
)
include(OpmBashCompletion)
if (NOT BUILD_FLOW)

View File

@ -361,6 +361,53 @@ ADD_PACK_PROTOTYPES(RestartValue)
ADD_PACK_PROTOTYPES(std::string)
ADD_PACK_PROTOTYPES(time_point)
template<typename T, typename... Args>
void variadic_packsize(size_t& size, Parallel::Communication comm, T& first, Args&&... args)
{
size += packSize(first, comm);
if constexpr (sizeof...(args) > 0)
variadic_packsize(size, comm, std::forward<Args>(args)...);
}
template<typename T, typename... Args>
void variadic_pack(int& pos, std::vector<char>& buffer, Parallel::Communication comm, T& first, Args&&... args)
{
pack(first, buffer, pos, comm);
if constexpr (sizeof...(args) > 0)
variadic_pack(pos, buffer, comm, std::forward<Args>(args)...);
}
template<typename T, typename... Args>
void variadic_unpack(int& pos, std::vector<char>& buffer, Parallel::Communication comm, T& first, Args&&... args)
{
unpack(first, buffer, pos, comm);
if constexpr (sizeof...(args) > 0)
variadic_unpack(pos, buffer, comm, std::forward<Args>(args)...);
}
template<typename... Args>
void broadcast(Parallel::Communication comm, int root, Args&&... args)
{
if (comm.size() == 1)
return;
size_t size = 0;
if (comm.rank() == root)
variadic_packsize(size, comm, std::forward<Args>(args)...);
comm.broadcast(&size, 1, root);
std::vector<char> buffer(size);
if (comm.rank() == root) {
int pos = 0;
variadic_pack(pos, buffer, comm, std::forward<Args>(args)...);
}
comm.broadcast(buffer.data(), size, root);
if (comm.rank() != root) {
int pos = 0;
variadic_unpack(pos, buffer, comm, std::forward<Args>(args)...);
}
}
} // end namespace Mpi
RestartValue loadParallelRestart(const EclipseIO* eclIO, Action::State& actionState, SummaryState& summaryState,

94
tests/test_broadcast.cpp Normal file
View File

@ -0,0 +1,94 @@
/*
Copyright 2018 SINTEF Digital, Mathematics and Cybernetics.
Copyright 2018 Equinor.
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
*/
#include <config.h>
#define BOOST_TEST_MODULE TestBroadCast
#define BOOST_TEST_NO_MAIN
#include <boost/test/unit_test.hpp>
#include <opm/simulators/utils/ParallelRestart.hpp>
#include <dune/common/parallel/mpihelper.hh>
#include <numeric>
#if HAVE_MPI
struct MPIError
{
MPIError(std::string s, int e) : errorstring(std::move(s)), errorcode(e){}
std::string errorstring;
int errorcode;
};
void MPI_err_handler(MPI_Comm*, int* err_code, ...)
{
std::vector<char> err_string(MPI_MAX_ERROR_STRING);
int err_length;
MPI_Error_string(*err_code, err_string.data(), &err_length);
std::string s(err_string.data(), err_length);
std::cerr << "An MPI Error ocurred:" << std::endl << s << std::endl;
throw MPIError(s, *err_code);
}
#endif
bool
init_unit_test_func()
{
return true;
}
BOOST_AUTO_TEST_CASE(BroadCast)
{
auto cc = Dune::MPIHelper::getCollectiveCommunication();
std::vector<double> d(3);
if (cc.rank() == 0)
std::iota(d.begin(), d.end(), 1.0);
std::vector<int> i(3);
if (cc.rank() == 0)
std::iota(i.begin(), i.end(), 4);
double d1 = cc.rank() == 0 ? 7.0 : 0.0;
size_t i1 = cc.rank() == 0 ? 8 : 0;
Opm::Mpi::broadcast(cc, 0, d, i, d1, i1);
for (size_t c = 0; c < 3; ++c) {
BOOST_CHECK_EQUAL(d[c], 1.0+c);
BOOST_CHECK_EQUAL(i[c], 4+c);
}
BOOST_CHECK_EQUAL(d1, 7.0);
BOOST_CHECK_EQUAL(i1, 8);
}
int main(int argc, char** argv)
{
Dune::MPIHelper::instance(argc, argv);
#if HAVE_MPI
// register a throwing error handler to allow for
// debugging with "catch throw" in gdb
MPI_Errhandler handler;
MPI_Comm_create_errhandler(MPI_err_handler, &handler);
MPI_Comm_set_errhandler(MPI_COMM_WORLD, handler);
#endif
return boost::unit_test::unit_test_main(&init_unit_test_func, argc, argv);
}