diff --git a/ebos/eclmpiserializer.hh b/ebos/eclmpiserializer.hh
index 970394c2a..34e6e3ea6 100644
--- a/ebos/eclmpiserializer.hh
+++ b/ebos/eclmpiserializer.hh
@@ -68,17 +68,18 @@ using remove_cvr_t = std::remove_cv_t<std::remove_reference_t<T>>;
 
 } // namespace detail
 
-/*! \brief Class for (de-)serializing and broadcasting data in parallel.
+/*! \brief Class for (de-)serializing.
  *! \details If the class has a serializeOp member this is used,
- * if not it is passed on to the underlying primitive serializer.
+ * if not it is passed on to the underlying packer.
  */
-class EclMpiSerializer {
+template<class Packer>
+class Serializer {
 public:
     //! \brief Constructor.
-    //! \param comm The global communicator to broadcast using
-    explicit EclMpiSerializer(Opm::Parallel::Communication comm) :
-        m_comm(comm)
+    //! \param packer Packer to use
+    explicit Serializer(const Packer& packer) :
+        m_packer(packer)
     {}
 
     //! \brief Applies current serialization op to the passed data.
@@ -105,11 +106,11 @@ public:
             const_cast<T&>(data).serializeOp(*this);
         } else {
             if (m_op == Operation::PACKSIZE)
-                m_packSize += Mpi::Packer::packSize(data, m_comm);
+                m_packSize += m_packer.packSize(data);
             else if (m_op == Operation::PACK)
-                Mpi::Packer::pack(data, m_buffer, m_position, m_comm);
+                m_packer.pack(data, m_buffer, m_position);
             else if (m_op == Operation::UNPACK)
-                Mpi::Packer::unpack(const_cast<T&>(data), m_buffer, m_position, m_comm);
+                m_packer.unpack(const_cast<T&>(data), m_buffer, m_position);
         }
     }
 
@@ -128,6 +129,21 @@ public:
         (*this)(data);
     }
 
+    //! \brief Call this to serialize data.
+    //! \tparam T Type of class to serialize
+    //! \param data Class to serialize
+    template<class... Args>
+    void pack(const Args&... data)
+    {
+        m_op = Operation::PACKSIZE;
+        m_packSize = 0;
+        variadic_call(data...);
+        m_position = 0;
+        m_buffer.resize(m_packSize);
+        m_op = Operation::PACK;
+        variadic_call(data...);
+    }
+
     //! \brief Call this to de-serialize data.
     //! \tparam T Type of class to de-serialize
     //! \param data Class to de-serialize
@@ -139,95 +155,15 @@ public:
         (*this)(data);
     }
 
-    //! \brief Serialize and broadcast on root process, de-serialize on
-    //! others.
-    //!
-    //! \tparam T Type of class to broadcast
-    //! \param data Class to broadcast
-    //! \param root Process to broadcast from
-    template<class T>
-    void broadcast(T& data, int root = 0)
+    //! \brief Call this to de-serialize data.
+    //! \tparam T Type of class to de-serialize
+    //! \param data Class to de-serialize
+    template<class... Args>
+    void unpack(Args&... data)
     {
-        if (m_comm.size() == 1)
-            return;
-
-        if (m_comm.rank() == root) {
-            try {
-                pack(data);
-                m_packSize = m_position;
-                m_comm.broadcast(&m_packSize, 1, root);
-                m_comm.broadcast(m_buffer.data(), m_position, root);
-            } catch (...) {
-                m_packSize = std::numeric_limits<std::size_t>::max();
-                m_comm.broadcast(&m_packSize, 1, root);
-                throw;
-            }
-        } else {
-            m_comm.broadcast(&m_packSize, 1, root);
-            if (m_packSize == std::numeric_limits<std::size_t>::max()) {
-                throw std::runtime_error("Error detected in parallel serialization");
-            }
-
-            m_buffer.resize(m_packSize);
-            m_comm.broadcast(m_buffer.data(), m_packSize, root);
-            unpack(data);
-        }
-    }
-
-    template<class... Args>
-    void broadcast(int root, Args&&... args)
-    {
-        if (m_comm.size() == 1)
-            return;
-
-        if (m_comm.rank() == root) {
-            try {
-                m_op = Operation::PACKSIZE;
-                m_packSize = 0;
-                variadic_call(args...);
-                m_position = 0;
-                m_buffer.resize(m_packSize);
-                m_op = Operation::PACK;
-                variadic_call(args...);
-                m_packSize = m_position;
-                m_comm.broadcast(&m_packSize, 1, root);
-                m_comm.broadcast(m_buffer.data(), m_position, root);
-            } catch (...) {
-                m_packSize = std::numeric_limits<std::size_t>::max();
-                m_comm.broadcast(&m_packSize, 1, root);
-                throw;
-            }
-        } else {
-            m_comm.broadcast(&m_packSize, 1, root);
-            if (m_packSize == std::numeric_limits<std::size_t>::max()) {
-                throw std::runtime_error("Error detected in parallel serialization");
-            }
-            m_buffer.resize(m_packSize);
-            m_comm.broadcast(m_buffer.data(), m_packSize, root);
-            m_position = 0;
-            m_op = Operation::UNPACK;
-            variadic_call(std::forward<Args>(args)...);
-        }
-    }
-
-    //! \brief Serialize and broadcast on root process, de-serialize and append on
-    //! others.
-    //!
-    //! \tparam T Type of class to broadcast
-    //! \param data Class to broadcast
-    //! \param root Process to broadcast from
-    template<class T>
-    void append(T& data, int root = 0)
-    {
-        if (m_comm.size() == 1)
-            return;
-
-        T tmp;
-        T& bcast = m_comm.rank() == root ? data : tmp;
-        broadcast(bcast);
-
-        if (m_comm.rank() != root)
-            data.append(tmp);
+        m_position = 0;
+        m_op = Operation::UNPACK;
+        variadic_call(data...);
     }
 
     //! \brief Returns current position in buffer.
@@ -252,16 +188,16 @@ protected:
         if constexpr (std::is_pod_v<T>) {
            if (m_op == Operation::PACKSIZE) {
                 (*this)(data.size());
-                m_packSize += Mpi::Packer::packSize(data.data(), data.size(), m_comm);
+                m_packSize += m_packer.packSize(data.data(), data.size());
             } else if (m_op == Operation::PACK) {
                 (*this)(data.size());
-                Mpi::Packer::pack(data.data(), data.size(), m_buffer, m_position, m_comm);
+                m_packer.pack(data.data(), data.size(), m_buffer, m_position);
             } else if (m_op == Operation::UNPACK) {
                 std::size_t size = 0;
                 (*this)(size);
                 auto& data_mut = const_cast<std::vector<T,A>&>(data);
                 data_mut.resize(size);
-                Mpi::Packer::unpack(data_mut.data(), size, m_buffer, m_position, m_comm);
+                m_packer.unpack(data_mut.data(), size, m_buffer, m_position);
             }
         } else {
             if (m_op == Operation::UNPACK) {
@@ -310,12 +246,12 @@ protected:
         if constexpr (std::is_pod_v<T>) {
             if (m_op == Operation::PACKSIZE)
-                m_packSize += Mpi::Packer::packSize(data.data(), data.size(), m_comm);
+                m_packSize += m_packer.packSize(data.data(), data.size());
             else if (m_op == Operation::PACK)
-                Mpi::Packer::pack(data.data(), data.size(), m_buffer, m_position, m_comm);
+                m_packer.pack(data.data(), data.size(), m_buffer, m_position);
             else if (m_op == Operation::UNPACK) {
                 auto& data_mut = const_cast<T&>(data);
-                Mpi::Packer::unpack(data_mut.data(), data_mut.size(), m_buffer, m_position, m_comm);
+                m_packer.unpack(data_mut.data(), data_mut.size(), m_buffer, m_position);
             }
         } else {
             std::for_each(data.begin(), data.end(), std::ref(*this));
         }
@@ -561,7 +497,7 @@ protected:
     //! function)
     template <typename T>
     struct has_serializeOp<
-        T, std::void_t<decltype(std::declval<T>().serializeOp(std::declval<EclMpiSerializer&>()))>
+        T, std::void_t<decltype(std::declval<T>().serializeOp(std::declval<Serializer<Packer>&>()))>
     > : public std::true_type {};
 
     //! \brief Handler for smart pointers.
@@ -579,14 +515,107 @@ protected:
         }
     }
 
-    Parallel::Communication m_comm; //!< Communicator to broadcast using
-
+    const Packer& m_packer; //!< Packer to use
     Operation m_op = Operation::PACKSIZE; //!< Current operation
     size_t m_packSize = 0; //!< Required buffer size after PACKSIZE has been done
     int m_position = 0; //!< Current position in buffer
     std::vector<char> m_buffer; //!< Buffer for serialized data
 };
 
+class EclMpiSerializer : public Serializer<Mpi::Packer> {
+public:
+    EclMpiSerializer(Parallel::Communication comm)
+        : Serializer<Mpi::Packer>(m_packer)
+        , m_packer(comm)
+        , m_comm(comm)
+    {}
+
+    //! \brief Serialize and broadcast on root process, de-serialize on
+    //! others.
+    //!
+    //! \tparam T Type of class to broadcast
+    //! \param data Class to broadcast
+    //! \param root Process to broadcast from
+    template<class T>
+    void broadcast(T& data, int root = 0)
+    {
+        if (m_comm.size() == 1)
+            return;
+
+        if (m_comm.rank() == root) {
+            try {
+                this->pack(data);
+                m_comm.broadcast(&m_packSize, 1, root);
+                m_comm.broadcast(m_buffer.data(), m_packSize, root);
+            } catch (...) {
+                m_packSize = std::numeric_limits<std::size_t>::max();
+                m_comm.broadcast(&m_packSize, 1, root);
+                throw;
+            }
+        } else {
+            m_comm.broadcast(&m_packSize, 1, root);
+            if (m_packSize == std::numeric_limits<std::size_t>::max()) {
+                throw std::runtime_error("Error detected in parallel serialization");
+            }
+
+            m_buffer.resize(m_packSize);
+            m_comm.broadcast(m_buffer.data(), m_packSize, root);
+            this->unpack(data);
+        }
+    }
+
+    template<class... Args>
+    void broadcast(int root, Args&&... args)
+    {
+        if (m_comm.size() == 1)
+            return;
+
+        if (m_comm.rank() == root) {
+            try {
+                this->pack(std::forward<Args>(args)...);
+                m_comm.broadcast(&m_packSize, 1, root);
+                m_comm.broadcast(m_buffer.data(), m_packSize, root);
+            } catch (...) {
+                m_packSize = std::numeric_limits<std::size_t>::max();
+                m_comm.broadcast(&m_packSize, 1, root);
+                throw;
+            }
+        } else {
+            m_comm.broadcast(&m_packSize, 1, root);
+            if (m_packSize == std::numeric_limits<std::size_t>::max()) {
+                throw std::runtime_error("Error detected in parallel serialization");
+            }
+            m_buffer.resize(m_packSize);
+            m_comm.broadcast(m_buffer.data(), m_packSize, root);
+            this->unpack(std::forward<Args>(args)...);
+        }
+    }
+
+    //! \brief Serialize and broadcast on root process, de-serialize and append on
+    //! others.
+    //!
+    //! \tparam T Type of class to broadcast
+    //! \param data Class to broadcast
+    //! \param root Process to broadcast from
+    template<class T>
+    void append(T& data, int root = 0)
+    {
+        if (m_comm.size() == 1)
+            return;
+
+        T tmp;
+        T& bcast = m_comm.rank() == root ? data : tmp;
+        broadcast(bcast, root);
+
+        if (m_comm.rank() != root)
+            data.append(tmp);
+    }
+
+private:
+    const Mpi::Packer m_packer; //!< Packer instance
+    Parallel::Communication m_comm; //!< Communicator to use
+};
+
 }
 
 #endif
diff --git a/opm/simulators/utils/MPIPacker.hpp b/opm/simulators/utils/MPIPacker.hpp
index 02976654e..9d5173e8c 100644
--- a/opm/simulators/utils/MPIPacker.hpp
+++ b/opm/simulators/utils/MPIPacker.hpp
@@ -16,8 +16,8 @@
   You should have received a copy of the GNU General Public License
   along with OPM.  If not, see <http://www.gnu.org/licenses/>.
 */
-#ifndef MPI_SERIALIZER_HPP
-#define MPI_SERIALIZER_HPP
+#ifndef MPI_PACKER_HPP
+#define MPI_PACKER_HPP
 
 #include
 #include
@@ -167,30 +167,36 @@ struct Packing<false,std::bitset<Size>>
 ADD_PACK_SPECIALIZATION(std::string)
 ADD_PACK_SPECIALIZATION(time_point)
 
+#undef ADD_PACK_SPECIALIZATION
+
 }
 
 //! \brief Struct handling packing of serialization for MPI communication.
 struct Packer {
+    //! \brief Constructor.
+    //! \param comm The communicator to use
+    Packer(Parallel::Communication comm)
+        : m_comm(comm)
+    {}
+
     //! \brief Calculates the pack size for a variable.
     //! \tparam T The type of the data to be packed
     //! \param data The data to pack
-    //! \param comm The communicator to use
     template<class T>
-    static std::size_t packSize(const T& data, Parallel::MPIComm comm)
+    std::size_t packSize(const T& data) const
     {
-        return detail::Packing<std::is_pod_v<T>,T>::packSize(data,comm);
+        return detail::Packing<std::is_pod_v<T>,T>::packSize(data, m_comm);
     }
 
     //! \brief Calculates the pack size for an array.
     //! \tparam T The type of the data to be packed
     //! \param data The array to pack
     //! \param n Length of array
-    //! \param comm The communicator to use
     template<class T>
-    static std::size_t packSize(const T* data, std::size_t n, Parallel::MPIComm comm)
+    std::size_t packSize(const T* data, std::size_t n) const
     {
         static_assert(std::is_pod_v<T>, "Array packing not supported for non-pod data");
-        return detail::Packing<true,T>::packSize(data,n,comm);
+        return detail::Packing<true,T>::packSize(data, n, m_comm);
     }
 
     //! \brief Pack a variable.
@@ -198,14 +204,12 @@ struct Packer {
     //! \param data The variable to pack
     //! \param buffer Buffer to pack into
     //! \param position Position in buffer to use
-    //! \param comm The communicator to use
     template<class T>
-    static void pack(const T& data,
-                     std::vector<char>& buffer,
-                     int& position,
-                     Parallel::MPIComm comm)
+    void pack(const T& data,
+              std::vector<char>& buffer,
+              int& position) const
     {
-        detail::Packing<std::is_pod_v<T>,T>::pack(data, buffer, position, comm);
+        detail::Packing<std::is_pod_v<T>,T>::pack(data, buffer, position, m_comm);
     }
 
     //! \brief Pack an array.
@@ -214,16 +218,14 @@ struct Packer {
     //! \param n Length of array
     //! \param buffer Buffer to pack into
     //! \param position Position in buffer to use
-    //! \param comm The communicator to use
     template<class T>
-    static void pack(const T* data,
-                     std::size_t n,
-                     std::vector<char>& buffer,
-                     int& position,
-                     Parallel::MPIComm comm)
+    void pack(const T* data,
+              std::size_t n,
+              std::vector<char>& buffer,
+              int& position) const
     {
         static_assert(std::is_pod_v<T>, "Array packing not supported for non-pod data");
-        detail::Packing<true,T>::pack(data, n, buffer, position, comm);
+        detail::Packing<true,T>::pack(data, n, buffer, position, m_comm);
     }
 
     //! \brief Unpack a variable.
@@ -231,14 +233,12 @@ struct Packer {
     //! \param data The variable to unpack
     //! \param buffer Buffer to unpack from
     //! \param position Position in buffer to use
-    //! \param comm The communicator to use
     template<class T>
-    static void unpack(T& data,
-                       std::vector<char>& buffer,
-                       int& position,
-                       Parallel::MPIComm comm)
+    void unpack(T& data,
+                std::vector<char>& buffer,
+                int& position) const
     {
-        detail::Packing<std::is_pod_v<T>,T>::unpack(data, buffer, position, comm);
+        detail::Packing<std::is_pod_v<T>,T>::unpack(data, buffer, position, m_comm);
     }
 
     //! \brief Unpack an array.
@@ -247,20 +247,21 @@ struct Packer {
     //! \param n Length of array
     //! \param buffer Buffer to unpack from
     //! \param position Position in buffer to use
-    //! \param comm The communicator to use
     template<class T>
-    static void unpack(T* data,
-                       std::size_t n,
-                       std::vector<char>& buffer,
-                       int& position,
-                       Parallel::MPIComm comm)
+    void unpack(T* data,
+                std::size_t n,
+                std::vector<char>& buffer,
+                int& position) const
     {
         static_assert(std::is_pod_v<T>, "Array packing not supported for non-pod data");
-        detail::Packing<true,T>::unpack(data, n, buffer, position, comm);
+        detail::Packing<true,T>::unpack(data, n, buffer, position, m_comm);
     }
+
+private:
+    Parallel::Communication m_comm; //!< Communicator to use
 };
 
 } // end namespace Mpi
 } // end namespace Opm
 
-#endif // MPI_SERIALIZER_HPP
+#endif // MPI_PACKER_HPP
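
Usage sketch for reviewers (not part of the patch): a minimal example of how the refactored pieces fit together after this change, with the communicator owned by Mpi::Packer and the generic Serializer only driving the PACKSIZE/PACK/UNPACK passes. The Opts struct and the sync() helper below are hypothetical stand-ins for any class providing a serializeOp member; the include path is assumed to match the in-tree layout.

// Hypothetical example -- Opts and sync() are illustrative only.
#include <ebos/eclmpiserializer.hh>

#include <string>
#include <vector>

struct Opts {
    int steps = 0;
    std::vector<double> weights;
    std::string name;

    // The serializer visits each member through operator(); the same code
    // path is reused for the PACKSIZE, PACK and UNPACK passes.
    template<class Serializer>
    void serializeOp(Serializer& serializer)
    {
        serializer(steps);
        serializer(weights);
        serializer(name);
    }
};

void sync(Opm::Parallel::Communication comm, Opts& opts)
{
    Opm::EclMpiSerializer ser(comm);  // owns an Mpi::Packer bound to comm
    ser.broadcast(opts);              // pack on rank 0, unpack on the others
}

Because the communicator now lives in Mpi::Packer, the Serializer class template itself carries no MPI state; any type exposing packSize/pack/unpack with the member signatures shown in the patch can be supplied as the Packer template argument.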