changed: ewoms/parallel -> opm/models/parallel

This commit is contained in:
Arne Morten Kvarving 2019-09-16 09:48:55 +02:00
parent 162e4f4653
commit d8723dc9ce
11 changed files with 1098 additions and 6 deletions


@@ -0,0 +1,270 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
/*!
* \file
*
* \brief Provides data handles for parallel communication which
* operate on DOFs
*/
#ifndef EWOMS_GRID_COMM_HANDLES_HH
#define EWOMS_GRID_COMM_HANDLES_HH
#include <opm/material/common/Unused.hpp>
#include <dune/grid/common/datahandleif.hh>
#include <dune/common/version.hh>
namespace Opm {
/*!
* \brief Data handle for parallel communication which sums up all
* values that are attached to DOFs
*/
template <class FieldType, class Container, class EntityMapper, int commCodim>
class GridCommHandleSum
: public Dune::CommDataHandleIF<GridCommHandleSum<FieldType, Container,
EntityMapper, commCodim>,
FieldType>
{
public:
GridCommHandleSum(Container& container, const EntityMapper& mapper)
: mapper_(mapper), container_(container)
{}
bool contains(int dim OPM_UNUSED, int codim) const
{
// return true if the codim is the same as the codim which we
// are asked to communicate with.
return codim == commCodim;
}
bool fixedsize(int dim OPM_UNUSED, int codim OPM_UNUSED) const
{
// for each DOF we communicate a single value which has a
// fixed size
return true;
}
template <class EntityType>
size_t size(const EntityType& e OPM_UNUSED) const
{
// communicate a field type per entity
return 1;
}
template <class MessageBufferImp, class EntityType>
void gather(MessageBufferImp& buff, const EntityType& e) const
{
unsigned dofIdx = static_cast<unsigned>(mapper_.index(e));
buff.write(container_[dofIdx]);
}
template <class MessageBufferImp, class EntityType>
void scatter(MessageBufferImp& buff, const EntityType& e, size_t n OPM_UNUSED)
{
unsigned dofIdx = static_cast<unsigned>(mapper_.index(e));
FieldType tmp;
buff.read(tmp);
container_[dofIdx] += tmp;
}
private:
const EntityMapper& mapper_;
Container& container_;
};
/*!
* \brief Data handle for parallel communication which can be used to
* set the values of ghost and overlap DOFs from their
* respective master processes.
*/
template <class FieldType, class Container, class EntityMapper, unsigned commCodim>
class GridCommHandleGhostSync
: public Dune::CommDataHandleIF<GridCommHandleGhostSync<FieldType, Container,
EntityMapper, commCodim>,
FieldType>
{
public:
GridCommHandleGhostSync(Container& container, const EntityMapper& mapper)
: mapper_(mapper), container_(container)
{
}
bool contains(unsigned dim OPM_UNUSED, unsigned codim) const
{
// return true if the codim is the same as the codim which we
// are asked to communicate with.
return codim == commCodim;
}
bool fixedsize(unsigned dim OPM_UNUSED, unsigned codim OPM_UNUSED) const
{
// for each DOF we communicate a single value which has a
// fixed size
return true;
}
template <class EntityType>
size_t size(const EntityType& e OPM_UNUSED) const
{
// communicate a field type per entity
return 1;
}
template <class MessageBufferImp, class EntityType>
void gather(MessageBufferImp& buff, const EntityType& e) const
{
unsigned dofIdx = static_cast<unsigned>(mapper_.index(e));
buff.write(container_[dofIdx]);
}
template <class MessageBufferImp, class EntityType>
void scatter(MessageBufferImp& buff, const EntityType& e, size_t n OPM_UNUSED)
{
unsigned dofIdx = static_cast<unsigned>(mapper_.index(e));
buff.read(container_[dofIdx]);
}
private:
const EntityMapper& mapper_;
Container& container_;
};
/*!
* \brief Data handle for parallel communication which takes the
* maximum of all values that are attached to DOFs
*/
template <class FieldType, class Container, class EntityMapper, unsigned commCodim>
class GridCommHandleMax
: public Dune::CommDataHandleIF<GridCommHandleMax<FieldType, Container,
EntityMapper, commCodim>,
FieldType>
{
public:
GridCommHandleMax(Container& container, const EntityMapper& mapper)
: mapper_(mapper), container_(container)
{}
bool contains(unsigned dim OPM_UNUSED, unsigned codim) const
{
// return true if the codim is the same as the codim which we
// are asked to communicate with.
return codim == commCodim;
}
bool fixedsize(unsigned dim OPM_UNUSED, unsigned codim OPM_UNUSED) const
{
// for each DOF we communicate a single value which has a
// fixed size
return true;
}
template <class EntityType>
size_t size(const EntityType& e OPM_UNUSED) const
{
// communicate a field type per entity
return 1;
}
template <class MessageBufferImp, class EntityType>
void gather(MessageBufferImp& buff, const EntityType& e) const
{
unsigned dofIdx = static_cast<unsigned>(mapper_.index(e));
buff.write(container_[dofIdx]);
}
template <class MessageBufferImp, class EntityType>
void scatter(MessageBufferImp& buff, const EntityType& e, size_t n OPM_UNUSED)
{
unsigned dofIdx = static_cast<unsigned>(mapper_.index(e));
FieldType tmp;
buff.read(tmp);
container_[dofIdx] = std::max(container_[dofIdx], tmp);
}
private:
const EntityMapper& mapper_;
Container& container_;
};
/*!
* \brief Provides a data handle for parallel communication which takes
* the minimum of all values that are attached to DOFs
*/
template <class FieldType, class Container, class EntityMapper, unsigned commCodim>
class GridCommHandleMin
: public Dune::CommDataHandleIF<GridCommHandleMin<FieldType, Container,
EntityMapper, commCodim>,
FieldType>
{
public:
GridCommHandleMin(Container& container, const EntityMapper& mapper)
: mapper_(mapper), container_(container)
{}
bool contains(unsigned dim OPM_UNUSED, unsigned codim) const
{
// return true if the codim is the same as the codim which we
// are asked to communicate with.
return codim == commCodim;
}
bool fixedsize(unsigned dim OPM_UNUSED, unsigned codim OPM_UNUSED) const
{
// for each DOF we communicate a single value which has a
// fixed size
return true;
}
template <class EntityType>
size_t size(const EntityType& e OPM_UNUSED) const
{
// communicate a field type per entity
return 1;
}
template <class MessageBufferImp, class EntityType>
void gather(MessageBufferImp& buff, const EntityType& e) const
{
unsigned dofIdx = static_cast<unsigned>(mapper_.index(e));
buff.write(container_[dofIdx]);
}
template <class MessageBufferImp, class EntityType>
void scatter(MessageBufferImp& buff, const EntityType& e, size_t n OPM_UNUSED)
{
unsigned dofIdx = static_cast<unsigned>(mapper_.index(e));
FieldType tmp;
buff.read(tmp);
container_[dofIdx] = std::min(container_[dofIdx], tmp);
}
private:
const EntityMapper& mapper_;
Container& container_;
};
} // namespace Opm
#endif
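
A minimal usage sketch, not part of this commit: the handles implement Dune's CommDataHandleIF, so they can be passed directly to GridView::communicate(). The names gridView and values below are assumed to exist in the caller, and <dune/grid/common/mcmgmapper.hh> is assumed to be included; the mapper type is one plausible choice.

// hedged usage sketch: sum per-cell values across process borders
using Mapper = Dune::MultipleCodimMultipleGeomTypeMapper<GridView>;
Mapper mapper(gridView, Dune::mcmgElementLayout());
std::vector<double> values(mapper.size(), 0.0);
Opm::GridCommHandleSum<double, std::vector<double>, Mapper, /*commCodim=*/0>
    sumHandle(values, mapper);
gridView.communicate(sumHandle,
                     Dune::InteriorBorder_All_Interface,
                     Dune::ForwardCommunication);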


@@ -0,0 +1,239 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
/*!
* \file
* \copydoc Opm::MpiBuffer
*/
#ifndef EWOMS_MPI_BUFFER_HH
#define EWOMS_MPI_BUFFER_HH
#if HAVE_MPI
#include <mpi.h>
#endif
#include <stddef.h>
#include <type_traits>
#include <cassert>
namespace Opm {
/*!
* \brief Simplifies handling of buffers to be used in conjunction with MPI
*/
template <class DataType>
class MpiBuffer
{
public:
MpiBuffer()
{
data_ = nullptr;
dataSize_ = 0;
setMpiDataType_();
updateMpiDataSize_();
}
MpiBuffer(size_t size)
{
data_ = new DataType[size];
dataSize_ = size;
setMpiDataType_();
updateMpiDataSize_();
}
// copying is disallowed: the compiler-generated copy constructor would
// duplicate the raw data pointer and cause a double delete[]
MpiBuffer(const MpiBuffer&) = delete;
~MpiBuffer()
{ delete[] data_; }
/*!
* \brief Set the size of the buffer, discarding any existing data
*/
void resize(size_t newSize)
{
delete[] data_;
data_ = new DataType[newSize];
dataSize_ = newSize;
updateMpiDataSize_();
}
/*!
* \brief Send the buffer asynchronously to a peer process.
*/
void send(unsigned peerRank OPM_UNUSED_NOMPI)
{
#if HAVE_MPI
MPI_Isend(data_,
static_cast<int>(mpiDataSize_),
mpiDataType_,
static_cast<int>(peerRank),
0, // tag
MPI_COMM_WORLD,
&mpiRequest_);
#endif
}
/*!
* \brief Wait until the buffer has been completely sent to the peer.
*/
void wait()
{
#if HAVE_MPI
MPI_Wait(&mpiRequest_, &mpiStatus_);
#endif // HAVE_MPI
}
/*!
* \brief Receive the buffer synchronously from a peer rank
*/
void receive(unsigned peerRank OPM_UNUSED_NOMPI)
{
#if HAVE_MPI
MPI_Recv(data_,
static_cast<int>(mpiDataSize_),
mpiDataType_,
static_cast<int>(peerRank),
0, // tag
MPI_COMM_WORLD,
&mpiStatus_);
assert(!mpiStatus_.MPI_ERROR);
#endif // HAVE_MPI
}
#if HAVE_MPI
/*!
* \brief Returns the current MPI_Request object.
*
* This object is only well defined after send() has been called.
*/
MPI_Request& request()
{ return mpiRequest_; }
/*!
* \brief Returns the current MPI_Request object.
*
* This object is only well defined after send() has been called.
*/
const MPI_Request& request() const
{ return mpiRequest_; }
/*!
* \brief Returns the current MPI_Status object.
*
* This object is only well defined after receive() or wait() has been called.
*/
MPI_Status& status()
{ return mpiStatus_; }
/*!
* \brief Returns the current MPI_Status object.
*
* This object is only well defined after receive() or wait() has been called.
*/
const MPI_Status& status() const
{ return mpiStatus_; }
#endif // HAVE_MPI
/*!
* \brief Returns the number of data objects in the buffer
*/
size_t size() const
{ return dataSize_; }
/*!
* \brief Provide access to the buffer data.
*/
DataType& operator[](size_t i)
{
assert(i < dataSize_); // i is unsigned, so only the upper bound needs checking
return data_[i];
}
/*!
* \brief Provide access to the buffer data.
*/
const DataType& operator[](size_t i) const
{
assert(i < dataSize_); // i is unsigned, so only the upper bound needs checking
return data_[i];
}
private:
void setMpiDataType_()
{
#if HAVE_MPI
// set the MPI data type
if (std::is_same<DataType, char>::value)
mpiDataType_ = MPI_CHAR;
else if (std::is_same<DataType, unsigned char>::value)
mpiDataType_ = MPI_UNSIGNED_CHAR;
else if (std::is_same<DataType, short>::value)
mpiDataType_ = MPI_SHORT;
else if (std::is_same<DataType, unsigned short>::value)
mpiDataType_ = MPI_UNSIGNED_SHORT;
else if (std::is_same<DataType, int>::value)
mpiDataType_ = MPI_INT;
else if (std::is_same<DataType, unsigned>::value)
mpiDataType_ = MPI_UNSIGNED;
else if (std::is_same<DataType, long>::value)
mpiDataType_ = MPI_LONG;
else if (std::is_same<DataType, unsigned long>::value)
mpiDataType_ = MPI_UNSIGNED_LONG;
else if (std::is_same<DataType, long long>::value)
mpiDataType_ = MPI_LONG_LONG;
else if (std::is_same<DataType, unsigned long long>::value)
mpiDataType_ = MPI_UNSIGNED_LONG_LONG;
else if (std::is_same<DataType, float>::value)
mpiDataType_ = MPI_FLOAT;
else if (std::is_same<DataType, double>::value)
mpiDataType_ = MPI_DOUBLE;
else if (std::is_same<DataType, long double>::value)
mpiDataType_ = MPI_LONG_DOUBLE;
else {
mpiDataType_ = MPI_BYTE;
}
#endif // HAVE_MPI
}
void updateMpiDataSize_()
{
#if HAVE_MPI
mpiDataSize_ = dataSize_;
if (mpiDataType_ == MPI_BYTE)
mpiDataSize_ *= sizeof(DataType);
#endif // HAVE_MPI
}
DataType *data_;
size_t dataSize_;
#if HAVE_MPI
size_t mpiDataSize_;
MPI_Datatype mpiDataType_;
MPI_Request mpiRequest_;
MPI_Status mpiStatus_;
#endif // HAVE_MPI
};
} // namespace Opm
#endif
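
A minimal usage sketch, assuming MPI_Init has been called and the communicator has at least two ranks:

// hedged usage sketch: ship 100 doubles from rank 0 to rank 1
Opm::MpiBuffer<double> buf(100);
int myRank = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
if (myRank == 0) {
    for (size_t i = 0; i < buf.size(); ++i)
        buf[i] = static_cast<double>(i);
    buf.send(/*peerRank=*/1); // non-blocking MPI_Isend
    buf.wait();               // block until the send has completed
}
else if (myRank == 1)
    buf.receive(/*peerRank=*/0); // blocking MPI_Recv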


@@ -0,0 +1,348 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
/*!
* \file
* \brief Provides a mechanism to dispatch work to separate threads
*/
#ifndef EWOMS_TASKLETS_HH
#define EWOMS_TASKLETS_HH
#include <stdexcept>
#include <cassert>
#include <thread>
#include <queue>
#include <mutex>
#include <iostream>
#include <condition_variable>
namespace Opm {
/*!
* \brief The base class for tasklets.
*
* Tasklets are a generic mechanism for potentially running work in a separate thread.
*/
class TaskletInterface
{
public:
TaskletInterface(int refCount = 1)
: referenceCount_(refCount)
{}
virtual ~TaskletInterface() {}
virtual void run() = 0;
virtual bool isEndMarker() const { return false; }
void dereference()
{ --referenceCount_; }
int referenceCount() const
{ return referenceCount_; }
private:
int referenceCount_;
};
/*!
* \brief A simple tasklet that runs a function which takes no arguments and
* returns void a given number of times.
*/
template <class Fn>
class FunctionRunnerTasklet : public TaskletInterface
{
public:
FunctionRunnerTasklet(const FunctionRunnerTasklet&) = default;
FunctionRunnerTasklet(int numInvocations, const Fn& fn)
: TaskletInterface(numInvocations)
, fn_(fn)
{}
void run() override
{ fn_(); }
private:
const Fn& fn_;
};
class TaskletRunner;
// this class stores the thread local static attributes for the TaskletRunner class. we
// cannot put them directly into TaskletRunner because defining static members for
// non-template classes in headers leads the linker to choke in case multiple compile
// units are used.
template <class Dummy = void>
struct TaskletRunnerHelper_
{
static thread_local TaskletRunner* taskletRunner_;
static thread_local int workerThreadIndex_;
};
template <class Dummy>
thread_local TaskletRunner* TaskletRunnerHelper_<Dummy>::taskletRunner_ = nullptr;
template <class Dummy>
thread_local int TaskletRunnerHelper_<Dummy>::workerThreadIndex_ = -1;
/*!
* \brief Handles where a given tasklet is run.
*
* Depending on the number of worker threads, a tasklet can either be run in a separate
* worker thread or by the main thread.
*/
class TaskletRunner
{
/// \brief Implements a barrier. This class can only be used in the asynchronous case.
class BarrierTasklet : public TaskletInterface
{
public:
BarrierTasklet(unsigned numWorkers)
: TaskletInterface(/*refCount=*/numWorkers)
{
numWorkers_ = numWorkers;
numWaiting_ = 0;
}
void run() override
{ wait(); }
void wait()
{
std::unique_lock<std::mutex> lock(barrierMutex_);
numWaiting_ += 1;
if (numWaiting_ >= numWorkers_ + 1) {
lock.unlock();
barrierCondition_.notify_all();
}
else {
const auto& areAllWaiting =
[this]() -> bool
{ return this->numWaiting_ >= this->numWorkers_ + 1; };
barrierCondition_.wait(lock, /*predicate=*/areAllWaiting);
}
}
private:
unsigned numWorkers_;
unsigned numWaiting_;
std::condition_variable barrierCondition_;
std::mutex barrierMutex_;
};
/// \brief TerminateThreadTasklet class
/// Empty tasklet marking thread termination.
class TerminateThreadTasklet : public TaskletInterface
{
public:
void run() override
{ }
bool isEndMarker() const override
{ return true; }
};
public:
// prohibit copying of tasklet runners
TaskletRunner(const TaskletRunner&) = delete;
/*!
* \brief Creates a tasklet runner with numWorkers underlying worker threads.
*
* The number of worker threads may be 0. In this case, all work is done by the main
* thread (synchronous mode).
*/
TaskletRunner(unsigned numWorkers)
{
threads_.resize(numWorkers);
for (unsigned i = 0; i < numWorkers; ++i)
// create a worker thread
threads_[i].reset(new std::thread(startWorkerThread_, this, i));
}
/*!
* \brief Destructor
*
* If worker threads were created to run the tasklets, this method waits until all
* worker threads have been terminated, i.e. all scheduled tasklets are guaranteed to
* be completed.
*/
~TaskletRunner()
{
if (threads_.size() > 0) {
// dispatch a tasklet which will terminate the worker thread
dispatch(std::make_shared<TerminateThreadTasklet>());
// wait until all worker threads have terminated
for (auto& thread : threads_)
thread->join();
}
}
/*!
* \brief Returns the index of the current worker thread.
*
* If the current thread is not a worker thread, -1 is returned.
*/
int workerThreadIndex() const
{
if (TaskletRunnerHelper_<void>::taskletRunner_ != this)
return -1;
return TaskletRunnerHelper_<void>::workerThreadIndex_;
}
/*!
* \brief Returns the number of worker threads for the tasklet runner.
*/
int numWorkerThreads() const
{ return threads_.size(); }
/*!
* \brief Add a new tasklet.
*
* The tasklet is either run immediately or deferred to a separate thread.
*/
void dispatch(std::shared_ptr<TaskletInterface> tasklet)
{
if (threads_.empty()) {
// run the tasklet immediately in synchronous mode.
while (tasklet->referenceCount() > 0) {
tasklet->dereference();
try {
tasklet->run();
}
catch (const std::exception& e) {
std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ". Trying to continue.\n";
}
catch (...) {
std::cerr << "ERROR: Uncaught exception (general type) when running tasklet. Trying to continue.\n";
}
}
}
else {
// lock mutex for the tasklet queue to make sure that nobody messes with the
// task queue
taskletQueueMutex_.lock();
// add the tasklet to the queue
taskletQueue_.push(tasklet);
taskletQueueMutex_.unlock();
workAvailableCondition_.notify_all();
}
}
/*!
* \brief Convenience method to construct a new function runner tasklet and dispatch it immediately.
*/
template <class Fn>
std::shared_ptr<FunctionRunnerTasklet<Fn> > dispatchFunction(Fn &fn, int numInvocations=1)
{
typedef FunctionRunnerTasklet<Fn> Tasklet;
auto tasklet = std::make_shared<Tasklet>(numInvocations, fn);
this->dispatch(tasklet);
return tasklet;
}
/*!
* \brief Block until all previously dispatched tasklets have been completed.
*/
void barrier()
{
unsigned numWorkers = threads_.size();
if (numWorkers == 0)
// nothing needs to be done to implement a barrier in synchronous mode
return;
// dispatch a barrier tasklet and wait until it has been run by the worker thread
auto barrierTasklet = std::make_shared<BarrierTasklet>(numWorkers);
dispatch(barrierTasklet);
barrierTasklet->wait();
}
protected:
// main function of the worker thread
static void startWorkerThread_(TaskletRunner* taskletRunner, int workerThreadIndex)
{
TaskletRunnerHelper_<void>::taskletRunner_ = taskletRunner;
TaskletRunnerHelper_<void>::workerThreadIndex_ = workerThreadIndex;
taskletRunner->run_();
}
//! process tasklets until an end-marker tasklet is received from the queue
void run_()
{
while (true) {
// wait until tasklets have been pushed to the queue. first we need to lock
// mutex for access to taskletQueue_
std::unique_lock<std::mutex> lock(taskletQueueMutex_);
const auto& workIsAvailable =
[this]() -> bool
{ return !taskletQueue_.empty(); };
if (!workIsAvailable())
workAvailableCondition_.wait(lock, /*predicate=*/workIsAvailable);
// remove tasklet from queue
std::shared_ptr<TaskletInterface> tasklet = taskletQueue_.front();
// if tasklet is an end marker, terminate the thread and DO NOT remove the
// tasklet.
if (tasklet->isEndMarker()) {
if (taskletQueue_.size() > 1)
throw std::logic_error("TaskletRunner: Not all queued tasklets were executed");
// unlock via the std::unique_lock instead of the raw mutex; otherwise the
// lock's destructor would unlock the mutex a second time
lock.unlock();
return;
}
tasklet->dereference();
if (tasklet->referenceCount() == 0)
// remove tasklets from the queue as soon as their reference count
// reaches zero, i.e. the tasklet has been run often enough.
taskletQueue_.pop();
lock.unlock();
// execute tasklet
try {
tasklet->run();
}
catch (const std::exception& e) {
std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ". Trying to continue.\n";
}
catch (...) {
std::cerr << "ERROR: Uncaught exception when running tasklet. Trying to continue.\n";
}
}
}
std::vector<std::unique_ptr<std::thread> > threads_;
std::queue<std::shared_ptr<TaskletInterface> > taskletQueue_;
std::mutex taskletQueueMutex_;
std::condition_variable workAvailableCondition_;
};
} // end namespace Opm
#endif
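
A minimal usage sketch, assuming <atomic> and <cassert> are also included:

// hedged usage sketch: run a lambda three times on two worker threads
Opm::TaskletRunner runner(/*numWorkers=*/2);
std::atomic<int> counter(0);
auto increment = [&counter]() { ++counter; };
runner.dispatchFunction(increment, /*numInvocations=*/3);
runner.barrier(); // block until all three invocations have finished
assert(counter == 3);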


@@ -0,0 +1,101 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
/*!
* \file
* \copydoc Opm::ThreadedEntityIterator
*/
#ifndef EWOMS_THREADED_ENTITY_ITERATOR_HH
#define EWOMS_THREADED_ENTITY_ITERATOR_HH
#include <thread>
#include <mutex>
namespace Opm {
/*!
* \brief Provides an STL-iterator-like interface to iterate over the entities of a
* GridView in OpenMP threaded applications
*
* ATTENTION: This class must be instantiated in a sequential context!
*/
template <class GridView, int codim>
class ThreadedEntityIterator
{
typedef typename GridView::template Codim<codim>::Entity Entity;
typedef typename GridView::template Codim<codim>::Iterator EntityIterator;
public:
ThreadedEntityIterator(const GridView& gridView)
: gridView_(gridView)
, sequentialIt_(gridView_.template begin<codim>())
, sequentialEnd_(gridView.template end<codim>())
{ }
// the std::mutex member renders this class non-copyable anyway, so make
// the deletion of the copy constructor explicit
ThreadedEntityIterator(const ThreadedEntityIterator& other) = delete;
// begin iterating over the grid in parallel
EntityIterator beginParallel()
{
mutex_.lock();
auto tmp = sequentialIt_;
if (sequentialIt_ != sequentialEnd_)
++sequentialIt_; // make the next thread look at the next element
mutex_.unlock();
return tmp;
}
// returns true if the last element was reached
bool isFinished(const EntityIterator& it) const
{ return it == sequentialEnd_; }
// make sure that the loop over the grid is finished
void setFinished()
{
mutex_.lock();
sequentialIt_ = sequentialEnd_;
mutex_.unlock();
}
// prefix increment: goes to the next element which is not yet worked on by any
// thread
EntityIterator increment()
{
mutex_.lock();
auto tmp = sequentialIt_;
if (sequentialIt_ != sequentialEnd_)
++sequentialIt_;
mutex_.unlock();
return tmp;
}
private:
GridView gridView_;
EntityIterator sequentialIt_;
EntityIterator sequentialEnd_;
std::mutex mutex_;
};
} // namespace Opm
#endif
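
A minimal sketch of the intended traversal pattern inside an OpenMP parallel region; GridView and gridView are assumed to be defined by the caller, and the iterator is constructed sequentially as required:

// hedged usage sketch: distribute a codim-0 grid traversal over OpenMP threads
Opm::ThreadedEntityIterator<GridView, /*codim=*/0> threadedIt(gridView);
#pragma omp parallel
{
    auto it = threadedIt.beginParallel();
    for (; !threadedIt.isFinished(it); it = threadedIt.increment()) {
        const auto& element = *it;
        // ... thread-local work on `element` ...
    }
}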


@@ -0,0 +1,134 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
/*!
* \file
* \copydoc Opm::ThreadManager
*/
#ifndef EWOMS_THREAD_MANAGER_HH
#define EWOMS_THREAD_MANAGER_HH
#ifdef _OPENMP
#include <omp.h>
#endif
#include <ewoms/common/parametersystem.hh>
#include <ewoms/common/propertysystem.hh>
#include <opm/material/common/Exceptions.hpp>
#include <dune/common/version.hh>
BEGIN_PROPERTIES
NEW_PROP_TAG(ThreadsPerProcess);
END_PROPERTIES
namespace Opm {
/*!
* \brief Simplifies multi-threaded capabilities.
*/
template <class TypeTag>
class ThreadManager
{
public:
enum {
#if defined(_OPENMP) || DOXYGEN
//! Specify whether OpenMP is really available or not
isFake = false
#else
isFake = true
#endif
};
/*!
* \brief Register all run-time parameters of the thread manager.
*/
static void registerParameters()
{
EWOMS_REGISTER_PARAM(TypeTag, int, ThreadsPerProcess,
"The maximum number of threads to be instantiated per process "
"('-1' means 'automatic')");
}
static void init()
{
numThreads_ = EWOMS_GET_PARAM(TypeTag, int, ThreadsPerProcess);
// some safety checks. This is pretty ugly macro-magic, but so what?
#if !defined(_OPENMP)
if (numThreads_ != 1 && numThreads_ != -1)
throw std::invalid_argument("OpenMP is not available. The only valid values for "
"threads-per-process is 1 and -1 but it is "+std::to_string(numThreads_)+"!");
numThreads_ = 1;
#elif !defined NDEBUG && defined DUNE_INTERFACECHECK
if (numThreads_ != 1)
throw std::invalid_argument("You explicitly enabled Barton-Nackman interface checking in Dune. "
"The Dune implementation of this is currently incompatible with "
"thread parallelism!");
numThreads_ = 1;
#else
if (numThreads_ == 0)
throw std::invalid_argument("Zero threads per process are not possible: It must be at least 1, "
"(or -1 for 'automatic')!");
#endif
#ifdef _OPENMP
// actually limit the number of threads and get the number of threads which are
// used in the end.
if (numThreads_ > 0)
omp_set_num_threads(numThreads_);
numThreads_ = omp_get_max_threads();
#endif
}
/*!
* \brief Return the maximum number of threads of the current process.
*/
static unsigned maxThreads()
{ return static_cast<unsigned>(numThreads_); }
/*!
* \brief Return the index of the current OpenMP thread
*/
static unsigned threadId()
{
#ifdef _OPENMP
return static_cast<unsigned>(omp_get_thread_num());
#else
return 0;
#endif
}
private:
static int numThreads_;
};
template <class TypeTag>
int ThreadManager<TypeTag>::numThreads_ = 1;
} // namespace Opm
#endif
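
Since ThreadManager is tied to the property/parameter system, a standalone sketch of the clamping performed by init() may help; numThreads here stands in for the ThreadsPerProcess parameter:

// hedged sketch of the thread-count clamping in ThreadManager::init()
int numThreads = -1; // -1 means "let OpenMP decide"
#ifdef _OPENMP
if (numThreads > 0)
    omp_set_num_threads(numThreads); // enforce an explicit thread count
numThreads = omp_get_max_threads(); // the number of threads actually used
#else
numThreads = 1; // without OpenMP there is exactly one thread
#endif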


@@ -30,7 +30,7 @@
#include "overlaptypes.hh"
#if HAVE_MPI
#include <ewoms/parallel/mpibuffer.hh>
#include <opm/models/parallel/mpibuffer.hh>
#include <dune/grid/common/datahandleif.hh>
#include <dune/grid/common/gridenums.hh>


@@ -31,7 +31,7 @@
#include "blacklist.hh"
#include "globalindices.hh"
#include <ewoms/parallel/mpibuffer.hh>
#include <opm/models/parallel/mpibuffer.hh>
#include <algorithm>
#include <limits>


@@ -30,7 +30,7 @@
#include "overlaptypes.hh"
#include "blacklist.hh"
#include <ewoms/parallel/mpibuffer.hh>
#include <opm/models/parallel/mpibuffer.hh>
#include <opm/material/common/Unused.hpp>


@@ -30,7 +30,7 @@
#include <opm/simulators/linalg/domesticoverlapfrombcrsmatrix.hh>
#include <opm/simulators/linalg/globalindices.hh>
#include <opm/simulators/linalg/blacklist.hh>
#include <ewoms/parallel/mpibuffer.hh>
#include <opm/models/parallel/mpibuffer.hh>
#include <opm/material/common/Valgrind.hpp>


@@ -29,7 +29,7 @@
#include "overlaptypes.hh"
#include <ewoms/parallel/mpibuffer.hh>
#include <opm/models/parallel/mpibuffer.hh>
#include <opm/material/common/Valgrind.hpp>
#include <dune/istl/bvector.hh>


@@ -28,7 +28,7 @@
*/
#include "config.h"
#include <ewoms/parallel/tasklets.hh>
#include <opm/models/parallel/tasklets.hh>
#include <chrono>
#include <iostream>