Merge pull request #4103 from akva2/serializer_handle_vectors

mpiserializer: handle all vectors
This commit is contained in:
Arne Morten Kvarving
2022-09-13 14:44:08 +02:00
committed by GitHub
6 changed files with 101 additions and 226 deletions

View File

@@ -34,6 +34,8 @@
#include <variant>
#include <vector>
namespace Opm {
namespace detail
{
@@ -62,12 +64,10 @@ decltype(auto) make_variant(std::size_t index)
}
template<class T>
using remove_cvr_t = std::remove_const_t<std::remove_reference_t<T>>;
using remove_cvr_t = std::remove_cv_t<std::remove_reference_t<T>>;
} // namespace detail
namespace Opm {
/*! \brief Class for (de-)serializing and broadcasting data in parallel.
*! \details If the class has a serializeOp member this is used,
* if not it is passed on to the underlying primitive serializer.
@@ -95,12 +95,16 @@ public:
variant(data);
} else if constexpr (is_optional<T>::value) {
optional(data);
} else if constexpr (is_vector<T>::value) {
vector(const_cast<T&>(data));
} else if constexpr (is_map<T>::value) {
map(const_cast<T&>(data));
} else if constexpr (is_array<T>::value) {
array(const_cast<T&>(data));
} else if constexpr (is_set<T>::value) {
set(const_cast<T&>(data));
} else if constexpr (has_serializeOp<detail::remove_cvr_t<T>>::value) {
const_cast<T&>(data).serializeOp(*this);
} else {
if (m_op == Operation::PACKSIZE)
m_packSize += Mpi::packSize(data, m_comm);
@@ -124,6 +128,8 @@ public:
pair(it);
else if constexpr (is_ptr<T>::value)
ptr(it);
else if constexpr (is_vector<T>::value)
vector(it);
else if constexpr (has_serializeOp<T>::value)
it.serializeOp(*this);
else
@@ -145,6 +151,32 @@ public:
}
}
//! \brief (De-)serialization handler for std::vector<bool>.
//! \details std::vector<bool> is bit-packed and iterating it yields proxy
//!          references, so it cannot share the generic vector path; every
//!          element is copied into a plain bool before being forwarded.
//! \param data The vector to (de-)serialize
void vector(std::vector<bool>& data)
{
    if (m_op == Operation::PACKSIZE) {
        // a size header plus one bool per element
        m_packSize += Mpi::packSize(data.size(), m_comm)
                    + data.size() * Mpi::packSize(bool(), m_comm);
    } else if (m_op == Operation::PACK) {
        (*this)(data.size());
        for (std::size_t i = 0; i < data.size(); ++i) {
            bool value = data[i]; // materialize the proxy reference
            (*this)(value);
        }
    } else if (m_op == Operation::UNPACK) {
        std::size_t count;
        (*this)(count);
        data.clear();
        data.reserve(count);
        while (count-- > 0) {
            bool value;
            (*this)(value);
            data.push_back(value);
        }
    }
}
template <class Array>
void array(Array& data)
{
@@ -212,30 +244,19 @@ public:
if (m_op == Operation::PACKSIZE) {
m_packSize += Mpi::packSize(data.has_value(), m_comm);
if (data.has_value()) {
if constexpr (has_serializeOp<T>::value) {
const_cast<T&>(*data).serializeOp(*this);
} else
m_packSize += Mpi::packSize(*data, m_comm);
(*this)(*data);
}
} else if (m_op == Operation::PACK) {
Mpi::pack(data.has_value(), m_buffer, m_position, m_comm);
if (data.has_value()) {
if constexpr (has_serializeOp<T>::value) {
const_cast<T&>(*data).serializeOp(*this);
} else {
Mpi::pack(*data, m_buffer, m_position, m_comm);
}
(*this)(*data);
}
} else if (m_op == Operation::UNPACK) {
bool has;
Mpi::unpack(has, m_buffer, m_position, m_comm);
if (has) {
T res;
if constexpr (has_serializeOp<T>::value) {
res.serializeOp(*this);
} else {
Mpi::unpack(res, m_buffer, m_position, m_comm);
}
(*this)(res);
const_cast<std::optional<T>&>(data) = res;
}
}
@@ -308,7 +329,7 @@ public:
auto handle = [&](auto& d)
{
if constexpr (is_vector<Data>::value)
this->vector(d);
vector(d);
else if constexpr (is_ptr<Data>::value)
ptr(d);
else if constexpr (has_serializeOp<Data>::value)
@@ -369,16 +390,52 @@ public:
//!
//! \tparam T Type of class to broadcast
//! \param data Class to broadcast
//! \param root Process to broadcast from
template<class T>
void broadcast(T& data)
void broadcast(T& data, int root = 0)
{
if (m_comm.size() == 1)
return;
if (m_comm.rank() == 0) {
if (m_comm.rank() == root) {
try {
pack(data);
m_packSize = m_position;
m_comm.broadcast(&m_packSize, 1, root);
m_comm.broadcast(m_buffer.data(), m_position, root);
} catch (...) {
m_packSize = std::numeric_limits<size_t>::max();
m_comm.broadcast(&m_packSize, 1, root);
throw;
}
} else {
m_comm.broadcast(&m_packSize, 1, root);
if (m_packSize == std::numeric_limits<size_t>::max()) {
throw std::runtime_error("Error detected in parallel serialization");
}
m_buffer.resize(m_packSize);
m_comm.broadcast(m_buffer.data(), m_packSize, root);
unpack(data);
}
}
template<typename... Args>
void broadcast(int root, Args&&... args)
{
if (m_comm.size() == 1)
return;
if (m_comm.rank() == root) {
try {
m_op = Operation::PACKSIZE;
m_packSize = 0;
variadic_call(args...);
m_position = 0;
m_buffer.resize(m_packSize);
m_op = Operation::PACK;
variadic_call(args...);
m_packSize = m_position;
m_comm.broadcast(&m_packSize, 1, 0);
m_comm.broadcast(m_buffer.data(), m_position, 0);
} catch (...) {
@@ -393,7 +450,9 @@ public:
}
m_buffer.resize(m_packSize);
m_comm.broadcast(m_buffer.data(), m_packSize, 0);
unpack(data);
m_position = 0;
m_op = Operation::UNPACK;
variadic_call(std::forward<Args>(args)...);
}
}
@@ -402,17 +461,18 @@ public:
//!
//! \tparam T Type of class to broadcast
//! \param data Class to broadcast
//! \param root Process to broadcast from
template<class T>
void append(T& data)
void append(T& data, int root = 0)
{
if (m_comm.size() == 1)
return;
T tmp;
T& bcast = m_comm.rank() == 0 ? data : tmp;
T& bcast = m_comm.rank() == root ? data : tmp;
broadcast(bcast);
if (m_comm.rank() != 0)
if (m_comm.rank() != root)
data.append(tmp);
}
@@ -429,6 +489,15 @@ public:
}
protected:
//! \brief Apply the serializer to each argument in turn.
//! \param head First value to process
//! \param tail Remaining values, handled by recursive expansion
template<typename T, typename... Args>
void variadic_call(T& head, Args&&... tail)
{
    (*this)(head);
    if constexpr (sizeof...(tail) > 0)
        variadic_call(std::forward<Args>(tail)...);
}
//! \brief Enumeration of operations.
enum class Operation {
PACKSIZE, //!< Calculating serialization buffer size

View File

@@ -67,28 +67,6 @@ std::size_t packSize(const std::pair<T1,T2>& data, Opm::Parallel::MPIComm comm)
return packSize(data.first, comm) + packSize(data.second, comm);
}
//! \brief Calculate the serialized size of a vector.
//! \details For POD element types the raw storage is measured in one call
//!          (the element count is accounted for by the array overload);
//!          otherwise a size header plus each entry is measured individually.
//! \param data Vector to measure
//! \param comm Communicator used by the underlying pack-size queries
//! \return Number of bytes the vector occupies when packed
template<class T, class A>
std::size_t packSize(const std::vector<T,A>& data, Opm::Parallel::MPIComm comm)
{
    // if constexpr: the trait is a compile-time property, so select the
    // branch at compile time instead of evaluating a runtime condition
    if constexpr (std::is_pod_v<T>) {
        // size written automatically
        return packSize(data.data(), data.size(), comm);
    } else {
        std::size_t size = packSize(data.size(), comm);
        for (const auto& entry : data)
            size += packSize(entry, comm);
        return size;
    }
}
//! \brief Calculate the serialized size of a bool vector: a size header
//!        followed by one bool per element.
//! \param data Vector to measure
//! \param comm Communicator used by the underlying pack-size queries
template<class A>
std::size_t packSize(const std::vector<bool,A>& data, Opm::Parallel::MPIComm comm)
{
    const bool dummy = false;
    const std::size_t header = packSize(data.size(), comm);
    return header + data.size() * packSize(dummy, comm);
}
template<std::size_t I = 0, typename Tuple>
typename std::enable_if<I == std::tuple_size<Tuple>::value, std::size_t>::type
pack_size_tuple_entry(const Tuple&, Opm::Parallel::MPIComm)
@@ -211,34 +189,6 @@ void pack(const std::pair<T1,T2>& data, std::vector<char>& buffer, int& position
pack(data.second, buffer, position, comm);
}
//! \brief Pack a vector into the serialization buffer.
//! \details POD element types are copied as one contiguous array (the
//!          element count is written by the array overload); other types
//!          get an explicit size header followed by per-entry packing.
//! \param data Vector to pack
//! \param buffer Destination buffer
//! \param position In/out write offset into the buffer
//! \param comm Communicator used by the underlying pack calls
template<class T, class A>
void pack(const std::vector<T, A>& data, std::vector<char>& buffer, int& position,
          Opm::Parallel::MPIComm comm)
{
    // if constexpr: branch on the compile-time trait at compile time
    // rather than through a runtime conditional
    if constexpr (std::is_pod_v<T>) {
        // size written automatically
        pack(data.data(), data.size(), buffer, position, comm);
    } else {
        pack(data.size(), buffer, position, comm);
        for (const auto& entry : data)
            pack(entry, buffer, position, comm);
    }
}
//! \brief Pack a bool vector: size header, then each element as a bool.
//! \details Elements are copied out of the bit-packed proxy references
//!          before packing.
//! \param data Vector to pack
//! \param buffer Destination buffer
//! \param position In/out write offset into the buffer
//! \param comm Communicator used by the underlying pack calls
template<class A>
void pack(const std::vector<bool,A>& data, std::vector<char>& buffer, int& position,
          Opm::Parallel::MPIComm comm)
{
    pack(data.size(), buffer, position, comm);
    for (std::size_t i = 0; i < data.size(); ++i) {
        const bool value = data[i]; // materialize the proxy reference
        pack(value, buffer, position, comm);
    }
}
template<std::size_t I = 0, typename Tuple>
typename std::enable_if<I == std::tuple_size<Tuple>::value, void>::type
pack_tuple_entry(const Tuple&, std::vector<char>&, int&,
@@ -340,39 +290,6 @@ void unpack(std::pair<T1,T2>& data, std::vector<char>& buffer, int& position,
unpack(data.second, buffer, position, comm);
}
//! \brief Unpack a vector from the serialization buffer.
//! \details Reads the element count, resizes, then for POD element types
//!          copies the contiguous storage in one call; other types are
//!          unpacked entry by entry.
//! \param data Vector to fill
//! \param buffer Source buffer
//! \param position In/out read offset into the buffer
//! \param comm Communicator used by the underlying unpack calls
template<class T, class A>
void unpack(std::vector<T,A>& data, std::vector<char>& buffer, int& position,
            Opm::Parallel::MPIComm comm)
{
    std::size_t length = 0;
    unpack(length, buffer, position, comm);
    data.resize(length);
    // if constexpr: select the POD fast path at compile time instead of
    // through a runtime conditional on a compile-time trait
    if constexpr (std::is_pod_v<T>) {
        unpack(data.data(), data.size(), buffer, position, comm);
    } else {
        for (auto& entry : data)
            unpack(entry, buffer, position, comm);
    }
}
//! \brief Unpack a bool vector: read the size header, then one bool per
//!        element, appending each into the bit-packed vector.
//! \param data Vector to fill
//! \param buffer Source buffer
//! \param position In/out read offset into the buffer
//! \param comm Communicator used by the underlying unpack calls
template<class A>
void unpack(std::vector<bool,A>& data, std::vector<char>& buffer, int& position,
            Opm::Parallel::MPIComm comm)
{
    std::size_t count = 0;
    unpack(count, buffer, position, comm);
    data.clear();
    data.reserve(count);
    while (count-- > 0) {
        bool value;
        unpack(value, buffer, position, comm);
        data.push_back(value);
    }
}
template<std::size_t I = 0, typename Tuple>
typename std::enable_if<I == std::tuple_size<Tuple>::value, void>::type
unpack_tuple_entry(Tuple&, std::vector<char>&, int&,
@@ -438,38 +355,6 @@ void unpack([[maybe_unused]] Opm::time_point& data, std::vector<char>& buffer, i
#endif
}
#define INSTANTIATE_PACK_VECTOR(...) \
template std::size_t packSize(const std::vector<__VA_ARGS__>& data, \
Opm::Parallel::MPIComm comm); \
template void pack(const std::vector<__VA_ARGS__>& data, \
std::vector<char>& buffer, int& position, \
Opm::Parallel::MPIComm comm); \
template void unpack(std::vector<__VA_ARGS__>& data, \
std::vector<char>& buffer, int& position, \
Opm::Parallel::MPIComm comm);
INSTANTIATE_PACK_VECTOR(float)
INSTANTIATE_PACK_VECTOR(double)
INSTANTIATE_PACK_VECTOR(std::vector<double>)
INSTANTIATE_PACK_VECTOR(bool)
INSTANTIATE_PACK_VECTOR(char)
INSTANTIATE_PACK_VECTOR(int)
INSTANTIATE_PACK_VECTOR(unsigned char)
INSTANTIATE_PACK_VECTOR(unsigned int)
INSTANTIATE_PACK_VECTOR(unsigned long int)
INSTANTIATE_PACK_VECTOR(unsigned long long int)
INSTANTIATE_PACK_VECTOR(std::time_t)
INSTANTIATE_PACK_VECTOR(std::pair<bool,double>)
INSTANTIATE_PACK_VECTOR(std::pair<std::string,std::vector<size_t>>)
INSTANTIATE_PACK_VECTOR(std::pair<int,std::vector<int>>)
INSTANTIATE_PACK_VECTOR(std::pair<int,std::vector<size_t>>)
INSTANTIATE_PACK_VECTOR(std::string)
#undef INSTANTIATE_PACK_VECTOR
#undef INSTANTIATE_PACK_SET
#define INSTANTIATE_PACK(...) \
template std::size_t packSize(const __VA_ARGS__& data, \
Opm::Parallel::MPIComm comm); \

View File

@@ -29,7 +29,6 @@
#include <string>
#include <tuple>
#include <typeinfo>
#include <vector>
namespace Opm
{
@@ -78,12 +77,6 @@ std::size_t packSize(const T& data, Opm::Parallel::MPIComm comm)
template<class T1, class T2>
std::size_t packSize(const std::pair<T1,T2>& data, Opm::Parallel::MPIComm comm);
template<class T, class A>
std::size_t packSize(const std::vector<T,A>& data, Opm::Parallel::MPIComm comm);
template<class A>
std::size_t packSize(const std::vector<bool,A>& data, Opm::Parallel::MPIComm comm);
template<class... Ts>
std::size_t packSize(const std::tuple<Ts...>& data, Opm::Parallel::MPIComm comm);
@@ -139,14 +132,6 @@ template<class T1, class T2>
void pack(const std::pair<T1,T2>& data, std::vector<char>& buffer, int& position,
Opm::Parallel::MPIComm comm);
template<class T, class A>
void pack(const std::vector<T,A>& data, std::vector<char>& buffer, int& position,
Opm::Parallel::MPIComm comm);
template<class A>
void pack(const std::vector<bool,A>& data, std::vector<char>& buffer, int& position,
Opm::Parallel::MPIComm comm);
template<class... Ts>
void pack(const std::tuple<Ts...>& data, std::vector<char>& buffer,
int& position, Opm::Parallel::MPIComm comm);
@@ -206,14 +191,6 @@ template<class T1, class T2>
void unpack(std::pair<T1,T2>& data, std::vector<char>& buffer, int& position,
Opm::Parallel::MPIComm comm);
template<class T, class A>
void unpack(std::vector<T,A>& data, std::vector<char>& buffer, int& position,
Opm::Parallel::MPIComm comm);
template<class A>
void unpack(std::vector<bool,A>& data, std::vector<char>& buffer, int& position,
Opm::Parallel::MPIComm comm);
template<class... Ts>
void unpack(std::tuple<Ts...>& data, std::vector<char>& buffer,
int& position, Opm::Parallel::MPIComm comm);
@@ -237,59 +214,6 @@ void unpack(std::bitset<Size>& data, std::vector<char>& buffer, int& position,
ADD_PACK_PROTOTYPES(std::string)
ADD_PACK_PROTOTYPES(time_point)
template<typename T, typename... Args>
void variadic_packsize(size_t& size, Parallel::Communication comm, T& first, Args&&... args)
{
size += packSize(first, comm);
if constexpr (sizeof...(args) > 0)
variadic_packsize(size, comm, std::forward<Args>(args)...);
}
template<typename T, typename... Args>
void variadic_pack(int& pos, std::vector<char>& buffer, Parallel::Communication comm, T& first, Args&&... args)
{
pack(first, buffer, pos, comm);
if constexpr (sizeof...(args) > 0)
variadic_pack(pos, buffer, comm, std::forward<Args>(args)...);
}
template<typename T, typename... Args>
void variadic_unpack(int& pos, std::vector<char>& buffer, Parallel::Communication comm, T& first, Args&&... args)
{
unpack(first, buffer, pos, comm);
if constexpr (sizeof...(args) > 0)
variadic_unpack(pos, buffer, comm, std::forward<Args>(args)...);
}
#if HAVE_MPI
//! \brief Broadcast a set of values from \p root to all other ranks.
//! \details Three-phase protocol: (1) root measures the total packed size,
//!          which is broadcast so every rank can allocate a matching buffer;
//!          (2) root packs the arguments and broadcasts the buffer;
//!          (3) non-root ranks unpack into their arguments in place.
//!          NOTE(review): the collective calls must execute in the same
//!          order on every rank — do not reorder them.
//! \param comm Communicator to broadcast over
//! \param root Rank that supplies the data
//! \param args Values to broadcast (overwritten on non-root ranks)
template<typename... Args>
void broadcast(Parallel::Communication comm, int root, Args&&... args)
{
// single-rank run: nothing to communicate
if (comm.size() == 1)
return;
size_t size = 0;
// only root knows the payload size; compute it there
if (comm.rank() == root)
variadic_packsize(size, comm, args...);
// every rank learns the buffer size before allocating
comm.broadcast(&size, 1, root);
std::vector<char> buffer(size);
if (comm.rank() == root) {
int pos = 0;
variadic_pack(pos, buffer, comm, args...);
}
comm.broadcast(buffer.data(), size, root);
// receivers deserialize into the caller-provided arguments
if (comm.rank() != root) {
int pos = 0;
variadic_unpack(pos, buffer, comm, std::forward<Args>(args)...);
}
}
#else
//! \brief Serial build: broadcasting is a no-op without MPI.
template<typename... Args>
void broadcast(Parallel::Communication, int, Args&&...)
{}
#endif
} // end namespace Mpi
} // end namespace Opm

View File

@@ -42,7 +42,6 @@
namespace Opm {
void eclStateBroadcast(Parallel::Communication comm, EclipseState& eclState, Schedule& schedule,
SummaryConfig& summaryConfig,
UDQState& udqState,
@@ -50,12 +49,7 @@ void eclStateBroadcast(Parallel::Communication comm, EclipseState& eclState, Sch
WellTestState& wtestState)
{
Opm::EclMpiSerializer ser(comm);
ser.broadcast(eclState);
ser.broadcast(schedule);
ser.broadcast(summaryConfig);
ser.broadcast(udqState);
ser.broadcast(actionState);
ser.broadcast(wtestState);
ser.broadcast(0, eclState, schedule, summaryConfig, udqState, actionState, wtestState);
}
template <class T>
@@ -65,7 +59,6 @@ void eclBroadcast(Parallel::Communication comm, T& data)
ser.broadcast(data);
}
template void eclBroadcast<TransMult>(Parallel::Communication, TransMult&);
template void eclBroadcast<Schedule>(Parallel::Communication, Schedule&);

View File

@@ -27,6 +27,7 @@
#include <opm/simulators/wells/VFPProperties.hpp>
#include <opm/simulators/utils/MPIPacker.hpp>
#include <ebos/eclmpiserializer.hh>
#include <algorithm>
#include <utility>
@@ -1058,8 +1059,9 @@ namespace Opm {
// data if they are going to check the group rates in stage1
// Another similar idea is to only communicate the rates to
// process j = i + 1
Mpi::broadcast(comm, i, group_indexes, group_oil_rates,
group_gas_rates, group_water_rates, group_alq_rates);
EclMpiSerializer ser(comm);
ser.broadcast(i, group_indexes, group_oil_rates,
group_gas_rates, group_water_rates, group_alq_rates);
if (comm.rank() != i) {
for (int j=0; j<num_rates_to_sync; j++) {
group_info.updateRate(group_indexes[j],

View File

@@ -26,6 +26,7 @@
#include <boost/test/unit_test.hpp>
#include <opm/simulators/utils/MPIPacker.hpp>
#include <ebos/eclmpiserializer.hh>
#include <dune/common/parallel/mpihelper.hh>
#include <numeric>
@@ -70,7 +71,8 @@ BOOST_AUTO_TEST_CASE(BroadCast)
double d1 = cc.rank() == 0 ? 7.0 : 0.0;
size_t i1 = cc.rank() == 0 ? 8 : 0;
Opm::Mpi::broadcast(cc, 0, d, i, d1, i1);
Opm::EclMpiSerializer ser(cc);
ser.broadcast(0, d, i, d1, i1);
for (size_t c = 0; c < 3; ++c) {
BOOST_CHECK_EQUAL(d[c], 1.0+c);