From 69fd6495c0cbd430643f9c7c75601b4d9b557f5e Mon Sep 17 00:00:00 2001
From: Markus Blatt
Date: Mon, 14 Dec 2020 16:35:30 +0100
Subject: [PATCH] Added factory to construct the global representation of perf data.

Some of our computations are inherently serial and need a complete
representation of the data attached to all perforations, no matter whether a
perforation lives on the local partition or not. This commit adds a factory
that makes it easy to create such a representation and helps with writing the
data back to the local representation.
---
 .../wells/BlackoilWellModel_impl.hpp      |   6 +-
 opm/simulators/wells/ParallelWellInfo.cpp | 179 +++++++++++++++++-
 opm/simulators/wells/ParallelWellInfo.hpp |  87 ++++++++-
 tests/test_parallelwellinfo.cpp           | 163 +++++++++++++++-
 4 files changed, 414 insertions(+), 21 deletions(-)

diff --git a/opm/simulators/wells/BlackoilWellModel_impl.hpp b/opm/simulators/wells/BlackoilWellModel_impl.hpp
index df95c9c20..ab78e8225 100644
--- a/opm/simulators/wells/BlackoilWellModel_impl.hpp
+++ b/opm/simulators/wells/BlackoilWellModel_impl.hpp
@@ -467,7 +467,7 @@ namespace Opm {
         // Clear the communication data structures for above values.
         for(auto&& pinfo : local_parallel_well_info_)
         {
-            pinfo->clearCommunicateAboveBelow();
+            pinfo->clear();
         }
     }

@@ -650,6 +650,9 @@ namespace Opm {
                                             completion_index);
                 }
                 firstOpenCompletion = false;
+                // Next time this index is the one above, as each open
+                // completion is stored somewhere.
+                completion_index_above = completion_index;
             } else {
                 checker.connectionFound(completion_index);
                 if (completion.state() != Connection::State::SHUT) {
@@ -660,7 +663,6 @@ namespace Opm {
             // Note: we rely on the connections being filtered! I.e. there are only connections
             // to active cells in the global grid.
             ++completion_index;
-            ++completion_index_above;
         }
         parallelWellInfo.endReset();
         checker.checkAllConnectionsFound();
diff --git a/opm/simulators/wells/ParallelWellInfo.cpp b/opm/simulators/wells/ParallelWellInfo.cpp
index 03b879623..224b7a77b 100644
--- a/opm/simulators/wells/ParallelWellInfo.cpp
+++ b/opm/simulators/wells/ParallelWellInfo.cpp
@@ -43,6 +43,143 @@ struct CommPolicy
 namespace Opm
 {
+
+GlobalPerfContainerFactory::GlobalPerfContainerFactory(const IndexSet& local_indices, const Communication comm,
+                                                       const int num_local_perfs)
+    : local_indices_(local_indices), comm_(comm)
+{
+    if ( comm_.size() > 1 )
+    {
+        // The global index used in the index set current_indices
+        // is the index of the perforation in ECL Schedule definition.
+        // This is assumed to give the topological order.
+        // allgather the index of the perforation in ECL schedule and the value.
+        sizes_.resize(comm_.size());
+        displ_.resize(comm_.size() + 1, 0);
+        // Send the int for convenience. It will be used to get the place where the
+        // data comes from
+        using Pair = std::pair<GlobalIndex, int>;
+        std::vector<Pair> my_pairs;
+        my_pairs.reserve(local_indices_.size());
+        for (const auto& pair: local_indices_)
+        {
+            if (pair.local().attribute() == Attribute::owner)
+            {
+                my_pairs.emplace_back(pair.global(), -1);
+            }
+        }
+        int mySize = my_pairs.size();
+        comm_.allgather(&mySize, 1, sizes_.data());
+        std::partial_sum(sizes_.begin(), sizes_.end(), displ_.begin()+1);
+        std::vector<Pair> global_pairs(displ_.back());
+        comm_.allgatherv(my_pairs.data(), my_pairs.size(), global_pairs.data(), sizes_.data(), displ_.data());
+        // Set the index where we receive
+        int count = 0;
+        std::for_each(global_pairs.begin(), global_pairs.end(), [&count](Pair& pair){ pair.second = count++;});
+        // sort the complete range to get the correct ordering
+        std::sort(global_pairs.begin(), global_pairs.end(),
+                  [](const Pair& p1, const Pair& p2){ return p1.first < p2.first; } );
+        map_received_.resize(global_pairs.size());
+        std::transform(global_pairs.begin(), global_pairs.end(), map_received_.begin(),
+                       [](const Pair& pair){ return pair.second; });
+        perf_ecl_index_.resize(global_pairs.size());
+        std::transform(global_pairs.begin(), global_pairs.end(), perf_ecl_index_.begin(),
+                       [](const Pair& pair){ return pair.first; });
+        num_global_perfs_ = global_pairs.size();
+    }
+    else
+    {
+        num_global_perfs_ = num_local_perfs;
+    }
+}
+
+
+
+std::vector<double> GlobalPerfContainerFactory::createGlobal(const std::vector<double>& local_perf_container,
+                                                             std::size_t num_components) const
+{
+    // Could become templated later.
+    using Value = double;
+
+    if (comm_.size() > 1)
+    {
+        std::vector<Value> global_recv(perf_ecl_index_.size() * num_components);
+        if (num_components == 1)
+        {
+            comm_.allgatherv(local_perf_container.data(), local_perf_container.size(),
+                             global_recv.data(), const_cast<int*>(sizes_.data()),
+                             const_cast<int*>(displ_.data()));
+        }
+        else
+        {
+#if HAVE_MPI
+            // Create MPI type for sending num_components entries
+            MPI_Datatype data_type;
+            MPI_Type_contiguous(num_components, Dune::MPITraits<Value>::getType(), &data_type);
+            MPI_Type_commit(&data_type);
+            MPI_Allgatherv(local_perf_container.data(),
+                           local_perf_container.size()/num_components,
+                           data_type, global_recv.data(), sizes_.data(),
+                           displ_.data(), data_type, comm_);
+            MPI_Type_free(&data_type);
+#endif
+        }
+
+        // reorder by ascending ecl index.
+        std::vector<double> global_remapped(perf_ecl_index_.size() * num_components);
+        auto global = global_remapped.begin();
+        for (auto map_entry = map_received_.begin(); map_entry != map_received_.end(); ++map_entry)
+        {
+            auto global_index = *map_entry * num_components;
+
+            for(std::size_t i = 0; i < num_components; ++i)
+                *(global++) = global_recv[global_index++];
+        }
+        assert(global == global_remapped.end());
+        return global_remapped;
+    }
+    else
+    {
+        return local_perf_container;
+    }
+}

+void GlobalPerfContainerFactory::copyGlobalToLocal(const std::vector<double>& global, std::vector<double>& local,
+                                                   std::size_t num_components) const
+{
+    if (global.empty())
+    {
+        return;
+    }
+
+    if (comm_.size() > 1)
+    {
+        // assign the values (both ranges are sorted by the ecl index)
+        auto global_perf = perf_ecl_index_.begin();
+
+        for (const auto& pair: local_indices_)
+        {
+            global_perf = std::lower_bound(global_perf, perf_ecl_index_.end(), pair.global());
+            assert(global_perf != perf_ecl_index_.end());
+            assert(*global_perf == pair.global());
+            auto local_index = pair.local() * num_components;
+            auto global_index = (global_perf - perf_ecl_index_.begin()) * num_components;
+            for (std::size_t i = 0; i < num_components; ++i)
+                local[local_index++] = global[global_index++];
+        }
+    }
+    else
+    {
+        std::copy(global.begin(), global.end(), local.begin());
+    }
+}
+
+int GlobalPerfContainerFactory::numGlobalPerfs() const
+{
+    return num_global_perfs_;
+}
+
+
 CommunicateAboveBelow::CommunicateAboveBelow([[maybe_unused]] const Communication& comm)
 #if HAVE_MPI
     : comm_(comm), interface_(comm_)
@@ -57,7 +194,7 @@ void CommunicateAboveBelow::clear()
     interface_.free();
     communicator_.free();
 #endif
-    count_ = 0;
+    num_local_perfs_ = 0;
 }

 void CommunicateAboveBelow::beginReset()
@@ -72,7 +209,7 @@ void CommunicateAboveBelow::beginReset()
 #endif
 }

-void CommunicateAboveBelow::endReset()
+int CommunicateAboveBelow::endReset()
 {
 #if HAVE_MPI
     if (comm_.size() > 1)
     {
         above_indices_.endResize();
         current_indices_.endResize();
         remote_indices_.setIndexSets(current_indices_, above_indices_, comm_);
-        remote_indices_.setIncludeSelf(true);
+        // It is mandatory to not set includeSelf to true, as this will skip some entries.
         remote_indices_.rebuild<true>();
         using FromSet = Dune::EnumItem<Attribute,owner>;
         using ToSet = Dune::AllSet<Attribute>;
         interface_.build(remote_indices_, FromSet(), ToSet());
         communicator_.build<double*>(interface_);
     }
 #endif
+    return num_local_perfs_;
 }

 struct CopyGatherScatter
@@ -171,10 +309,11 @@ void CommunicateAboveBelow::pushBackEclIndex([[maybe_unused]] int above,
         {
             attr = overlap;
         }
-        above_indices_.add(above, {count_, attr, true});
-        current_indices_.add(current, {count_++, attr, true});
+        above_indices_.add(above, {num_local_perfs_, attr, true});
+        current_indices_.add(current, {num_local_perfs_, attr, true});
     }
 #endif
+    ++num_local_perfs_;
 }

@@ -197,6 +336,17 @@ void ParallelWellInfo::DestroyComm::operator()(Communication* comm)
     delete comm;
 }

+
+const CommunicateAboveBelow::IndexSet& CommunicateAboveBelow::getIndexSet() const
+{
+    return current_indices_;
+}
+
+int CommunicateAboveBelow::numLocalPerfs() const
+{
+    return num_local_perfs_;
+}
+
 ParallelWellInfo::ParallelWellInfo(const std::string& name,
                                    bool hasLocalCells)
     : name_(name), hasLocalCells_ (hasLocalCells),
@@ -245,12 +395,17 @@ void ParallelWellInfo::beginReset()

 void ParallelWellInfo::endReset()
 {
-    commAboveBelow_->beginReset();
+    int local_num_perfs = commAboveBelow_->endReset();
+    globalPerfCont_
+        .reset(new GlobalPerfContainerFactory(commAboveBelow_->getIndexSet(),
+                                              *comm_,
+                                              local_num_perfs));
 }

-void ParallelWellInfo::clearCommunicateAboveBelow()
+void ParallelWellInfo::clear()
 {
     commAboveBelow_->clear();
+    globalPerfCont_.reset();
 }

 std::vector<double> ParallelWellInfo::communicateAboveValues(double zero_value,
@@ -283,6 +438,16 @@ std::vector<double> ParallelWellInfo::communicateBelowValues(double last_value,
                                                current_values.size());
 }

+const GlobalPerfContainerFactory&
+ParallelWellInfo::getGlobalPerfContainerFactory() const
+{
+    if(globalPerfCont_)
+        return *globalPerfCont_;
+    else
+        OPM_THROW(std::logic_error,
+                  "No ecl indices have been added via beginReset, pushBackEclIndex, endReset");
+}
+
 bool operator<(const ParallelWellInfo& well1, const ParallelWellInfo& well2)
 {
     return well1.name() < well2.name() || (! (well2.name() < well1.name()) && well1.hasLocalCells() < well2.hasLocalCells());
diff --git a/opm/simulators/wells/ParallelWellInfo.hpp b/opm/simulators/wells/ParallelWellInfo.hpp
index f8d0d4b22..f8ded3706 100644
--- a/opm/simulators/wells/ParallelWellInfo.hpp
+++ b/opm/simulators/wells/ParallelWellInfo.hpp
@@ -38,8 +38,15 @@ namespace Opm
 ///
 class CommunicateAboveBelow
 {
+public:
     enum Attribute {
-        owner=1, overlap=2
+        owner=1,
+        overlap=2,
+        // there is a bug in older versions of DUNE that will skip
+        // entries with matching attributes in RemoteIndices that are local,
+        // therefore we add one more version for above.
+        ownerAbove = 3,
+        overlapAbove = 4
     };
     using MPIComm = typename Dune::MPIHelper::MPICommunicator;
#if DUNE_VERSION_NEWER(DUNE_COMMON, 2, 7)
     using Communication = Dune::Communication<MPIComm>;
#else
     using Communication = Dune::CollectiveCommunication<MPIComm>;
#endif
     using RI = Dune::RemoteIndices<IndexSet>;
#endif
-public:
     explicit CommunicateAboveBelow(const Communication& comm);
     /// \brief Adds information about original index of the perforations in ECL Schedule.
     ///
@@ -75,7 +81,8 @@ public:
     ///
     /// Sets up the communication structures to be used by
     /// communicate()
-    void endReset();
+    /// \return The number of local perforations
+    int endReset();

     /// \brief Creates an array of values for the perforation above.
     /// \param first_value Value to use for above of the first perforation
@@ -161,20 +168,79 @@ public:
         }
     }

+    /// \brief Get index set for the local perforations.
+    const IndexSet& getIndexSet() const;
+
+    int numLocalPerfs() const;
 private:
     Communication comm_;
-#if HAVE_MPI
     /// \brief Mapping of the local well index to ecl index
     IndexSet current_indices_;
+#if HAVE_MPI
     /// \brief Mapping of the above well index to ecl index
     IndexSet above_indices_;
     RI remote_indices_;
     Dune::Interface interface_;
     Dune::BufferedCommunicator communicator_;
 #endif
-    std::size_t count_{};
+    std::size_t num_local_perfs_{};
 };

+/// \brief A factory for creating a global data representation for distributed wells.
+///
+/// Unfortunately, there are occasions where we need to compute sequentially on a well
+/// even if the data is distributed. This class is supposed to help with that by
+/// computing the global data arrays for the well and copying computed values back to
+/// the distributed representation.
+class GlobalPerfContainerFactory
+{
+public:
+    using MPIComm = typename Dune::MPIHelper::MPICommunicator;
+#if DUNE_VERSION_NEWER(DUNE_COMMON, 2, 7)
+    using Communication = Dune::Communication<MPIComm>;
+#else
+    using Communication = Dune::CollectiveCommunication<MPIComm>;
+#endif
+    using IndexSet = CommunicateAboveBelow::IndexSet;
+    using Attribute = CommunicateAboveBelow::Attribute;
+    using GlobalIndex = typename IndexSet::IndexPair::GlobalIndex;
+
+    /// \brief Constructor
+    /// \param local_indices completely set up index set mapping the ecl index to the local index
+    GlobalPerfContainerFactory(const IndexSet& local_indices, const Communication comm,
+                               int num_local_perfs);
+
+    /// \brief Creates a container that holds values for all perforations
+    /// \param local_perf_container Container with values attached to the local perforations.
+    /// \param num_components the number of components per perforation.
+    /// \return A container with values attached to all perforations of a well.
+    ///         Values are ordered by the index of the perforation in the ECL schedule.
+    std::vector<double> createGlobal(const std::vector<double>& local_perf_container,
+                                     std::size_t num_components) const;
+
+    /// \brief Copies the values of the global perforations to the local representation
+    /// \param global values attached to all perforations of a well (as if the well lived on one process)
+    /// \param num_components the number of components per perforation.
+    /// \param[out] local The values attached to the local perforations only.
+    void copyGlobalToLocal(const std::vector<double>& global, std::vector<double>& local,
+                           std::size_t num_components) const;
+
+    int numGlobalPerfs() const;
+private:
+    const IndexSet& local_indices_;
+    Communication comm_;
+    int num_global_perfs_;
+    /// \brief sizes for allgatherv
+    std::vector<int> sizes_;
+    /// \brief displacement for allgatherv
+    std::vector<int> displ_;
+    /// \brief Mapping for storing gathered local values at the correct index.
+    std::vector<int> map_received_;
+    /// \brief The index of a perforation in the schedule of ECL
+    ///
+    /// This is sorted.
+    std::vector<GlobalIndex> perf_ecl_index_;
+};

 /// \brief Class encapsulating some information about parallel wells
 ///
@@ -316,7 +382,14 @@ public:
     }

     /// \brief Free data of communication data structures.
-    void clearCommunicateAboveBelow();
+    void clear();
+
+    /// \brief Get a factory to create a global representation of perforation data.
+    ///
+    /// That is a container that holds data for every perforation no matter where
+    /// it is stored. The container is ordered by ascending index of the perforations
+    /// in the ECL schedule.
+    const GlobalPerfContainerFactory& getGlobalPerfContainerFactory() const;

 private:

     /// \brief Deleter that also frees custom MPI communicators
@@ -341,6 +414,8 @@ private:

     /// \brief used to communicate the values for the perforation above.
     std::unique_ptr<CommunicateAboveBelow> commAboveBelow_;
+
+    std::unique_ptr<GlobalPerfContainerFactory> globalPerfCont_;
 };

 /// \brief Class checking that all connections are on active cells
diff --git a/tests/test_parallelwellinfo.cpp b/tests/test_parallelwellinfo.cpp
index 2914e3548..660fa4eaf 100644
--- a/tests/test_parallelwellinfo.cpp
+++ b/tests/test_parallelwellinfo.cpp
@@ -23,6 +23,9 @@
 #include
 #include
 #include
+#include
+#include
+#include
 #define BOOST_TEST_MODULE ParallelWellInfo
 #include
@@ -96,6 +99,8 @@ std::ostream& operator<<(std::ostream& os, const Opm::ParallelWellInfo& w)
     }
 }

+constexpr int numPerProc = 3;
+
 BOOST_AUTO_TEST_CASE(ParallelWellComparison)
 {
     int argc = 0;
@@ -232,7 +237,7 @@ std::vector<int> createGlobalEclIndex(const Communication& comm)
 {
     std::vector<int> globalEclIndex = {0, 1, 2, 3, 7, 8, 10, 11};
     auto oldSize = globalEclIndex.size();
-    std::size_t globalSize = 3 * comm.size();
+    std::size_t globalSize = numPerProc * comm.size();
     auto lastIndex = globalEclIndex.back();
     globalEclIndex.resize(globalSize);
     if ( globalSize > oldSize)
@@ -251,14 +256,17 @@ template<class C>
 std::vector<double> populateCommAbove(C& commAboveBelow,
                                       const Communication& comm,
                                       const std::vector<int>& globalEclIndex,
-                                      const std::vector<double> globalCurrent)
+                                      const std::vector<double> globalCurrent,
+                                      int num_component = 1,
+                                      bool local_consecutive = false)
 {
-    std::vector<double> current(3);
+    auto size = numPerProc * num_component;
+    std::vector<double> current(size);

     commAboveBelow.beginReset();
-    for (std::size_t i = 0; i < current.size(); ++i)
+    for (std::size_t i = 0; i < current.size()/num_component; i++)
     {
-        auto gi = comm.rank() + comm.size() * i;
+        auto gi = local_consecutive ? comm.rank() * numPerProc + i : comm.rank() + comm.size() * i;

         if (gi==0)
         {
@@ -268,7 +276,8 @@ std::vector<double> populateCommAbove(C& commAboveBelow,
         {
             commAboveBelow.pushBackEclIndex(globalEclIndex[gi-1], globalEclIndex[gi]);
         }
-        current[i] = globalCurrent[gi];
+        for(int c = 0; c < num_component; ++c)
+            current[i * num_component + c] = globalCurrent[gi * num_component + c];
     }
     commAboveBelow.endReset();
     return current;
 }
@@ -317,3 +326,145 @@ BOOST_AUTO_TEST_CASE(CommunicateAboveBelowParallel)
         }
     }
 }
+
+template<class Iter, class C>
+void initRandomNumbers(Iter begin, Iter end, C comm)
+{
+    // Initialize with random numbers.
+    std::random_device rndDevice;
+    std::mt19937 mersenneEngine {rndDevice()};  // Generates random integers
+    std::uniform_int_distribution<int> dist {1, 100};
+
+    auto gen = [&dist, &mersenneEngine](){
+                   return dist(mersenneEngine);
+               };
+
+    std::generate(begin, end, gen);
+    comm.broadcast(&(*begin), end-begin, 0);
+}
+
+BOOST_AUTO_TEST_CASE(PartialSumself)
+{
+    auto comm = Dune::MPIHelper::getLocalCommunicator();
+
+    Opm::CommunicateAboveBelow commAboveBelow{ comm };
+    std::vector<int> eclIndex = {0, 1, 2, 3, 7, 8, 10, 11};
+    std::vector<double> current(eclIndex.size());
+    std::transform(eclIndex.begin(), eclIndex.end(), current.begin(),
+                   [](double v){ return 1+10.0*v;});
+    commAboveBelow.beginReset();
+    for (std::size_t i = 0; i < current.size(); ++i)
+    {
+        if (i==0)
+            commAboveBelow.pushBackEclIndex(-1, eclIndex[i]);
+        else
+            commAboveBelow.pushBackEclIndex(eclIndex[i-1], eclIndex[i]);
+    }
+    commAboveBelow.endReset();
+
+    initRandomNumbers(std::begin(current), std::end(current),
+                      Communication(comm));
+    auto stdCopy = current;
+    std::partial_sum(std::begin(stdCopy), std::end(stdCopy), std::begin(stdCopy));
+
+    commAboveBelow.partialSumPerfValues(std::begin(current), std::end(current));
+
+    BOOST_CHECK_EQUAL_COLLECTIONS(std::begin(stdCopy), std::end(stdCopy),
+                                  std::begin(current), std::end(current));
+}
+
+BOOST_AUTO_TEST_CASE(PartialSumParallel)
+{
+    auto comm = Communication(Dune::MPIHelper::getCommunicator());
+
+    Opm::CommunicateAboveBelow commAboveBelow{ comm };
+    auto globalEclIndex = createGlobalEclIndex(comm);
+    std::vector<double> globalCurrent(globalEclIndex.size());
+    initRandomNumbers(std::begin(globalCurrent), std::end(globalCurrent),
+                      Communication(comm));
+
+    auto localCurrent = populateCommAbove(commAboveBelow, comm,
+                                          globalEclIndex, globalCurrent);
+
+    auto globalPartialSum = globalCurrent;
+
+    std::partial_sum(std::begin(globalPartialSum), std::end(globalPartialSum), std::begin(globalPartialSum));
+
+    commAboveBelow.partialSumPerfValues(std::begin(localCurrent), std::end(localCurrent));
+
+    for (std::size_t i = 0; i < localCurrent.size(); ++i)
+    {
+        auto gi = comm.rank() + comm.size() * i;
+        BOOST_CHECK(localCurrent[i]==globalPartialSum[gi]);
+    }
+}
+
+void testGlobalPerfFactoryParallel(int num_component, bool local_consecutive = false)
+{
+    auto comm = Communication(Dune::MPIHelper::getCommunicator());
+
+    Opm::ParallelWellInfo wellInfo{ {"Test", true }, comm };
+    auto globalEclIndex = createGlobalEclIndex(comm);
+    std::vector<double> globalCurrent(globalEclIndex.size() * num_component);
+    std::vector<double> globalAdd(globalEclIndex.size() * num_component);
+    initRandomNumbers(std::begin(globalCurrent), std::end(globalCurrent),
+                      comm);
+    initRandomNumbers(std::begin(globalAdd), std::end(globalAdd),
+                      comm);
+
+    auto localCurrent = populateCommAbove(wellInfo, comm, globalEclIndex,
+                                          globalCurrent, num_component,
+                                          local_consecutive);
+
+    // A hack to get local values to add.
+    Opm::ParallelWellInfo dummy{ {"Test", true }, comm };
+    auto localAdd = populateCommAbove(dummy, comm, globalEclIndex,
+                                      globalAdd, num_component,
+                                      local_consecutive);
+
+    const auto& factory = wellInfo.getGlobalPerfContainerFactory();
+
+    auto globalCreated = factory.createGlobal(localCurrent, num_component);
+
+    BOOST_CHECK_EQUAL_COLLECTIONS(std::begin(globalCurrent), std::end(globalCurrent),
+                                  std::begin(globalCreated), std::end(globalCreated));
+
+    std::transform(std::begin(globalAdd), std::end(globalAdd),
+                   std::begin(globalCreated), std::begin(globalCreated),
+                   std::plus());
+
+    auto globalSol = globalCurrent;
+    std::transform(std::begin(globalAdd), std::end(globalAdd),
+                   std::begin(globalSol), std::begin(globalSol),
+                   std::plus());
+
+    auto localSol = localCurrent;
+
+    std::transform(std::begin(localAdd), std::end(localAdd),
+                   std::begin(localSol), std::begin(localSol),
+                   std::plus());
+    factory.copyGlobalToLocal(globalCreated, localCurrent, num_component);
+
+    for (std::size_t i = 0; i < localCurrent.size() / num_component; ++i)
+    {
+        auto gi = local_consecutive ? comm.rank() * numPerProc + i :
+            comm.rank() + comm.size() * i;
+        for (int c = 0; c < num_component; ++c)
+        {
+            BOOST_CHECK(localCurrent[i * num_component + c]==globalSol[gi * num_component + c]);
+            BOOST_CHECK(localSol[i * num_component + c] == localCurrent[i * num_component + c]);
+        }
+    }
+}
+
+BOOST_AUTO_TEST_CASE(GlobalPerfFactoryParallel1)
+{
+    testGlobalPerfFactoryParallel(1);
+    testGlobalPerfFactoryParallel(3);
+}
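
Usage sketch (not part of the patch): the snippet below illustrates the calling sequence the new factory is designed for, using only the API added above. The function name exampleSerialPerfComputation and its arguments are made up for illustration; it assumes the well's ECL indices have already been registered via beginReset()/pushBackEclIndex()/endReset(), since getGlobalPerfContainerFactory() throws a std::logic_error otherwise.

#include <opm/simulators/wells/ParallelWellInfo.hpp>
#include <vector>

// Gather all perforation values of one well on every process, run a serial
// computation on the complete data, and copy the result back to the local,
// distributed perforations.
void exampleSerialPerfComputation(const Opm::ParallelWellInfo& well_info,
                                  std::vector<double>& local_perf_values,
                                  std::size_t num_components)
{
    const auto& factory = well_info.getGlobalPerfContainerFactory();

    // Values of all perforations, ordered by their index in the ECL schedule;
    // identical on every process attached to the well.
    std::vector<double> global_perf_values =
        factory.createGlobal(local_perf_values, num_components);

    // ... perform the serial computation on global_perf_values here ...

    // Write the (possibly modified) global values back to the perforations
    // stored on this process.
    factory.copyGlobalToLocal(global_perf_values, local_perf_values, num_components);
}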