Added factory to construct the global representation of perf data.

Some of our computations are heavily serial and need a complete
representation of the data attached to all perforations, no matter
whether a perforation lives on the local partition or not. This commit
adds a factory that makes it easy to create such a representation and
helps with writing data back to the local representation.
This commit is contained in:
Markus Blatt 2020-12-14 16:35:30 +01:00
parent 53b51eeba7
commit 69fd6495c0
4 changed files with 414 additions and 21 deletions
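
For orientation, a minimal usage sketch of the new factory (not part of the commit), based only on the API added below; parallel_well_info, local_values, num_components and serialComputation() are hypothetical placeholders:
// Gather the distributed per-perforation values into one global container,
// ordered by the perforations' indices in the ECL schedule.
const auto& factory = parallel_well_info.getGlobalPerfContainerFactory();
std::vector<double> global_values = factory.createGlobal(local_values, num_components);
// Run the heavily serial computation on the complete global data.
serialComputation(global_values, num_components); // hypothetical helper
// Write the computed values back to the perforations of the local partition.
factory.copyGlobalToLocal(global_values, local_values, num_components);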

View File

@ -467,7 +467,7 @@ namespace Opm {
// Clear the communication data structures for above values.
for(auto&& pinfo : local_parallel_well_info_)
{
pinfo->clearCommunicateAboveBelow();
pinfo->clear();
}
}
@ -650,6 +650,9 @@ namespace Opm {
completion_index);
}
firstOpenCompletion = false;
// Next time this index is the one above, as each open completion
// is stored somewhere.
completion_index_above = completion_index;
} else {
checker.connectionFound(completion_index);
if (completion.state() != Connection::State::SHUT) {
@ -660,7 +663,6 @@ namespace Opm {
// Note: we rely on the connections being filtered! I.e. there are only connections
// to active cells in the global grid.
++completion_index;
++completion_index_above;
}
parallelWellInfo.endReset();
checker.checkAllConnectionsFound();

View File

@ -43,6 +43,143 @@ struct CommPolicy<double*>
namespace Opm
{
GlobalPerfContainerFactory::GlobalPerfContainerFactory(const IndexSet& local_indices, const Communication comm,
const int num_local_perfs)
: local_indices_(local_indices), comm_(comm)
{
if ( comm_.size() > 1 )
{
// The global index used in the index set current_indices
// is the index of the perforation in the ECL Schedule definition.
// This is assumed to give the topological order.
// Allgather the index of the perforation in the ECL schedule and the value.
sizes_.resize(comm_.size());
displ_.resize(comm_.size() + 1, 0);
// Send an int along for convenience. It will be used to store the
// position where the data comes from.
using Pair = std::pair<GlobalIndex,int>;
std::vector<Pair> my_pairs;
my_pairs.reserve(local_indices_.size());
for (const auto& pair: local_indices_)
{
if (pair.local().attribute() == Attribute::owner)
{
my_pairs.emplace_back(pair.global(), -1);
}
}
int mySize = my_pairs.size();
comm_.allgather(&mySize, 1, sizes_.data());
std::partial_sum(sizes_.begin(), sizes_.end(), displ_.begin()+1);
std::vector<Pair> global_pairs(displ_.back());
comm_.allgatherv(my_pairs.data(), my_pairs.size(), global_pairs.data(), sizes_.data(), displ_.data());
// Set the index where we receive
int count = 0;
std::for_each(global_pairs.begin(), global_pairs.end(), [&count](Pair& pair){ pair.second = count++;});
// sort the complete range to get the correct ordering
std::sort(global_pairs.begin(), global_pairs.end(),
[](const Pair& p1, const Pair& p2){ return p1.first < p2.first; } );
map_received_.resize(global_pairs.size());
std::transform(global_pairs.begin(), global_pairs.end(), map_received_.begin(),
[](const Pair& pair){ return pair.second; });
perf_ecl_index_.resize(global_pairs.size());
std::transform(global_pairs.begin(), global_pairs.end(), perf_ecl_index_.begin(),
[](const Pair& pair){ return pair.first; });
num_global_perfs_ = global_pairs.size();
}
else
{
num_global_perfs_ = num_local_perfs;
}
}
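// Illustration only (not part of the commit, numbers made up): with two ranks,
// where rank 0 owns the perforations with ECL indices {0, 2} and rank 1 owns
// {1, 3}, the allgatherv receives rank 0's pairs first, then rank 1's:
//   global_pairs = {(0,0), (2,1), (1,2), (3,3)}  // (ecl index, receive pos)
// After sorting by ECL index this results in
//   perf_ecl_index_ = {0, 1, 2, 3}
//   map_received_   = {0, 2, 1, 3}
// i.e. the value for the perforation with the i-th smallest ECL index was
// received at position map_received_[i]; createGlobal() below uses this to
// reorder the gathered data.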
std::vector<double> GlobalPerfContainerFactory::createGlobal(const std::vector<double>& local_perf_container,
std::size_t num_components) const
{
// Could become templated later.
using Value = double;
if (comm_.size() > 1)
{
std::vector<Value> global_recv(perf_ecl_index_.size() * num_components);
if (num_components == 1)
{
comm_.allgatherv(local_perf_container.data(), local_perf_container.size(),
global_recv.data(), const_cast<int*>(sizes_.data()),
const_cast<int*>(displ_.data()));
}
else
{
#if HAVE_MPI
// Create MPI type for sending num_components entries
MPI_Datatype data_type;
MPI_Type_contiguous(num_components, Dune::MPITraits<Value>::getType(), &data_type);
MPI_Type_commit(&data_type);
MPI_Allgatherv(local_perf_container.data(),
local_perf_container.size()/num_components,
data_type, global_recv.data(), sizes_.data(),
displ_.data(), data_type, comm_);
MPI_Type_free(&data_type);
#endif
}
// reorder by ascending ecl index.
std::vector<Value> global_remapped(perf_ecl_index_.size() * num_components);
auto global = global_remapped.begin();
for (auto map_entry = map_received_.begin(); map_entry != map_received_.end(); ++map_entry)
{
auto global_index = *map_entry * num_components;
for(std::size_t i = 0; i < num_components; ++i)
*(global++) = global_recv[global_index++];
}
assert(global == global_remapped.end());
return global_remapped;
}
else
{
return local_perf_container;
}
}
void GlobalPerfContainerFactory::copyGlobalToLocal(const std::vector<double>& global, std::vector<double>& local,
std::size_t num_components) const
{
if (global.empty())
{
return;
}
if (comm_.size() > 1)
{
// assign the values (both ranges are sorted by the ecl index)
auto global_perf = perf_ecl_index_.begin();
for (const auto& pair: local_indices_)
{
global_perf = std::lower_bound(global_perf, perf_ecl_index_.end(), pair.global());
assert(global_perf != perf_ecl_index_.end());
assert(*global_perf == pair.global());
auto local_index = pair.local() * num_components;
auto global_index = (global_perf - perf_ecl_index_.begin()) * num_components;
for (std::size_t i = 0; i < num_components; ++i)
local[local_index++] = global[global_index++];
}
}
else
{
std::copy(global.begin(), global.end(), local.begin());
}
}
int GlobalPerfContainerFactory::numGlobalPerfs() const
{
return num_global_perfs_;
}
CommunicateAboveBelow::CommunicateAboveBelow([[maybe_unused]] const Communication& comm)
#if HAVE_MPI
: comm_(comm), interface_(comm_)
@ -57,7 +194,7 @@ void CommunicateAboveBelow::clear()
interface_.free();
communicator_.free();
#endif
count_ = 0;
num_local_perfs_ = 0;
}
void CommunicateAboveBelow::beginReset()
@ -72,7 +209,7 @@ void CommunicateAboveBelow::beginReset()
#endif
}
void CommunicateAboveBelow::endReset()
int CommunicateAboveBelow::endReset()
{
#if HAVE_MPI
if (comm_.size() > 1)
@ -80,7 +217,7 @@ void CommunicateAboveBelow::endReset()
above_indices_.endResize();
current_indices_.endResize();
remote_indices_.setIndexSets(current_indices_, above_indices_, comm_);
remote_indices_.setIncludeSelf(true);
// Do not set includeSelf to true, as that would skip some entries.
remote_indices_.rebuild<true>();
using FromSet = Dune::EnumItem<Attribute,owner>;
using ToSet = Dune::AllSet<Attribute>;
@ -88,6 +225,7 @@ void CommunicateAboveBelow::endReset()
communicator_.build<double*>(interface_);
}
#endif
return num_local_perfs_;
}
struct CopyGatherScatter
@ -171,10 +309,11 @@ void CommunicateAboveBelow::pushBackEclIndex([[maybe_unused]] int above,
{
attr = overlap;
}
above_indices_.add(above, {count_, attr, true});
current_indices_.add(current, {count_++, attr, true});
above_indices_.add(above, {num_local_perfs_, attr, true});
current_indices_.add(current, {num_local_perfs_, attr, true});
}
#endif
++num_local_perfs_;
}
@ -197,6 +336,17 @@ void ParallelWellInfo::DestroyComm::operator()(Communication* comm)
delete comm;
}
const CommunicateAboveBelow::IndexSet& CommunicateAboveBelow::getIndexSet() const
{
return current_indices_;
}
int CommunicateAboveBelow::numLocalPerfs() const
{
return num_local_perfs_;
}
ParallelWellInfo::ParallelWellInfo(const std::string& name,
bool hasLocalCells)
: name_(name), hasLocalCells_ (hasLocalCells),
@ -245,12 +395,17 @@ void ParallelWellInfo::beginReset()
void ParallelWellInfo::endReset()
{
commAboveBelow_->beginReset();
int local_num_perfs = commAboveBelow_->endReset();
globalPerfCont_
.reset(new GlobalPerfContainerFactory(commAboveBelow_->getIndexSet(),
*comm_,
local_num_perfs));
}
void ParallelWellInfo::clearCommunicateAboveBelow()
void ParallelWellInfo::clear()
{
commAboveBelow_->clear();
globalPerfCont_.reset();
}
std::vector<double> ParallelWellInfo::communicateAboveValues(double zero_value,
@ -283,6 +438,16 @@ std::vector<double> ParallelWellInfo::communicateBelowValues(double last_value,
current_values.size());
}
const GlobalPerfContainerFactory&
ParallelWellInfo::getGlobalPerfContainerFactory() const
{
if(globalPerfCont_)
return *globalPerfCont_;
else
OPM_THROW(std::logic_error,
"No ecl indices have been added via beginReset, pushBackEclIndex, endReset");
}
bool operator<(const ParallelWellInfo& well1, const ParallelWellInfo& well2)
{
return well1.name() < well2.name() || (! (well2.name() < well1.name()) && well1.hasLocalCells() < well2.hasLocalCells());

View File

@ -38,8 +38,15 @@ namespace Opm
///
class CommunicateAboveBelow
{
public:
enum Attribute {
owner=1, overlap=2
owner=1,
overlap=2,
// There is a bug in older versions of DUNE where RemoteIndices skips
// local entries with matching attributes. Therefore we add separate
// attribute values for the above entries.
ownerAbove = 3,
overlapAbove = 4
};
using MPIComm = typename Dune::MPIHelper::MPICommunicator;
#if DUNE_VERSION_NEWER(DUNE_COMMON, 2, 7)
@ -53,7 +60,6 @@ class CommunicateAboveBelow
using RI = Dune::RemoteIndices<IndexSet>;
#endif
public:
explicit CommunicateAboveBelow(const Communication& comm);
/// \brief Adds information about original index of the perforations in ECL Schedule.
///
@ -75,7 +81,8 @@ public:
///
/// Sets up the commmunication structures to be used by
/// communicate()
void endReset();
/// \return The number of local perforations
int endReset();
/// \brief Creates an array of values for the perforation above.
/// \param first_value Value to use for above of the first perforation
@ -161,20 +168,79 @@ public:
}
}
/// \brief Get index set for the local perforations.
const IndexSet& getIndexSet() const;
int numLocalPerfs() const;
private:
Communication comm_;
#if HAVE_MPI
/// \brief Mapping of the local well index to ecl index
IndexSet current_indices_;
#if HAVE_MPI
/// \brief Mapping of the above well index to ecl index
IndexSet above_indices_;
RI remote_indices_;
Dune::Interface interface_;
Dune::BufferedCommunicator communicator_;
#endif
std::size_t count_{};
std::size_t num_local_perfs_{};
};
/// \brief A factory for creating a global data representation for distributed wells.
///
/// Unfortunately, there are occasions where we need to compute sequentially on a well
/// even if the data is distributed. This class is supposed to help with that by
/// computing the global data arrays for the well and copying computed values back to
/// the distributed representation.
class GlobalPerfContainerFactory
{
public:
using MPIComm = typename Dune::MPIHelper::MPICommunicator;
#if DUNE_VERSION_NEWER(DUNE_COMMON, 2, 7)
using Communication = Dune::Communication<MPIComm>;
#else
using Communication = Dune::CollectiveCommunication<MPIComm>;
#endif
using IndexSet = CommunicateAboveBelow::IndexSet;
using Attribute = CommunicateAboveBelow::Attribute;
using GlobalIndex = typename IndexSet::IndexPair::GlobalIndex;
/// \brief Constructor
/// \param local_indices completely set up index set mapping the ecl index to the local index
GlobalPerfContainerFactory(const IndexSet& local_indices, const Communication comm,
int num_local_perfs);
/// \brief Creates a container that holds values for all perforations
/// \param local_perf_container Container with values attached to the local perforations.
/// \param num_components the number of components per perforation.
/// \return A container with values attached to all perforations of a well.
/// Values are ordered by the index of the perforation in the ECL schedule.
std::vector<double> createGlobal(const std::vector<double>& local_perf_container,
std::size_t num_components) const;
/// \brief Copies the values of the global perforation to the local representation
/// \param global values attached to all perforations of a well (as if the well lived on one process)
/// \param num_components the number of components per perforation.
/// \param[out] local The values attached to the local perforations only.
void copyGlobalToLocal(const std::vector<double>& global, std::vector<double>& local,
std::size_t num_components) const;
int numGlobalPerfs() const;
private:
const IndexSet& local_indices_;
Communication comm_;
int num_global_perfs_;
/// \brief sizes for allgatherv
std::vector<int> sizes_;
/// \brief displacement for allgatherv
std::vector<int> displ_;
/// \brief Mapping for storing gathered local values at the correct index.
std::vector<int> map_received_;
/// \brief The ECL schedule index of each perforation.
///
/// This is sorted.
std::vector<int> perf_ecl_index_;
};
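// Note (illustration only, not part of the commit): in the containers handled
// by createGlobal()/copyGlobalToLocal() the num_components values of one
// perforation are stored contiguously, and the perforations are ordered by
// their ECL schedule index; component c of the perforation at global
// position perf is therefore found at global[perf * num_components + c].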
/// \brief Class encapsulating some information about parallel wells
///
@ -316,7 +382,14 @@ public:
}
/// \brief Free data of communication data structures.
void clearCommunicateAboveBelow();
void clear();
/// \brief Get a factory to create a global representation of perforation data.
///
/// That is a container that holds data for every perforation no matter where
/// it is stored. The container is ordered by ascending index of the perforations
/// in the ECL schedule.
const GlobalPerfContainerFactory& getGlobalPerfContainerFactory() const;
private:
/// \brief Deleter that also frees custom MPI communicators
@ -341,6 +414,8 @@ private:
/// \brief used to communicate the values for the perforation above.
std::unique_ptr<CommunicateAboveBelow> commAboveBelow_;
std::unique_ptr<GlobalPerfContainerFactory> globalPerfCont_;
};
/// \brief Class checking that all connections are on active cells

View File

@ -23,6 +23,9 @@
#include<string>
#include<tuple>
#include<ostream>
#include <random>
#include <algorithm>
#include <iterator>
#define BOOST_TEST_MODULE ParallelWellInfo
#include <boost/test/unit_test.hpp>
@ -96,6 +99,8 @@ std::ostream& operator<<(std::ostream& os, const Opm::ParallelWellInfo& w)
}
}
constexpr int numPerProc = 3;
BOOST_AUTO_TEST_CASE(ParallelWellComparison)
{
int argc = 0;
@ -232,7 +237,7 @@ std::vector<int> createGlobalEclIndex(const Communication& comm)
{
std::vector<int> globalEclIndex = {0, 1, 2, 3, 7 , 8, 10, 11};
auto oldSize = globalEclIndex.size();
std::size_t globalSize = 3 * comm.size();
std::size_t globalSize = numPerProc * comm.size();
auto lastIndex = globalEclIndex.back();
globalEclIndex.resize(globalSize);
if ( globalSize > oldSize)
@ -251,14 +256,17 @@ template<class C>
std::vector<double> populateCommAbove(C& commAboveBelow,
const Communication& comm,
const std::vector<int>& globalEclIndex,
const std::vector<double> globalCurrent)
const std::vector<double> globalCurrent,
int num_component = 1,
bool local_consecutive = false)
{
std::vector<double> current(3);
auto size = numPerProc * num_component;
std::vector<double> current(size);
commAboveBelow.beginReset();
for (std::size_t i = 0; i < current.size(); ++i)
for (std::size_t i = 0; i < current.size()/num_component; i++)
{
auto gi = comm.rank() + comm.size() * i;
auto gi = local_consecutive ? comm.rank() * numPerProc + i : comm.rank() + comm.size() * i;
if (gi==0)
{
@ -268,7 +276,8 @@ std::vector<double> populateCommAbove(C& commAboveBelow,
{
commAboveBelow.pushBackEclIndex(globalEclIndex[gi-1], globalEclIndex[gi]);
}
current[i] = globalCurrent[gi];
for(int c = 0; c < num_component; ++c)
current[i * num_component + c] = globalCurrent[gi * num_component + c];
}
commAboveBelow.endReset();
return current;
@ -317,3 +326,145 @@ BOOST_AUTO_TEST_CASE(CommunicateAboveBelowParallel)
}
}
}
template<class Iter, class C>
void initRandomNumbers(Iter begin, Iter end, C comm)
{
// Initialize with random numbers.
std::random_device rndDevice;
std::mt19937 mersenneEngine {rndDevice()}; // Generates random integers
std::uniform_int_distribution<int> dist {1, 100};
auto gen = [&dist, &mersenneEngine](){
return dist(mersenneEngine);
};
std::generate(begin, end, gen);
comm.broadcast(&(*begin), end-begin, 0);
}
BOOST_AUTO_TEST_CASE(PartialSumself)
{
auto comm = Dune::MPIHelper::getLocalCommunicator();
Opm::CommunicateAboveBelow commAboveBelow{ comm };
std::vector<int> eclIndex = {0, 1, 2, 3, 7 , 8, 10, 11};
std::vector<double> current(eclIndex.size());
std::transform(eclIndex.begin(), eclIndex.end(), current.begin(),
[](double v){ return 1+10.0*v;});
commAboveBelow.beginReset();
for (std::size_t i = 0; i < current.size(); ++i)
{
if (i==0)
commAboveBelow.pushBackEclIndex(-1, eclIndex[i]);
else
commAboveBelow.pushBackEclIndex(eclIndex[i-1], eclIndex[i]);
}
commAboveBelow.endReset();
initRandomNumbers(std::begin(current), std::end(current),
Communication(comm));
auto stdCopy = current;
std::partial_sum(std::begin(stdCopy), std::end(stdCopy), std::begin(stdCopy));
commAboveBelow.partialSumPerfValues(std::begin(current), std::end(current));
BOOST_CHECK_EQUAL_COLLECTIONS(std::begin(stdCopy), std::end(stdCopy),
std::begin(current), std::end(current));
}
BOOST_AUTO_TEST_CASE(PartialSumParallel)
{
auto comm = Communication(Dune::MPIHelper::getCommunicator());
Opm::CommunicateAboveBelow commAboveBelow{ comm };
auto globalEclIndex = createGlobalEclIndex(comm);
std::vector<double> globalCurrent(globalEclIndex.size());
initRandomNumbers(std::begin(globalCurrent), std::end(globalCurrent),
Communication(comm));
auto localCurrent = populateCommAbove(commAboveBelow, comm,
globalEclIndex, globalCurrent);
auto globalPartialSum = globalCurrent;
std::partial_sum(std::begin(globalPartialSum), std::end(globalPartialSum), std::begin(globalPartialSum));
commAboveBelow.partialSumPerfValues(std::begin(localCurrent), std::end(localCurrent));
for (std::size_t i = 0; i < localCurrent.size(); ++i)
{
auto gi = comm.rank() + comm.size() * i;
BOOST_CHECK(localCurrent[i]==globalPartialSum[gi]);
}
}
void testGlobalPerfFactoryParallel(int num_component, bool local_consecutive = false)
{
auto comm = Communication(Dune::MPIHelper::getCommunicator());
Opm::ParallelWellInfo wellInfo{ {"Test", true }, comm };
auto globalEclIndex = createGlobalEclIndex(comm);
std::vector<double> globalCurrent(globalEclIndex.size() * num_component);
std::vector<double> globalAdd(globalEclIndex.size() * num_component);
initRandomNumbers(std::begin(globalCurrent), std::end(globalCurrent),
comm);
initRandomNumbers(std::begin(globalAdd), std::end(globalAdd),
comm);
auto localCurrent = populateCommAbove(wellInfo, comm, globalEclIndex,
globalCurrent, num_component,
local_consecutive);
// A hack to get local values to add.
Opm::ParallelWellInfo dummy{ {"Test", true }, comm };
auto localAdd = populateCommAbove(dummy, comm, globalEclIndex,
globalAdd, num_component,
local_consecutive);
const auto& factory = wellInfo.getGlobalPerfContainerFactory();
auto globalCreated = factory.createGlobal(localCurrent, num_component);
BOOST_CHECK_EQUAL_COLLECTIONS(std::begin(globalCurrent), std::end(globalCurrent),
std::begin(globalCreated), std::end(globalCreated));
std::transform(std::begin(globalAdd), std::end(globalAdd),
std::begin(globalCreated), std::begin(globalCreated),
std::plus<double>());
auto globalSol = globalCurrent;
std::transform(std::begin(globalAdd), std::end(globalAdd),
std::begin(globalSol), std::begin(globalSol),
std::plus<double>());
auto localSol = localCurrent;
std::transform(std::begin(localAdd), std::end(localAdd),
std::begin(localSol), std::begin(localSol),
std::plus<double>());
factory.copyGlobalToLocal(globalCreated, localCurrent, num_component);
for (std::size_t i = 0; i < localCurrent.size() / num_component; ++i)
{
auto gi = local_consecutive ? comm.rank() * numPerProc + i :
comm.rank() + comm.size() * i;
for (int c = 0; c < num_component; ++c)
{
BOOST_CHECK(localCurrent[i * num_component + c]==globalSol[gi * num_component + c]);
BOOST_CHECK(localSol[i * num_component + c] == localCurrent[i * num_component + c]);
}
}
}
BOOST_AUTO_TEST_CASE(GlobalPerfFactoryParallel1)
{
testGlobalPerfFactoryParallel(1);
testGlobalPerfFactoryParallel(3);
}