From d8723dc9cea93b4a19bffce7c614b8a3053b7f6a Mon Sep 17 00:00:00 2001 From: Arne Morten Kvarving Date: Mon, 16 Sep 2019 09:48:55 +0200 Subject: [PATCH] changed: ewoms/parallel -> opm/models/parallel --- opm/models/parallel/gridcommhandles.hh | 270 ++++++++++++++ opm/models/parallel/mpibuffer.hh | 239 ++++++++++++ opm/models/parallel/tasklets.hh | 348 ++++++++++++++++++ opm/models/parallel/threadedentityiterator.hh | 101 +++++ opm/models/parallel/threadmanager.hh | 134 +++++++ opm/simulators/linalg/blacklist.hh | 2 +- .../linalg/domesticoverlapfrombcrsmatrix.hh | 2 +- .../linalg/foreignoverlapfrombcrsmatrix.hh | 2 +- .../linalg/overlappingbcrsmatrix.hh | 2 +- .../linalg/overlappingblockvector.hh | 2 +- tests/models/test_tasklets.cpp | 2 +- 11 files changed, 1098 insertions(+), 6 deletions(-) create mode 100644 opm/models/parallel/gridcommhandles.hh create mode 100644 opm/models/parallel/mpibuffer.hh create mode 100644 opm/models/parallel/tasklets.hh create mode 100644 opm/models/parallel/threadedentityiterator.hh create mode 100644 opm/models/parallel/threadmanager.hh diff --git a/opm/models/parallel/gridcommhandles.hh b/opm/models/parallel/gridcommhandles.hh new file mode 100644 index 000000000..ff00e3844 --- /dev/null +++ b/opm/models/parallel/gridcommhandles.hh @@ -0,0 +1,270 @@ +// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- +// vi: set et ts=4 sw=4 sts=4: +/* + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . + + Consult the COPYING file in the top-level source directory of this + module for the precise wording of the license and the list of + copyright holders. +*/ +/*! + * \file + * + * \brief Provides data handles for parallel communication which + * operate on DOFs + */ +#ifndef EWOMS_GRID_COMM_HANDLES_HH +#define EWOMS_GRID_COMM_HANDLES_HH + +#include + +#include +#include + +namespace Opm { + +/*! + * \brief Data handle for parallel communication which sums up all + * values are attached to DOFs + */ +template +class GridCommHandleSum + : public Dune::CommDataHandleIF, + FieldType> +{ +public: + GridCommHandleSum(Container& container, const EntityMapper& mapper) + : mapper_(mapper), container_(container) + {} + + bool contains(int dim OPM_UNUSED, int codim) const + { + // return true if the codim is the same as the codim which we + // are asked to communicate with. + return codim == commCodim; + } + + bool fixedsize(int dim OPM_UNUSED, int codim OPM_UNUSED) const + { + // for each DOF we communicate a single value which has a + // fixed size + return true; + } + + template + size_t size(const EntityType& e OPM_UNUSED) const + { + // communicate a field type per entity + return 1; + } + + template + void gather(MessageBufferImp& buff, const EntityType& e) const + { + unsigned dofIdx = static_cast(mapper_.index(e)); + buff.write(container_[dofIdx]); + } + + template + void scatter(MessageBufferImp& buff, const EntityType& e, size_t n OPM_UNUSED) + { + unsigned dofIdx = static_cast(mapper_.index(e)); + + FieldType tmp; + buff.read(tmp); + container_[dofIdx] += tmp; + } + +private: + const EntityMapper& mapper_; + Container& container_; +}; + +/*! + * \brief Data handle for parallel communication which can be used to + * set the values values of ghost and overlap DOFs from their + * respective master processes. + */ +template +class GridCommHandleGhostSync + : public Dune::CommDataHandleIF, + FieldType> +{ +public: + GridCommHandleGhostSync(Container& container, const EntityMapper& mapper) + : mapper_(mapper), container_(container) + { + } + + bool contains(unsigned dim OPM_UNUSED, unsigned codim) const + { + // return true if the codim is the same as the codim which we + // are asked to communicate with. + return codim == commCodim; + } + + bool fixedsize(unsigned dim OPM_UNUSED, unsigned codim OPM_UNUSED) const + { + // for each DOF we communicate a single value which has a + // fixed size + return true; + } + + template + size_t size(const EntityType& e OPM_UNUSED) const + { + // communicate a field type per entity + return 1; + } + + template + void gather(MessageBufferImp& buff, const EntityType& e) const + { + unsigned dofIdx = static_cast(mapper_.index(e)); + buff.write(container_[dofIdx]); + } + + template + void scatter(MessageBufferImp& buff, const EntityType& e, size_t n OPM_UNUSED) + { + unsigned dofIdx = static_cast(mapper_.index(e)); + buff.read(container_[dofIdx]); + } + +private: + const EntityMapper& mapper_; + Container& container_; +}; + +/*! + * \brief Data handle for parallel communication which takes the + * maximum of all values that are attached to DOFs + */ +template +class GridCommHandleMax + : public Dune::CommDataHandleIF, + FieldType> +{ +public: + GridCommHandleMax(Container& container, const EntityMapper& mapper) + : mapper_(mapper), container_(container) + {} + + bool contains(unsigned dim OPM_UNUSED, unsigned codim) const + { + // return true if the codim is the same as the codim which we + // are asked to communicate with. + return codim == commCodim; + } + + bool fixedsize(unsigned dim OPM_UNUSED, unsigned codim OPM_UNUSED) const + { + // for each DOF we communicate a single value which has a + // fixed size + return true; + } + + template + size_t size(const EntityType& e OPM_UNUSED) const + { + // communicate a field type per entity + return 1; + } + + template + void gather(MessageBufferImp& buff, const EntityType& e) const + { + unsigned dofIdx = static_cast(mapper_.index(e)); + buff.write(container_[dofIdx]); + } + + template + void scatter(MessageBufferImp& buff, const EntityType& e, size_t n OPM_UNUSED) + { + unsigned dofIdx = static_cast(mapper_.index(e)); + FieldType tmp; + buff.read(tmp); + container_[dofIdx] = std::max(container_[dofIdx], tmp); + } + +private: + const EntityMapper& mapper_; + Container& container_; +}; + +/*! + * \brief Provides data handle for parallel communication which takes + * the minimum of all values that are attached to DOFs + */ +template +class GridCommHandleMin + : public Dune::CommDataHandleIF, + FieldType> +{ +public: + GridCommHandleMin(Container& container, const EntityMapper& mapper) + : mapper_(mapper), container_(container) + {} + + bool contains(unsigned dim OPM_UNUSED, unsigned codim) const + { + // return true if the codim is the same as the codim which we + // are asked to communicate with. + return codim == commCodim; + } + + bool fixedsize(unsigned dim OPM_UNUSED, unsigned codim OPM_UNUSED) const + { + // for each DOF we communicate a single value which has a + // fixed size + return true; + } + + template + size_t size(const EntityType& e OPM_UNUSED) const + { + // communicate a field type per entity + return 1; + } + + template + void gather(MessageBufferImp& buff, const EntityType& e) const + { + unsigned dofIdx = static_cast(mapper_.index(e)); + buff.write(container_[dofIdx]); + } + + template + void scatter(MessageBufferImp& buff, const EntityType& e, size_t n OPM_UNUSED) + { + unsigned dofIdx = static_cast(mapper_.index(e)); + FieldType tmp; + buff.read(tmp); + container_[dofIdx] = std::min(container_[dofIdx], tmp); + } + +private: + const EntityMapper& mapper_; + Container& container_; +}; + +} // namespace Opm + +#endif diff --git a/opm/models/parallel/mpibuffer.hh b/opm/models/parallel/mpibuffer.hh new file mode 100644 index 000000000..43ebaa0af --- /dev/null +++ b/opm/models/parallel/mpibuffer.hh @@ -0,0 +1,239 @@ +// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- +// vi: set et ts=4 sw=4 sts=4: +/* + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . + + Consult the COPYING file in the top-level source directory of this + module for the precise wording of the license and the list of + copyright holders. +*/ +/*! + * \file + * \copydoc Opm::MpiBuffer + */ +#ifndef EWOMS_MPI_BUFFER_HH +#define EWOMS_MPI_BUFFER_HH + +#if HAVE_MPI +#include +#endif + +#include + +#include +#include + +namespace Opm { + +/*! + * \brief Simplifies handling of buffers to be used in conjunction with MPI + */ +template +class MpiBuffer +{ +public: + MpiBuffer() + { + data_ = NULL; + dataSize_ = 0; + + setMpiDataType_(); + updateMpiDataSize_(); + } + + MpiBuffer(size_t size) + { + data_ = new DataType[size]; + dataSize_ = size; + + setMpiDataType_(); + updateMpiDataSize_(); + } + + MpiBuffer(const MpiBuffer&) = default; + + ~MpiBuffer() + { delete[] data_; } + + /*! + * \brief Set the size of the buffer + */ + void resize(size_t newSize) + { + delete[] data_; + data_ = new DataType[newSize]; + dataSize_ = newSize; + updateMpiDataSize_(); + } + + /*! + * \brief Send the buffer asyncronously to a peer process. + */ + void send(unsigned peerRank OPM_UNUSED_NOMPI) + { +#if HAVE_MPI + MPI_Isend(data_, + static_cast(mpiDataSize_), + mpiDataType_, + static_cast(peerRank), + 0, // tag + MPI_COMM_WORLD, + &mpiRequest_); +#endif + } + + /*! + * \brief Wait until the buffer was send to the peer completely. + */ + void wait() + { +#if HAVE_MPI + MPI_Wait(&mpiRequest_, &mpiStatus_); +#endif // HAVE_MPI + } + + /*! + * \brief Receive the buffer syncronously from a peer rank + */ + void receive(unsigned peerRank OPM_UNUSED_NOMPI) + { +#if HAVE_MPI + MPI_Recv(data_, + static_cast(mpiDataSize_), + mpiDataType_, + static_cast(peerRank), + 0, // tag + MPI_COMM_WORLD, + &mpiStatus_); + assert(!mpiStatus_.MPI_ERROR); +#endif // HAVE_MPI + } + +#if HAVE_MPI + /*! + * \brief Returns the current MPI_Request object. + * + * This object is only well defined after the send() method. + */ + MPI_Request& request() + { return mpiRequest_; } + /*! + * \brief Returns the current MPI_Request object. + * + * This object is only well defined after the send() method. + */ + const MPI_Request& request() const + { return mpiRequest_; } + + /*! + * \brief Returns the current MPI_Status object. + * + * This object is only well defined after the receive() and wait() methods. + */ + MPI_Status& status() + { return mpiStatus_; } + /*! + * \brief Returns the current MPI_Status object. + * + * This object is only well defined after the receive() and wait() methods. + */ + const MPI_Status& status() const + { return mpiStatus_; } +#endif // HAVE_MPI + + /*! + * \brief Returns the number of data objects in the buffer + */ + size_t size() const + { return dataSize_; } + + /*! + * \brief Provide access to the buffer data. + */ + DataType& operator[](size_t i) + { + assert(0 <= i && i < dataSize_); + return data_[i]; + } + + /*! + * \brief Provide access to the buffer data. + */ + const DataType& operator[](size_t i) const + { + assert(0 <= i && i < dataSize_); + return data_[i]; + } + +private: + void setMpiDataType_() + { +#if HAVE_MPI + // set the MPI data type + if (std::is_same::value) + mpiDataType_ = MPI_CHAR; + else if (std::is_same::value) + mpiDataType_ = MPI_UNSIGNED_CHAR; + else if (std::is_same::value) + mpiDataType_ = MPI_SHORT; + else if (std::is_same::value) + mpiDataType_ = MPI_UNSIGNED_SHORT; + else if (std::is_same::value) + mpiDataType_ = MPI_INT; + else if (std::is_same::value) + mpiDataType_ = MPI_UNSIGNED; + else if (std::is_same::value) + mpiDataType_ = MPI_LONG; + else if (std::is_same::value) + mpiDataType_ = MPI_UNSIGNED_LONG; + else if (std::is_same::value) + mpiDataType_ = MPI_LONG_LONG; + else if (std::is_same::value) + mpiDataType_ = MPI_UNSIGNED_LONG_LONG; + else if (std::is_same::value) + mpiDataType_ = MPI_FLOAT; + else if (std::is_same::value) + mpiDataType_ = MPI_DOUBLE; + else if (std::is_same::value) + mpiDataType_ = MPI_LONG_DOUBLE; + else { + mpiDataType_ = MPI_BYTE; + } +#endif // HAVE_MPI + } + + void updateMpiDataSize_() + { +#if HAVE_MPI + mpiDataSize_ = dataSize_; + if (mpiDataType_ == MPI_BYTE) + mpiDataSize_ *= sizeof(DataType); +#endif // HAVE_MPI + } + + DataType *data_; + size_t dataSize_; +#if HAVE_MPI + size_t mpiDataSize_; + MPI_Datatype mpiDataType_; + MPI_Request mpiRequest_; + MPI_Status mpiStatus_; +#endif // HAVE_MPI +}; + +} // namespace Opm + +#endif diff --git a/opm/models/parallel/tasklets.hh b/opm/models/parallel/tasklets.hh new file mode 100644 index 000000000..36e23a65e --- /dev/null +++ b/opm/models/parallel/tasklets.hh @@ -0,0 +1,348 @@ +// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- +// vi: set et ts=4 sw=4 sts=4: +/* + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . + + Consult the COPYING file in the top-level source directory of this + module for the precise wording of the license and the list of + copyright holders. +*/ +/*! + * \file + * \brief Provides a mechanism to dispatch work to separate threads + */ +#ifndef EWOMS_TASKLETS_HH +#define EWOMS_TASKLETS_HH + +#include +#include +#include +#include +#include +#include +#include + +namespace Opm { + +/*! + * \brief The base class for tasklets. + * + * Tasklets are a generic mechanism for potentially running work in a separate thread. + */ +class TaskletInterface +{ +public: + TaskletInterface(int refCount = 1) + : referenceCount_(refCount) + {} + virtual ~TaskletInterface() {} + virtual void run() = 0; + virtual bool isEndMarker () const { return false; } + + void dereference() + { -- referenceCount_; } + + int referenceCount() const + { return referenceCount_; } + +private: + int referenceCount_; +}; + +/*! + * \brief A simple tasklet that runs a function that returns void and does not take any + * arguments a given number of times. + */ +template +class FunctionRunnerTasklet : public TaskletInterface +{ +public: + FunctionRunnerTasklet(const FunctionRunnerTasklet&) = default; + FunctionRunnerTasklet(int numInvocations, const Fn& fn) + : TaskletInterface(numInvocations) + , fn_(fn) + {} + void run() override + { fn_(); } + +private: + const Fn& fn_; +}; + +class TaskletRunner; + +// this class stores the thread local static attributes for the TaskletRunner class. we +// cannot put them directly into TaskletRunner because defining static members for +// non-template classes in headers leads the linker to choke in case multiple compile +// units are used. +template +struct TaskletRunnerHelper_ +{ + static thread_local TaskletRunner* taskletRunner_; + static thread_local int workerThreadIndex_; +}; + +template +thread_local TaskletRunner* TaskletRunnerHelper_::taskletRunner_ = nullptr; + +template +thread_local int TaskletRunnerHelper_::workerThreadIndex_ = -1; + +/*! + * \brief Handles where a given tasklet is run. + * + * Depending on the number of worker threads, a tasklet can either be run in a separate + * worker thread or by the main thread. + */ +class TaskletRunner +{ + /// \brief Implements a barrier. This class can only be used in the asynchronous case. + class BarrierTasklet : public TaskletInterface + { + public: + BarrierTasklet(unsigned numWorkers) + : TaskletInterface(/*refCount=*/numWorkers) + { + numWorkers_ = numWorkers; + numWaiting_ = 0; + } + + void run() + { wait(); } + + void wait() + { + std::unique_lock lock(barrierMutex_); + + numWaiting_ += 1; + if (numWaiting_ >= numWorkers_ + 1) { + lock.unlock(); + barrierCondition_.notify_all(); + } + else { + const auto& areAllWaiting = + [this]() -> bool + { return this->numWaiting_ >= this->numWorkers_ + 1; }; + + barrierCondition_.wait(lock, /*predicate=*/areAllWaiting); + } + } + + private: + unsigned numWorkers_; + unsigned numWaiting_; + + std::condition_variable barrierCondition_; + std::mutex barrierMutex_; + }; + + /// \brief TerminateThreadTasklet class + /// Empty tasklet marking thread termination. + class TerminateThreadTasklet : public TaskletInterface + { + public: + void run() + { } + + bool isEndMarker() const + { return true; } + }; + +public: + // prohibit copying of tasklet runners + TaskletRunner(const TaskletRunner&) = delete; + + /*! + * \brief Creates a tasklet runner with numWorkers underling threads for doing work. + * + * The number of worker threads may be 0. In this case, all work is done by the main + * thread (synchronous mode). + */ + TaskletRunner(unsigned numWorkers) + { + threads_.resize(numWorkers); + for (unsigned i = 0; i < numWorkers; ++i) + // create a worker thread + threads_[i].reset(new std::thread(startWorkerThread_, this, i)); + } + + /*! + * \brief Destructor + * + * If worker threads were created to run the tasklets, this method waits until all + * worker threads have been terminated, i.e. all scheduled tasklets are guaranteed to + * be completed. + */ + ~TaskletRunner() + { + if (threads_.size() > 0) { + // dispatch a tasklet which will terminate the worker thread + dispatch(std::make_shared()); + + // wait until all worker threads have terminated + for (auto& thread : threads_) + thread->join(); + } + } + + /*! + * \brief Returns the index of the current worker thread. + * + * If the current thread is not a worker thread, -1 is returned. + */ + int workerThreadIndex() const + { + if (TaskletRunnerHelper_::taskletRunner_ != this) + return -1; + return TaskletRunnerHelper_::workerThreadIndex_; + } + + /*! + * \brief Returns the number of worker threads for the tasklet runner. + */ + int numWorkerThreads() const + { return threads_.size(); } + + /*! + * \brief Add a new tasklet. + * + * The tasklet is either run immediately or deferred to a separate thread. + */ + void dispatch(std::shared_ptr tasklet) + { + if (threads_.empty()) { + // run the tasklet immediately in synchronous mode. + while (tasklet->referenceCount() > 0) { + tasklet->dereference(); + try { + tasklet->run(); + } + catch (const std::exception& e) { + std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ". Trying to continue.\n"; + } + catch (...) { + std::cerr << "ERROR: Uncaught exception (general type) when running tasklet. Trying to continue.\n"; + } + } + } + else { + // lock mutex for the tasklet queue to make sure that nobody messes with the + // task queue + taskletQueueMutex_.lock(); + + // add the tasklet to the queue + taskletQueue_.push(tasklet); + + taskletQueueMutex_.unlock(); + + workAvailableCondition_.notify_all(); + } + } + + /*! + * \brief Convenience method to construct a new function runner tasklet and dispatch it immediately. + */ + template + std::shared_ptr > dispatchFunction(Fn &fn, int numInvocations=1) + { + typedef FunctionRunnerTasklet Tasklet; + auto tasklet = std::make_shared(numInvocations, fn); + this->dispatch(tasklet); + return tasklet; + } + + /*! + * \brief Make sure that all tasklets have been completed after this method has been called + */ + void barrier() + { + unsigned numWorkers = threads_.size(); + if (numWorkers == 0) + // nothing needs to be done to implement a barrier in synchronous mode + return; + + // dispatch a barrier tasklet and wait until it has been run by the worker thread + auto barrierTasklet = std::make_shared(numWorkers); + dispatch(barrierTasklet); + + barrierTasklet->wait(); + } + +protected: + // main function of the worker thread + static void startWorkerThread_(TaskletRunner* taskletRunner, int workerThreadIndex) + { + TaskletRunnerHelper_::taskletRunner_ = taskletRunner; + TaskletRunnerHelper_::workerThreadIndex_ = workerThreadIndex; + + taskletRunner->run_(); + } + + //! do the work until the queue received an end tasklet + void run_() + { + while (true) { + // wait until tasklets have been pushed to the queue. first we need to lock + // mutex for access to taskletQueue_ + std::unique_lock lock(taskletQueueMutex_); + + const auto& workIsAvailable = + [this]() -> bool + { return !taskletQueue_.empty(); }; + + if (!workIsAvailable()) + workAvailableCondition_.wait(lock, /*predicate=*/workIsAvailable); + + // remove tasklet from queue + std::shared_ptr tasklet = taskletQueue_.front(); + + // if tasklet is an end marker, terminate the thread and DO NOT remove the + // tasklet. + if (tasklet->isEndMarker()) { + if(taskletQueue_.size() > 1) + throw std::logic_error("TaskletRunner: Not all queued tasklets were executed"); + taskletQueueMutex_.unlock(); + return; + } + + tasklet->dereference(); + if (tasklet->referenceCount() == 0) + // remove tasklets from the queue as soon as their reference count + // reaches zero, i.e. the tasklet has been run often enough. + taskletQueue_.pop(); + lock.unlock(); + + // execute tasklet + try { + tasklet->run(); + } + catch (const std::exception& e) { + std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ". Trying to continue.\n"; + } + catch (...) { + std::cerr << "ERROR: Uncaught exception when running tasklet. Trying to continue.\n"; + } + } + } + + std::vector > threads_; + std::queue > taskletQueue_; + std::mutex taskletQueueMutex_; + std::condition_variable workAvailableCondition_; +}; + +} // end namespace Opm +#endif diff --git a/opm/models/parallel/threadedentityiterator.hh b/opm/models/parallel/threadedentityiterator.hh new file mode 100644 index 000000000..838e08654 --- /dev/null +++ b/opm/models/parallel/threadedentityiterator.hh @@ -0,0 +1,101 @@ +// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- +// vi: set et ts=4 sw=4 sts=4: +/* + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . + + Consult the COPYING file in the top-level source directory of this + module for the precise wording of the license and the list of + copyright holders. +*/ +/*! + * \file + * \copydoc Opm::ThreadedEntityIterator + */ +#ifndef EWOMS_THREADED_ENTITY_ITERATOR_HH +#define EWOMS_THREADED_ENTITY_ITERATOR_HH + +#include +#include + +namespace Opm { + +/*! + * \brief Provides an STL-iterator like interface to iterate over the enties of a + * GridView in OpenMP threaded applications + * + * ATTENTION: This class must be instantiated in a sequential context! + */ +template +class ThreadedEntityIterator +{ + typedef typename GridView::template Codim::Entity Entity; + typedef typename GridView::template Codim::Iterator EntityIterator; +public: + ThreadedEntityIterator(const GridView& gridView) + : gridView_(gridView) + , sequentialIt_(gridView_.template begin()) + , sequentialEnd_(gridView.template end()) + { } + + ThreadedEntityIterator(const ThreadedEntityIterator& other) = default; + + // begin iterating over the grid in parallel + EntityIterator beginParallel() + { + mutex_.lock(); + auto tmp = sequentialIt_; + if (sequentialIt_ != sequentialEnd_) + ++sequentialIt_; // make the next thread look at the next element + mutex_.unlock(); + + return tmp; + } + + // returns true if the last element was reached + bool isFinished(const EntityIterator& it) const + { return it == sequentialEnd_; } + + // make sure that the loop over the grid is finished + void setFinished() + { + mutex_.lock(); + sequentialIt_ = sequentialEnd_; + mutex_.unlock(); + } + + // prefix increment: goes to the next element which is not yet worked on by any + // thread + EntityIterator increment() + { + mutex_.lock(); + auto tmp = sequentialIt_; + if (sequentialIt_ != sequentialEnd_) + ++sequentialIt_; + mutex_.unlock(); + + return tmp; + } + +private: + GridView gridView_; + EntityIterator sequentialIt_; + EntityIterator sequentialEnd_; + + std::mutex mutex_; +}; +} // namespace Opm + +#endif diff --git a/opm/models/parallel/threadmanager.hh b/opm/models/parallel/threadmanager.hh new file mode 100644 index 000000000..ca7791612 --- /dev/null +++ b/opm/models/parallel/threadmanager.hh @@ -0,0 +1,134 @@ +// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- +// vi: set et ts=4 sw=4 sts=4: +/* + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . + + Consult the COPYING file in the top-level source directory of this + module for the precise wording of the license and the list of + copyright holders. +*/ +/*! + * \file + * \copydoc Opm::ThreadManager + */ +#ifndef EWOMS_THREAD_MANAGER_HH +#define EWOMS_THREAD_MANAGER_HH + +#ifdef _OPENMP +#include +#endif + +#include +#include + +#include + +#include + +BEGIN_PROPERTIES + +NEW_PROP_TAG(ThreadsPerProcess); + +END_PROPERTIES + +namespace Opm { + +/*! + * \brief Simplifies multi-threaded capabilities. + */ +template +class ThreadManager +{ +public: + enum { +#if defined(_OPENMP) || DOXYGEN + //! Specify whether OpenMP is really available or not + isFake = false +#else + isFake = true +#endif + }; + + /*! + * \brief Register all run-time parameters of the thread manager. + */ + static void registerParameters() + { + EWOMS_REGISTER_PARAM(TypeTag, int, ThreadsPerProcess, + "The maximum number of threads to be instantiated per process " + "('-1' means 'automatic')"); + } + + static void init() + { + numThreads_ = EWOMS_GET_PARAM(TypeTag, int, ThreadsPerProcess); + + // some safety checks. This is pretty ugly macro-magic, but so what? +#if !defined(_OPENMP) + if (numThreads_ != 1 && numThreads_ != -1) + throw std::invalid_argument("OpenMP is not available. The only valid values for " + "threads-per-process is 1 and -1 but it is "+std::to_string(numThreads_)+"!"); + numThreads_ = 1; +#elif !defined NDEBUG && defined DUNE_INTERFACECHECK + if (numThreads_ != 1) + throw std::invalid_argument("You explicitly enabled Barton-Nackman interface checking in Dune. " + "The Dune implementation of this is currently incompatible with " + "thread parallelism!"); + numThreads_ = 1; +#else + + if (numThreads_ == 0) + throw std::invalid_argument("Zero threads per process are not possible: It must be at least 1, " + "(or -1 for 'automatic')!"); +#endif + +#ifdef _OPENMP + // actually limit the number of threads and get the number of threads which are + // used in the end. + if (numThreads_ > 0) + omp_set_num_threads(numThreads_); + + numThreads_ = omp_get_max_threads(); +#endif + } + + /*! + * \brief Return the maximum number of threads of the current process. + */ + static unsigned maxThreads() + { return static_cast(numThreads_); } + + /*! + * \brief Return the index of the current OpenMP thread + */ + static unsigned threadId() + { +#ifdef _OPENMP + return static_cast(omp_get_thread_num()); +#else + return 0; +#endif + } + +private: + static int numThreads_; +}; + +template +int ThreadManager::numThreads_ = 1; +} // namespace Opm + +#endif diff --git a/opm/simulators/linalg/blacklist.hh b/opm/simulators/linalg/blacklist.hh index ad9a9491e..11daefbe1 100644 --- a/opm/simulators/linalg/blacklist.hh +++ b/opm/simulators/linalg/blacklist.hh @@ -30,7 +30,7 @@ #include "overlaptypes.hh" #if HAVE_MPI -#include +#include #include #include diff --git a/opm/simulators/linalg/domesticoverlapfrombcrsmatrix.hh b/opm/simulators/linalg/domesticoverlapfrombcrsmatrix.hh index b6539784d..5683824bb 100644 --- a/opm/simulators/linalg/domesticoverlapfrombcrsmatrix.hh +++ b/opm/simulators/linalg/domesticoverlapfrombcrsmatrix.hh @@ -31,7 +31,7 @@ #include "blacklist.hh" #include "globalindices.hh" -#include +#include #include #include diff --git a/opm/simulators/linalg/foreignoverlapfrombcrsmatrix.hh b/opm/simulators/linalg/foreignoverlapfrombcrsmatrix.hh index 51b12579a..8733b9ede 100644 --- a/opm/simulators/linalg/foreignoverlapfrombcrsmatrix.hh +++ b/opm/simulators/linalg/foreignoverlapfrombcrsmatrix.hh @@ -30,7 +30,7 @@ #include "overlaptypes.hh" #include "blacklist.hh" -#include +#include #include diff --git a/opm/simulators/linalg/overlappingbcrsmatrix.hh b/opm/simulators/linalg/overlappingbcrsmatrix.hh index dbbb6ce3d..3ef3944ed 100644 --- a/opm/simulators/linalg/overlappingbcrsmatrix.hh +++ b/opm/simulators/linalg/overlappingbcrsmatrix.hh @@ -30,7 +30,7 @@ #include #include #include -#include +#include #include diff --git a/opm/simulators/linalg/overlappingblockvector.hh b/opm/simulators/linalg/overlappingblockvector.hh index 2bde77180..b215f44bd 100644 --- a/opm/simulators/linalg/overlappingblockvector.hh +++ b/opm/simulators/linalg/overlappingblockvector.hh @@ -29,7 +29,7 @@ #include "overlaptypes.hh" -#include +#include #include #include diff --git a/tests/models/test_tasklets.cpp b/tests/models/test_tasklets.cpp index d890d70cf..922ae8c0f 100644 --- a/tests/models/test_tasklets.cpp +++ b/tests/models/test_tasklets.cpp @@ -28,7 +28,7 @@ */ #include "config.h" -#include +#include #include #include