diff --git a/CMakeLists_files.cmake b/CMakeLists_files.cmake index bf893d36c..abe322fdc 100644 --- a/CMakeLists_files.cmake +++ b/CMakeLists_files.cmake @@ -62,6 +62,7 @@ list (APPEND MAIN_SOURCE_FILES opm/models/blackoil/blackoilpolymerparams.cpp opm/models/blackoil/blackoilsolventparams.cpp opm/models/parallel/mpiutil.cpp + opm/models/parallel/tasklets.cpp opm/models/parallel/threadmanager.cpp opm/simulators/flow/ActionHandler.cpp opm/simulators/flow/Banners.cpp diff --git a/opm/models/parallel/tasklets.cpp b/opm/models/parallel/tasklets.cpp new file mode 100644 index 000000000..390edacfd --- /dev/null +++ b/opm/models/parallel/tasklets.cpp @@ -0,0 +1,205 @@ +// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- +// vi: set et ts=4 sw=4 sts=4: +/* + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . + + Consult the COPYING file in the top-level source directory of this + module for the precise wording of the license and the list of + copyright holders. +*/ +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace Opm { + +thread_local TaskletRunner* TaskletRunner::taskletRunner_ = nullptr; +thread_local int TaskletRunner::workerThreadIndex_ = -1; + +TaskletRunner::BarrierTasklet::BarrierTasklet(unsigned numWorkers) + : TaskletInterface(/*refCount=*/numWorkers) +{ + numWorkers_ = numWorkers; + numWaiting_ = 0; +} + +void TaskletRunner::BarrierTasklet::run() +{ + wait(); +} + +void TaskletRunner::BarrierTasklet::wait() +{ + std::unique_lock lock(barrierMutex_); + + numWaiting_ += 1; + if (numWaiting_ >= numWorkers_ + 1) { + lock.unlock(); + barrierCondition_.notify_all(); + } + else { + const auto& areAllWaiting = + [this]() -> bool + { return this->numWaiting_ >= this->numWorkers_ + 1; }; + + barrierCondition_.wait(lock, /*predicate=*/areAllWaiting); + } +} + +TaskletRunner::TaskletRunner(unsigned numWorkers) +{ + threads_.resize(numWorkers); + for (unsigned i = 0; i < numWorkers; ++i) + // create a worker thread + threads_[i].reset(new std::thread(startWorkerThread_, this, i)); +} + +TaskletRunner::~TaskletRunner() +{ + if (threads_.size() > 0) { + // dispatch a tasklet which will terminate the worker thread + dispatch(std::make_shared()); + + // wait until all worker threads have terminated + for (auto& thread : threads_) + thread->join(); + } +} + +bool TaskletRunner::failure() const +{ + return this->failureFlag_.load(std::memory_order_relaxed); +} + +int TaskletRunner::workerThreadIndex() const +{ + if (TaskletRunner::taskletRunner_ != this) + return -1; + return TaskletRunner::workerThreadIndex_; +} + +void TaskletRunner::dispatch(std::shared_ptr tasklet) +{ + if (threads_.empty()) { + // run the tasklet immediately in synchronous mode. + while (tasklet->referenceCount() > 0) { + tasklet->dereference(); + try { + tasklet->run(); + } + catch (const std::exception& e) { + std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ". Trying to continue.\n"; + failureFlag_.store(true, std::memory_order_relaxed); + } + catch (...) { + std::cerr << "ERROR: Uncaught exception (general type) when running tasklet. Trying to continue.\n"; + failureFlag_.store(true, std::memory_order_relaxed); + } + } + } + else { + // lock mutex for the tasklet queue to make sure that nobody messes with the + // task queue + taskletQueueMutex_.lock(); + + // add the tasklet to the queue + taskletQueue_.push(tasklet); + + taskletQueueMutex_.unlock(); + + workAvailableCondition_.notify_all(); + } +} + +void TaskletRunner::barrier() +{ + unsigned numWorkers = threads_.size(); + if (numWorkers == 0) + // nothing needs to be done to implement a barrier in synchronous mode + return; + + // dispatch a barrier tasklet and wait until it has been run by the worker thread + auto barrierTasklet = std::make_shared(numWorkers); + dispatch(barrierTasklet); + + barrierTasklet->wait(); +} + +void TaskletRunner::startWorkerThread_(TaskletRunner* taskletRunner, int workerThreadIndex) +{ + TaskletRunner::taskletRunner_ = taskletRunner; + TaskletRunner::workerThreadIndex_ = workerThreadIndex; + + taskletRunner->run_(); +} + +void TaskletRunner::run_() +{ + while (true) { + + // wait until tasklets have been pushed to the queue. first we need to lock + // mutex for access to taskletQueue_ + std::unique_lock lock(taskletQueueMutex_); + + const auto& workIsAvailable = + [this]() -> bool + { return !taskletQueue_.empty(); }; + + if (!workIsAvailable()) + workAvailableCondition_.wait(lock, /*predicate=*/workIsAvailable); + + // remove tasklet from queue + std::shared_ptr tasklet = taskletQueue_.front(); + + // if tasklet is an end marker, terminate the thread and DO NOT remove the + // tasklet. + if (tasklet->isEndMarker()) { + if(taskletQueue_.size() > 1) + throw std::logic_error("TaskletRunner: Not all queued tasklets were executed"); + taskletQueueMutex_.unlock(); + return; + } + + tasklet->dereference(); + if (tasklet->referenceCount() == 0) + // remove tasklets from the queue as soon as their reference count + // reaches zero, i.e. the tasklet has been run often enough. + taskletQueue_.pop(); + lock.unlock(); + + // execute tasklet + try { + tasklet->run(); + } + catch (const std::exception& e) { + std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ".\n"; + failureFlag_.store(true, std::memory_order_relaxed); + } + catch (...) { + std::cerr << "ERROR: Uncaught exception when running tasklet.\n"; + failureFlag_.store(true, std::memory_order_relaxed); + } + } +} + +} // end namespace Opm diff --git a/opm/models/parallel/tasklets.hpp b/opm/models/parallel/tasklets.hpp index 0e783c45b..8a952a327 100644 --- a/opm/models/parallel/tasklets.hpp +++ b/opm/models/parallel/tasklets.hpp @@ -28,12 +28,9 @@ #define OPM_TASKLETS_HPP #include -#include -#include #include #include #include -#include #include namespace Opm { @@ -83,25 +80,6 @@ private: const Fn& fn_; }; -class TaskletRunner; - -// this class stores the thread local static attributes for the TaskletRunner class. we -// cannot put them directly into TaskletRunner because defining static members for -// non-template classes in headers leads the linker to choke in case multiple compile -// units are used. -template -struct TaskletRunnerHelper_ -{ - static thread_local TaskletRunner* taskletRunner_; - static thread_local int workerThreadIndex_; -}; - -template -thread_local TaskletRunner* TaskletRunnerHelper_::taskletRunner_ = nullptr; - -template -thread_local int TaskletRunnerHelper_::workerThreadIndex_ = -1; - /*! * \brief Handles where a given tasklet is run. * @@ -114,33 +92,11 @@ class TaskletRunner class BarrierTasklet : public TaskletInterface { public: - BarrierTasklet(unsigned numWorkers) - : TaskletInterface(/*refCount=*/numWorkers) - { - numWorkers_ = numWorkers; - numWaiting_ = 0; - } + BarrierTasklet(unsigned numWorkers); - void run() - { wait(); } + void run(); - void wait() - { - std::unique_lock lock(barrierMutex_); - - numWaiting_ += 1; - if (numWaiting_ >= numWorkers_ + 1) { - lock.unlock(); - barrierCondition_.notify_all(); - } - else { - const auto& areAllWaiting = - [this]() -> bool - { return this->numWaiting_ >= this->numWorkers_ + 1; }; - - barrierCondition_.wait(lock, /*predicate=*/areAllWaiting); - } - } + void wait(); private: unsigned numWorkers_; @@ -172,13 +128,7 @@ public: * The number of worker threads may be 0. In this case, all work is done by the main * thread (synchronous mode). */ - TaskletRunner(unsigned numWorkers) - { - threads_.resize(numWorkers); - for (unsigned i = 0; i < numWorkers; ++i) - // create a worker thread - threads_[i].reset(new std::thread(startWorkerThread_, this, i)); - } + TaskletRunner(unsigned numWorkers); /*! * \brief Destructor @@ -187,34 +137,16 @@ public: * worker threads have been terminated, i.e. all scheduled tasklets are guaranteed to * be completed. */ - ~TaskletRunner() - { - if (threads_.size() > 0) { - // dispatch a tasklet which will terminate the worker thread - dispatch(std::make_shared()); + ~TaskletRunner(); - // wait until all worker threads have terminated - for (auto& thread : threads_) - thread->join(); - } - } - - bool failure() const - { - return this->failureFlag_.load(std::memory_order_relaxed); - } + bool failure() const; /*! * \brief Returns the index of the current worker thread. * * If the current thread is not a worker thread, -1 is returned. */ - int workerThreadIndex() const - { - if (TaskletRunnerHelper_::taskletRunner_ != this) - return -1; - return TaskletRunnerHelper_::workerThreadIndex_; - } + int workerThreadIndex() const; /*! * \brief Returns the number of worker threads for the tasklet runner. @@ -227,38 +159,7 @@ public: * * The tasklet is either run immediately or deferred to a separate thread. */ - void dispatch(std::shared_ptr tasklet) - { - if (threads_.empty()) { - // run the tasklet immediately in synchronous mode. - while (tasklet->referenceCount() > 0) { - tasklet->dereference(); - try { - tasklet->run(); - } - catch (const std::exception& e) { - std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ". Trying to continue.\n"; - failureFlag_.store(true, std::memory_order_relaxed); - } - catch (...) { - std::cerr << "ERROR: Uncaught exception (general type) when running tasklet. Trying to continue.\n"; - failureFlag_.store(true, std::memory_order_relaxed); - } - } - } - else { - // lock mutex for the tasklet queue to make sure that nobody messes with the - // task queue - taskletQueueMutex_.lock(); - - // add the tasklet to the queue - taskletQueue_.push(tasklet); - - taskletQueueMutex_.unlock(); - - workAvailableCondition_.notify_all(); - } - } + void dispatch(std::shared_ptr tasklet); /*! * \brief Convenience method to construct a new function runner tasklet and dispatch it immediately. @@ -275,19 +176,8 @@ public: /*! * \brief Make sure that all tasklets have been completed after this method has been called */ - void barrier() - { - unsigned numWorkers = threads_.size(); - if (numWorkers == 0) - // nothing needs to be done to implement a barrier in synchronous mode - return; + void barrier(); - // dispatch a barrier tasklet and wait until it has been run by the worker thread - auto barrierTasklet = std::make_shared(numWorkers); - dispatch(barrierTasklet); - - barrierTasklet->wait(); - } private: // Atomic flag that is set to failure if any of the tasklets run by the TaskletRunner fails. // This flag is checked before new tasklets run or get dispatched and in case it is true, the @@ -301,68 +191,18 @@ private: protected: // main function of the worker thread - static void startWorkerThread_(TaskletRunner* taskletRunner, int workerThreadIndex) - { - TaskletRunnerHelper_::taskletRunner_ = taskletRunner; - TaskletRunnerHelper_::workerThreadIndex_ = workerThreadIndex; - - taskletRunner->run_(); - } + static void startWorkerThread_(TaskletRunner* taskletRunner, int workerThreadIndex); //! do the work until the queue received an end tasklet - void run_() - { - while (true) { - - // wait until tasklets have been pushed to the queue. first we need to lock - // mutex for access to taskletQueue_ - std::unique_lock lock(taskletQueueMutex_); - - const auto& workIsAvailable = - [this]() -> bool - { return !taskletQueue_.empty(); }; - - if (!workIsAvailable()) - workAvailableCondition_.wait(lock, /*predicate=*/workIsAvailable); - - // remove tasklet from queue - std::shared_ptr tasklet = taskletQueue_.front(); - - // if tasklet is an end marker, terminate the thread and DO NOT remove the - // tasklet. - if (tasklet->isEndMarker()) { - if(taskletQueue_.size() > 1) - throw std::logic_error("TaskletRunner: Not all queued tasklets were executed"); - taskletQueueMutex_.unlock(); - return; - } - - tasklet->dereference(); - if (tasklet->referenceCount() == 0) - // remove tasklets from the queue as soon as their reference count - // reaches zero, i.e. the tasklet has been run often enough. - taskletQueue_.pop(); - lock.unlock(); - - // execute tasklet - try { - tasklet->run(); - } - catch (const std::exception& e) { - std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ".\n"; - failureFlag_.store(true, std::memory_order_relaxed); - } - catch (...) { - std::cerr << "ERROR: Uncaught exception when running tasklet.\n"; - failureFlag_.store(true, std::memory_order_relaxed); - } - } - } + void run_(); std::vector > threads_; std::queue > taskletQueue_; std::mutex taskletQueueMutex_; std::condition_variable workAvailableCondition_; + + static thread_local TaskletRunner* taskletRunner_; + static thread_local int workerThreadIndex_; }; } // end namespace Opm diff --git a/tests/models/test_tasklets.cpp b/tests/models/test_tasklets.cpp index a95db6eba..838f7fccc 100644 --- a/tests/models/test_tasklets.cpp +++ b/tests/models/test_tasklets.cpp @@ -30,6 +30,7 @@ #include +#include #include #include