diff --git a/CMakeLists.txt b/CMakeLists.txt index bbd7374a4..c5526517d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -407,6 +407,7 @@ if(QuadMath_FOUND) co2injection_flash_ecfv co2injection_flash_vcfv) opm_add_test(${tapp}_quad + LIBRARIES opmsimulators opmcommon EXE_NAME ${tapp}_quad SOURCES examples/${tapp}.cpp diff --git a/CMakeLists_files.cmake b/CMakeLists_files.cmake index 01ace7fcf..3003961d7 100644 --- a/CMakeLists_files.cmake +++ b/CMakeLists_files.cmake @@ -61,6 +61,10 @@ list (APPEND MAIN_SOURCE_FILES opm/models/blackoil/blackoilmicpparams.cpp opm/models/blackoil/blackoilpolymerparams.cpp opm/models/blackoil/blackoilsolventparams.cpp + opm/models/parallel/mpiutil.cpp + opm/models/parallel/tasklets.cpp + opm/models/parallel/threadmanager.cpp + opm/models/utils/timer.cpp opm/simulators/flow/ActionHandler.cpp opm/simulators/flow/Banners.cpp opm/simulators/flow/BlackoilModelParameters.cpp @@ -668,10 +672,10 @@ list (APPEND PUBLIC_HEADER_FILES opm/models/nonlinear/nullconvergencewriter.hh opm/models/parallel/gridcommhandles.hh opm/models/parallel/mpibuffer.hh - opm/models/parallel/mpiutil.hh - opm/models/parallel/tasklets.hh + opm/models/parallel/mpiutil.hpp + opm/models/parallel/tasklets.hpp opm/models/parallel/threadedentityiterator.hh - opm/models/parallel/threadmanager.hh + opm/models/parallel/threadmanager.hpp opm/models/ptflash/flashindices.hh opm/models/ptflash/flashintensivequantities.hh opm/models/ptflash/flashlocalresidual.hh @@ -711,7 +715,7 @@ list (APPEND PUBLIC_HEADER_FILES opm/models/utils/signum.hh opm/models/utils/simulator.hh opm/models/utils/start.hh - opm/models/utils/timer.hh + opm/models/utils/timer.hpp opm/models/utils/timerguard.hh opm/simulators/flow/ActionHandler.hpp opm/simulators/flow/AluGridCartesianIndexMapper.hpp diff --git a/opm/models/discretization/common/fvbasediscretization.hh b/opm/models/discretization/common/fvbasediscretization.hh index 2765164cf..cd0d3b5e4 100644 --- a/opm/models/discretization/common/fvbasediscretization.hh +++ b/opm/models/discretization/common/fvbasediscretization.hh @@ -56,11 +56,11 @@ #include #include -#include +#include #include #include -#include +#include #include #include @@ -219,7 +219,7 @@ struct ConstraintsContext */ template struct ThreadManager -{ using type = ::Opm::ThreadManager; }; +{ using type = ::Opm::ThreadManager; }; template struct UseLinearizationLock diff --git a/opm/models/discretization/common/fvbaselinearizer.hh b/opm/models/discretization/common/fvbaselinearizer.hh index c53035a46..f40d843fc 100644 --- a/opm/models/discretization/common/fvbaselinearizer.hh +++ b/opm/models/discretization/common/fvbaselinearizer.hh @@ -36,7 +36,7 @@ #include #include -#include +#include #include #include diff --git a/opm/models/io/vtkmultiwriter.hh b/opm/models/io/vtkmultiwriter.hh index 1a1827a0f..edf79d1bd 100644 --- a/opm/models/io/vtkmultiwriter.hh +++ b/opm/models/io/vtkmultiwriter.hh @@ -33,7 +33,7 @@ #include "vtktensorfunction.hh" #include -#include +#include #include diff --git a/opm/models/nonlinear/newtonmethod.hh b/opm/models/nonlinear/newtonmethod.hh index aa99ae354..744d90349 100644 --- a/opm/models/nonlinear/newtonmethod.hh +++ b/opm/models/nonlinear/newtonmethod.hh @@ -41,7 +41,7 @@ #include #include -#include +#include #include #include diff --git a/opm/models/parallel/mpiutil.cpp b/opm/models/parallel/mpiutil.cpp new file mode 100644 index 000000000..80fd6c4d8 --- /dev/null +++ b/opm/models/parallel/mpiutil.cpp @@ -0,0 +1,188 @@ +// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- +// vi: set et ts=4 sw=4 sts=4: +/* + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . + + Consult the COPYING file in the top-level source directory of this + module for the precise wording of the license and the list of + copyright holders. +*/ +#include +#include + +#include +#include +#include +#include + +#if HAVE_MPI +#include +#include +#endif + +namespace { + +template +int packSize() +{ + int pack_size; + MPI_Pack_size(1, Dune::MPITraits::getType(), MPI_COMM_WORLD, &pack_size); + return pack_size; +} + +// -------- Packer -------- +template +struct Packer +{ + static int size(const T&) + { + return packSize(); + } + + static void pack(const T& content, std::vector& buf, int& offset) + { + MPI_Pack(&content, 1, Dune::MPITraits::getType(), buf.data(), buf.size(), &offset, MPI_COMM_WORLD); + } + + static T unpack(const std::vector& recv_buffer, int& offset) + { + T content; + auto* data = const_cast(recv_buffer.data()); + MPI_Unpack(data, recv_buffer.size(), &offset, &content, 1, Dune::MPITraits::getType(), MPI_COMM_WORLD); + return content; + } +}; + +// -------- Packer, string specialization -------- +template <> +struct Packer +{ + static int size(const std::string& content) + { + return packSize() + content.size()*packSize(); + } + + static void pack(const std::string& content, std::vector& buf, int& offset) + { + unsigned int size = content.size(); + Packer::pack(size, buf, offset); + if (size > 0) { + MPI_Pack(const_cast(content.c_str()), size, MPI_CHAR, buf.data(), buf.size(), &offset, MPI_COMM_WORLD); + } + } + + static std::string unpack(const std::vector& recv_buffer, int& offset) + { + unsigned int size = Packer::unpack(recv_buffer, offset); + std::string text; + if (size > 0) { + auto* data = const_cast(recv_buffer.data()); + std::vector chars(size); + MPI_Unpack(data, recv_buffer.size(), &offset, chars.data(), size, MPI_CHAR, MPI_COMM_WORLD); + text = std::string(chars.data(), size); + } + return text; + } +}; + +// -------- Packer, vector partial specialization -------- +template +struct Packer> +{ + static int size(const std::string& content) + { + int sz = 0; + sz += packSize(); + for (const T& elem : content) { + sz += Packer::size(elem); + } + return sz; + } + + static void pack(const std::vector& content, std::vector& buf, int& offset) + { + unsigned int size = content.size(); + Packer::pack(size, buf, offset); + for (const T& elem : content) { + Packer::pack(elem); + } + } + + static std::vector unpack(const std::vector& recv_buffer, int& offset) + { + unsigned int size = Packer::unpack(recv_buffer, offset); + std::vector content; + content.reserve(size); + for (unsigned int i = 0; i < size; ++i) { + content.push_back(Packer::unpack(recv_buffer, offset)); + } + return content; + } +}; + +} // anonymous namespace + +namespace Opm { + +/// From each rank, gather its string (if not empty) into a vector. +std::vector gatherStrings(const std::string& local_string) +{ +#if HAVE_MPI + using StringPacker = Packer; + + // Pack local messages. + const int message_size = StringPacker::size(local_string); + std::vector buffer(message_size); + int offset = 0; + StringPacker::pack(local_string, buffer, offset); + assert(offset == message_size); + + // Get message sizes and create offset/displacement array for gathering. + int num_processes = -1; + MPI_Comm_size(MPI_COMM_WORLD, &num_processes); + std::vector message_sizes(num_processes); + MPI_Allgather(&message_size, 1, MPI_INT, message_sizes.data(), 1, MPI_INT, MPI_COMM_WORLD); + std::vector displ(num_processes + 1, 0); + std::partial_sum(message_sizes.begin(), message_sizes.end(), displ.begin() + 1); + + // Gather. + std::vector recv_buffer(displ.back()); + MPI_Allgatherv(buffer.data(), buffer.size(), MPI_PACKED, + const_cast(recv_buffer.data()), message_sizes.data(), + displ.data(), MPI_PACKED, + MPI_COMM_WORLD); + + // Unpack and return. + std::vector ret; + for (int process = 0; process < num_processes; ++process) { + offset = displ[process]; + std::string s = StringPacker::unpack(recv_buffer, offset); + if (!s.empty()) { + ret.push_back(s); + } + assert(offset == displ[process + 1]); + } + return ret; +#else + if (local_string.empty()) { + return {}; + } else { + return { local_string }; + } +#endif +} + +} // namespace Opm diff --git a/opm/models/parallel/mpiutil.hh b/opm/models/parallel/mpiutil.hh deleted file mode 100644 index 6ccc1a12c..000000000 --- a/opm/models/parallel/mpiutil.hh +++ /dev/null @@ -1,211 +0,0 @@ -// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- -// vi: set et ts=4 sw=4 sts=4: -/* - This file is part of the Open Porous Media project (OPM). - - OPM is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 2 of the License, or - (at your option) any later version. - - OPM is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with OPM. If not, see . - - Consult the COPYING file in the top-level source directory of this - module for the precise wording of the license and the list of - copyright holders. -*/ -/*! - * \file - * \copydoc Opm::MpiBuffer - */ -#ifndef OPM_MATERIAL_MPIUTIL_HH -#define OPM_MATERIAL_MPIUTIL_HH - -#include - -#include -#include -#include -#include - - -#if HAVE_MPI - -#include - - - -namespace mpiutil_details -{ - - template - int packSize() - { - int pack_size; - MPI_Pack_size(1, Dune::MPITraits::getType(), MPI_COMM_WORLD, &pack_size); - return pack_size; - } - - // -------- Packer -------- - template - struct Packer - { - static int size(const T&) - { - return packSize(); - } - - static void pack(const T& content, std::vector& buf, int& offset) - { - MPI_Pack(&content, 1, Dune::MPITraits::getType(), buf.data(), buf.size(), &offset, MPI_COMM_WORLD); - } - - static T unpack(const std::vector& recv_buffer, int& offset) - { - T content; - auto* data = const_cast(recv_buffer.data()); - MPI_Unpack(data, recv_buffer.size(), &offset, &content, 1, Dune::MPITraits::getType(), MPI_COMM_WORLD); - return content; - } - }; - - // -------- Packer, string specialization -------- - template <> - struct Packer - { - static int size(const std::string& content) - { - return packSize() + content.size()*packSize(); - } - - static void pack(const std::string& content, std::vector& buf, int& offset) - { - unsigned int size = content.size(); - Packer::pack(size, buf, offset); - if (size > 0) { - MPI_Pack(const_cast(content.c_str()), size, MPI_CHAR, buf.data(), buf.size(), &offset, MPI_COMM_WORLD); - } - } - - static std::string unpack(const std::vector& recv_buffer, int& offset) - { - unsigned int size = Packer::unpack(recv_buffer, offset); - std::string text; - if (size > 0) { - auto* data = const_cast(recv_buffer.data()); - std::vector chars(size); - MPI_Unpack(data, recv_buffer.size(), &offset, chars.data(), size, MPI_CHAR, MPI_COMM_WORLD); - text = std::string(chars.data(), size); - } - return text; - } - }; - - // -------- Packer, vector partial specialization -------- - template - struct Packer> - { - static int size(const std::string& content) - { - int sz = 0; - sz += packSize(); - for (const T& elem : content) { - sz += Packer::size(elem); - } - return sz; - } - - static void pack(const std::vector& content, std::vector& buf, int& offset) - { - unsigned int size = content.size(); - Packer::pack(size, buf, offset); - for (const T& elem : content) { - Packer::pack(elem); - } - } - - static std::vector unpack(const std::vector& recv_buffer, int& offset) - { - unsigned int size = Packer::unpack(recv_buffer, offset); - std::vector content; - content.reserve(size); - for (unsigned int i = 0; i < size; ++i) { - content.push_back(Packer::unpack(recv_buffer, offset)); - } - return content; - } - }; - - -} // anonymous namespace - - -namespace Opm -{ - - /// From each rank, gather its string (if not empty) into a vector. - inline std::vector gatherStrings(const std::string& local_string) - { - using StringPacker = mpiutil_details::Packer; - - // Pack local messages. - const int message_size = StringPacker::size(local_string); - std::vector buffer(message_size); - int offset = 0; - StringPacker::pack(local_string, buffer, offset); - assert(offset == message_size); - - // Get message sizes and create offset/displacement array for gathering. - int num_processes = -1; - MPI_Comm_size(MPI_COMM_WORLD, &num_processes); - std::vector message_sizes(num_processes); - MPI_Allgather(&message_size, 1, MPI_INT, message_sizes.data(), 1, MPI_INT, MPI_COMM_WORLD); - std::vector displ(num_processes + 1, 0); - std::partial_sum(message_sizes.begin(), message_sizes.end(), displ.begin() + 1); - - // Gather. - std::vector recv_buffer(displ.back()); - MPI_Allgatherv(buffer.data(), buffer.size(), MPI_PACKED, - const_cast(recv_buffer.data()), message_sizes.data(), - displ.data(), MPI_PACKED, - MPI_COMM_WORLD); - - // Unpack and return. - std::vector ret; - for (int process = 0; process < num_processes; ++process) { - offset = displ[process]; - std::string s = StringPacker::unpack(recv_buffer, offset); - if (!s.empty()) { - ret.push_back(s); - } - assert(offset == displ[process + 1]); - } - return ret; - } - -} // namespace Opm - -#else // HAVE_MPI - -namespace Opm -{ - inline std::vector gatherStrings(const std::string& local_string) - { - if (local_string.empty()) { - return {}; - } else { - return { local_string }; - } - } -} // namespace Opm - -#endif // HAVE_MPI - -#endif // OPM_MATERIAL_MPIUTIL_HH - diff --git a/opm/models/parallel/mpiutil.hpp b/opm/models/parallel/mpiutil.hpp new file mode 100644 index 000000000..fdd3825a3 --- /dev/null +++ b/opm/models/parallel/mpiutil.hpp @@ -0,0 +1,40 @@ +// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- +// vi: set et ts=4 sw=4 sts=4: +/* + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . + + Consult the COPYING file in the top-level source directory of this + module for the precise wording of the license and the list of + copyright holders. +*/ +/*! + * \file + * \copydoc Opm::MpiBuffer + */ +#ifndef OPM_MPIUTIL_HPP +#define OPM_MPIUTIL_HPP + +#include +#include + +namespace Opm { + +std::vector gatherStrings(const std::string& local_string); + +} // namespace Opm + +#endif // OPM_MPIUTIL_HPP + diff --git a/opm/models/parallel/tasklets.cpp b/opm/models/parallel/tasklets.cpp new file mode 100644 index 000000000..390edacfd --- /dev/null +++ b/opm/models/parallel/tasklets.cpp @@ -0,0 +1,205 @@ +// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- +// vi: set et ts=4 sw=4 sts=4: +/* + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . + + Consult the COPYING file in the top-level source directory of this + module for the precise wording of the license and the list of + copyright holders. +*/ +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace Opm { + +thread_local TaskletRunner* TaskletRunner::taskletRunner_ = nullptr; +thread_local int TaskletRunner::workerThreadIndex_ = -1; + +TaskletRunner::BarrierTasklet::BarrierTasklet(unsigned numWorkers) + : TaskletInterface(/*refCount=*/numWorkers) +{ + numWorkers_ = numWorkers; + numWaiting_ = 0; +} + +void TaskletRunner::BarrierTasklet::run() +{ + wait(); +} + +void TaskletRunner::BarrierTasklet::wait() +{ + std::unique_lock lock(barrierMutex_); + + numWaiting_ += 1; + if (numWaiting_ >= numWorkers_ + 1) { + lock.unlock(); + barrierCondition_.notify_all(); + } + else { + const auto& areAllWaiting = + [this]() -> bool + { return this->numWaiting_ >= this->numWorkers_ + 1; }; + + barrierCondition_.wait(lock, /*predicate=*/areAllWaiting); + } +} + +TaskletRunner::TaskletRunner(unsigned numWorkers) +{ + threads_.resize(numWorkers); + for (unsigned i = 0; i < numWorkers; ++i) + // create a worker thread + threads_[i].reset(new std::thread(startWorkerThread_, this, i)); +} + +TaskletRunner::~TaskletRunner() +{ + if (threads_.size() > 0) { + // dispatch a tasklet which will terminate the worker thread + dispatch(std::make_shared()); + + // wait until all worker threads have terminated + for (auto& thread : threads_) + thread->join(); + } +} + +bool TaskletRunner::failure() const +{ + return this->failureFlag_.load(std::memory_order_relaxed); +} + +int TaskletRunner::workerThreadIndex() const +{ + if (TaskletRunner::taskletRunner_ != this) + return -1; + return TaskletRunner::workerThreadIndex_; +} + +void TaskletRunner::dispatch(std::shared_ptr tasklet) +{ + if (threads_.empty()) { + // run the tasklet immediately in synchronous mode. + while (tasklet->referenceCount() > 0) { + tasklet->dereference(); + try { + tasklet->run(); + } + catch (const std::exception& e) { + std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ". Trying to continue.\n"; + failureFlag_.store(true, std::memory_order_relaxed); + } + catch (...) { + std::cerr << "ERROR: Uncaught exception (general type) when running tasklet. Trying to continue.\n"; + failureFlag_.store(true, std::memory_order_relaxed); + } + } + } + else { + // lock mutex for the tasklet queue to make sure that nobody messes with the + // task queue + taskletQueueMutex_.lock(); + + // add the tasklet to the queue + taskletQueue_.push(tasklet); + + taskletQueueMutex_.unlock(); + + workAvailableCondition_.notify_all(); + } +} + +void TaskletRunner::barrier() +{ + unsigned numWorkers = threads_.size(); + if (numWorkers == 0) + // nothing needs to be done to implement a barrier in synchronous mode + return; + + // dispatch a barrier tasklet and wait until it has been run by the worker thread + auto barrierTasklet = std::make_shared(numWorkers); + dispatch(barrierTasklet); + + barrierTasklet->wait(); +} + +void TaskletRunner::startWorkerThread_(TaskletRunner* taskletRunner, int workerThreadIndex) +{ + TaskletRunner::taskletRunner_ = taskletRunner; + TaskletRunner::workerThreadIndex_ = workerThreadIndex; + + taskletRunner->run_(); +} + +void TaskletRunner::run_() +{ + while (true) { + + // wait until tasklets have been pushed to the queue. first we need to lock + // mutex for access to taskletQueue_ + std::unique_lock lock(taskletQueueMutex_); + + const auto& workIsAvailable = + [this]() -> bool + { return !taskletQueue_.empty(); }; + + if (!workIsAvailable()) + workAvailableCondition_.wait(lock, /*predicate=*/workIsAvailable); + + // remove tasklet from queue + std::shared_ptr tasklet = taskletQueue_.front(); + + // if tasklet is an end marker, terminate the thread and DO NOT remove the + // tasklet. + if (tasklet->isEndMarker()) { + if(taskletQueue_.size() > 1) + throw std::logic_error("TaskletRunner: Not all queued tasklets were executed"); + taskletQueueMutex_.unlock(); + return; + } + + tasklet->dereference(); + if (tasklet->referenceCount() == 0) + // remove tasklets from the queue as soon as their reference count + // reaches zero, i.e. the tasklet has been run often enough. + taskletQueue_.pop(); + lock.unlock(); + + // execute tasklet + try { + tasklet->run(); + } + catch (const std::exception& e) { + std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ".\n"; + failureFlag_.store(true, std::memory_order_relaxed); + } + catch (...) { + std::cerr << "ERROR: Uncaught exception when running tasklet.\n"; + failureFlag_.store(true, std::memory_order_relaxed); + } + } +} + +} // end namespace Opm diff --git a/opm/models/parallel/tasklets.hh b/opm/models/parallel/tasklets.hh deleted file mode 100644 index ad3508d3c..000000000 --- a/opm/models/parallel/tasklets.hh +++ /dev/null @@ -1,369 +0,0 @@ -// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- -// vi: set et ts=4 sw=4 sts=4: -/* - This file is part of the Open Porous Media project (OPM). - - OPM is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 2 of the License, or - (at your option) any later version. - - OPM is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with OPM. If not, see . - - Consult the COPYING file in the top-level source directory of this - module for the precise wording of the license and the list of - copyright holders. -*/ -/*! - * \file - * \brief Provides a mechanism to dispatch work to separate threads - */ -#ifndef EWOMS_TASKLETS_HH -#define EWOMS_TASKLETS_HH - -#include -#include -#include -#include -#include -#include -#include -#include - -namespace Opm { - -/*! - * \brief The base class for tasklets. - * - * Tasklets are a generic mechanism for potentially running work in a separate thread. - */ -class TaskletInterface -{ -public: - TaskletInterface(int refCount = 1) - : referenceCount_(refCount) - {} - virtual ~TaskletInterface() {} - virtual void run() = 0; - virtual bool isEndMarker () const { return false; } - - void dereference() - { -- referenceCount_; } - - int referenceCount() const - { return referenceCount_; } - -private: - int referenceCount_; -}; - -/*! - * \brief A simple tasklet that runs a function that returns void and does not take any - * arguments a given number of times. - */ -template -class FunctionRunnerTasklet : public TaskletInterface -{ -public: - FunctionRunnerTasklet(const FunctionRunnerTasklet&) = default; - FunctionRunnerTasklet(int numInvocations, const Fn& fn) - : TaskletInterface(numInvocations) - , fn_(fn) - {} - void run() override - { fn_(); } - -private: - const Fn& fn_; -}; - -class TaskletRunner; - -// this class stores the thread local static attributes for the TaskletRunner class. we -// cannot put them directly into TaskletRunner because defining static members for -// non-template classes in headers leads the linker to choke in case multiple compile -// units are used. -template -struct TaskletRunnerHelper_ -{ - static thread_local TaskletRunner* taskletRunner_; - static thread_local int workerThreadIndex_; -}; - -template -thread_local TaskletRunner* TaskletRunnerHelper_::taskletRunner_ = nullptr; - -template -thread_local int TaskletRunnerHelper_::workerThreadIndex_ = -1; - -/*! - * \brief Handles where a given tasklet is run. - * - * Depending on the number of worker threads, a tasklet can either be run in a separate - * worker thread or by the main thread. - */ -class TaskletRunner -{ - /// \brief Implements a barrier. This class can only be used in the asynchronous case. - class BarrierTasklet : public TaskletInterface - { - public: - BarrierTasklet(unsigned numWorkers) - : TaskletInterface(/*refCount=*/numWorkers) - { - numWorkers_ = numWorkers; - numWaiting_ = 0; - } - - void run() - { wait(); } - - void wait() - { - std::unique_lock lock(barrierMutex_); - - numWaiting_ += 1; - if (numWaiting_ >= numWorkers_ + 1) { - lock.unlock(); - barrierCondition_.notify_all(); - } - else { - const auto& areAllWaiting = - [this]() -> bool - { return this->numWaiting_ >= this->numWorkers_ + 1; }; - - barrierCondition_.wait(lock, /*predicate=*/areAllWaiting); - } - } - - private: - unsigned numWorkers_; - unsigned numWaiting_; - - std::condition_variable barrierCondition_; - std::mutex barrierMutex_; - }; - - /// \brief TerminateThreadTasklet class - /// Empty tasklet marking thread termination. - class TerminateThreadTasklet : public TaskletInterface - { - public: - void run() - { } - - bool isEndMarker() const - { return true; } - }; - -public: - // prohibit copying of tasklet runners - TaskletRunner(const TaskletRunner&) = delete; - - /*! - * \brief Creates a tasklet runner with numWorkers underling threads for doing work. - * - * The number of worker threads may be 0. In this case, all work is done by the main - * thread (synchronous mode). - */ - TaskletRunner(unsigned numWorkers) - { - threads_.resize(numWorkers); - for (unsigned i = 0; i < numWorkers; ++i) - // create a worker thread - threads_[i].reset(new std::thread(startWorkerThread_, this, i)); - } - - /*! - * \brief Destructor - * - * If worker threads were created to run the tasklets, this method waits until all - * worker threads have been terminated, i.e. all scheduled tasklets are guaranteed to - * be completed. - */ - ~TaskletRunner() - { - if (threads_.size() > 0) { - // dispatch a tasklet which will terminate the worker thread - dispatch(std::make_shared()); - - // wait until all worker threads have terminated - for (auto& thread : threads_) - thread->join(); - } - } - - bool failure() const - { - return this->failureFlag_.load(std::memory_order_relaxed); - } - - /*! - * \brief Returns the index of the current worker thread. - * - * If the current thread is not a worker thread, -1 is returned. - */ - int workerThreadIndex() const - { - if (TaskletRunnerHelper_::taskletRunner_ != this) - return -1; - return TaskletRunnerHelper_::workerThreadIndex_; - } - - /*! - * \brief Returns the number of worker threads for the tasklet runner. - */ - int numWorkerThreads() const - { return threads_.size(); } - - /*! - * \brief Add a new tasklet. - * - * The tasklet is either run immediately or deferred to a separate thread. - */ - void dispatch(std::shared_ptr tasklet) - { - if (threads_.empty()) { - // run the tasklet immediately in synchronous mode. - while (tasklet->referenceCount() > 0) { - tasklet->dereference(); - try { - tasklet->run(); - } - catch (const std::exception& e) { - std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ". Trying to continue.\n"; - failureFlag_.store(true, std::memory_order_relaxed); - } - catch (...) { - std::cerr << "ERROR: Uncaught exception (general type) when running tasklet. Trying to continue.\n"; - failureFlag_.store(true, std::memory_order_relaxed); - } - } - } - else { - // lock mutex for the tasklet queue to make sure that nobody messes with the - // task queue - taskletQueueMutex_.lock(); - - // add the tasklet to the queue - taskletQueue_.push(tasklet); - - taskletQueueMutex_.unlock(); - - workAvailableCondition_.notify_all(); - } - } - - /*! - * \brief Convenience method to construct a new function runner tasklet and dispatch it immediately. - */ - template - std::shared_ptr > dispatchFunction(Fn &fn, int numInvocations=1) - { - using Tasklet = FunctionRunnerTasklet; - auto tasklet = std::make_shared(numInvocations, fn); - this->dispatch(tasklet); - return tasklet; - } - - /*! - * \brief Make sure that all tasklets have been completed after this method has been called - */ - void barrier() - { - unsigned numWorkers = threads_.size(); - if (numWorkers == 0) - // nothing needs to be done to implement a barrier in synchronous mode - return; - - // dispatch a barrier tasklet and wait until it has been run by the worker thread - auto barrierTasklet = std::make_shared(numWorkers); - dispatch(barrierTasklet); - - barrierTasklet->wait(); - } -private: - // Atomic flag that is set to failure if any of the tasklets run by the TaskletRunner fails. - // This flag is checked before new tasklets run or get dispatched and in case it is true, the - // thread execution will be stopped / no new tasklets will be started and the program will abort. - // To set the flag and load the flag, we use std::memory_order_relaxed. - // Atomic operations tagged memory_order_relaxed are not synchronization operations; they do not - // impose an order among concurrent memory accesses. They guarantee atomicity and modification order - // consistency. This is the right choice for the setting here, since it is enough to broadcast failure - // before new run or get dispatched. - std::atomic failureFlag_ = false; - -protected: - // main function of the worker thread - static void startWorkerThread_(TaskletRunner* taskletRunner, int workerThreadIndex) - { - TaskletRunnerHelper_::taskletRunner_ = taskletRunner; - TaskletRunnerHelper_::workerThreadIndex_ = workerThreadIndex; - - taskletRunner->run_(); - } - - //! do the work until the queue received an end tasklet - void run_() - { - while (true) { - - // wait until tasklets have been pushed to the queue. first we need to lock - // mutex for access to taskletQueue_ - std::unique_lock lock(taskletQueueMutex_); - - const auto& workIsAvailable = - [this]() -> bool - { return !taskletQueue_.empty(); }; - - if (!workIsAvailable()) - workAvailableCondition_.wait(lock, /*predicate=*/workIsAvailable); - - // remove tasklet from queue - std::shared_ptr tasklet = taskletQueue_.front(); - - // if tasklet is an end marker, terminate the thread and DO NOT remove the - // tasklet. - if (tasklet->isEndMarker()) { - if(taskletQueue_.size() > 1) - throw std::logic_error("TaskletRunner: Not all queued tasklets were executed"); - taskletQueueMutex_.unlock(); - return; - } - - tasklet->dereference(); - if (tasklet->referenceCount() == 0) - // remove tasklets from the queue as soon as their reference count - // reaches zero, i.e. the tasklet has been run often enough. - taskletQueue_.pop(); - lock.unlock(); - - // execute tasklet - try { - tasklet->run(); - } - catch (const std::exception& e) { - std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ".\n"; - failureFlag_.store(true, std::memory_order_relaxed); - } - catch (...) { - std::cerr << "ERROR: Uncaught exception when running tasklet.\n"; - failureFlag_.store(true, std::memory_order_relaxed); - } - } - } - - std::vector > threads_; - std::queue > taskletQueue_; - std::mutex taskletQueueMutex_; - std::condition_variable workAvailableCondition_; -}; - -} // end namespace Opm -#endif diff --git a/opm/models/parallel/tasklets.hpp b/opm/models/parallel/tasklets.hpp new file mode 100644 index 000000000..8a952a327 --- /dev/null +++ b/opm/models/parallel/tasklets.hpp @@ -0,0 +1,210 @@ +// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- +// vi: set et ts=4 sw=4 sts=4: +/* + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . + + Consult the COPYING file in the top-level source directory of this + module for the precise wording of the license and the list of + copyright holders. +*/ +/*! + * \file + * \brief Provides a mechanism to dispatch work to separate threads + */ +#ifndef OPM_TASKLETS_HPP +#define OPM_TASKLETS_HPP + +#include +#include +#include +#include +#include + +namespace Opm { + +/*! + * \brief The base class for tasklets. + * + * Tasklets are a generic mechanism for potentially running work in a separate thread. + */ +class TaskletInterface +{ +public: + TaskletInterface(int refCount = 1) + : referenceCount_(refCount) + {} + virtual ~TaskletInterface() {} + virtual void run() = 0; + virtual bool isEndMarker () const { return false; } + + void dereference() + { -- referenceCount_; } + + int referenceCount() const + { return referenceCount_; } + +private: + int referenceCount_; +}; + +/*! + * \brief A simple tasklet that runs a function that returns void and does not take any + * arguments a given number of times. + */ +template +class FunctionRunnerTasklet : public TaskletInterface +{ +public: + FunctionRunnerTasklet(const FunctionRunnerTasklet&) = default; + FunctionRunnerTasklet(int numInvocations, const Fn& fn) + : TaskletInterface(numInvocations) + , fn_(fn) + {} + void run() override + { fn_(); } + +private: + const Fn& fn_; +}; + +/*! + * \brief Handles where a given tasklet is run. + * + * Depending on the number of worker threads, a tasklet can either be run in a separate + * worker thread or by the main thread. + */ +class TaskletRunner +{ + /// \brief Implements a barrier. This class can only be used in the asynchronous case. + class BarrierTasklet : public TaskletInterface + { + public: + BarrierTasklet(unsigned numWorkers); + + void run(); + + void wait(); + + private: + unsigned numWorkers_; + unsigned numWaiting_; + + std::condition_variable barrierCondition_; + std::mutex barrierMutex_; + }; + + /// \brief TerminateThreadTasklet class + /// Empty tasklet marking thread termination. + class TerminateThreadTasklet : public TaskletInterface + { + public: + void run() + { } + + bool isEndMarker() const + { return true; } + }; + +public: + // prohibit copying of tasklet runners + TaskletRunner(const TaskletRunner&) = delete; + + /*! + * \brief Creates a tasklet runner with numWorkers underling threads for doing work. + * + * The number of worker threads may be 0. In this case, all work is done by the main + * thread (synchronous mode). + */ + TaskletRunner(unsigned numWorkers); + + /*! + * \brief Destructor + * + * If worker threads were created to run the tasklets, this method waits until all + * worker threads have been terminated, i.e. all scheduled tasklets are guaranteed to + * be completed. + */ + ~TaskletRunner(); + + bool failure() const; + + /*! + * \brief Returns the index of the current worker thread. + * + * If the current thread is not a worker thread, -1 is returned. + */ + int workerThreadIndex() const; + + /*! + * \brief Returns the number of worker threads for the tasklet runner. + */ + int numWorkerThreads() const + { return threads_.size(); } + + /*! + * \brief Add a new tasklet. + * + * The tasklet is either run immediately or deferred to a separate thread. + */ + void dispatch(std::shared_ptr tasklet); + + /*! + * \brief Convenience method to construct a new function runner tasklet and dispatch it immediately. + */ + template + std::shared_ptr > dispatchFunction(Fn &fn, int numInvocations=1) + { + using Tasklet = FunctionRunnerTasklet; + auto tasklet = std::make_shared(numInvocations, fn); + this->dispatch(tasklet); + return tasklet; + } + + /*! + * \brief Make sure that all tasklets have been completed after this method has been called + */ + void barrier(); + +private: + // Atomic flag that is set to failure if any of the tasklets run by the TaskletRunner fails. + // This flag is checked before new tasklets run or get dispatched and in case it is true, the + // thread execution will be stopped / no new tasklets will be started and the program will abort. + // To set the flag and load the flag, we use std::memory_order_relaxed. + // Atomic operations tagged memory_order_relaxed are not synchronization operations; they do not + // impose an order among concurrent memory accesses. They guarantee atomicity and modification order + // consistency. This is the right choice for the setting here, since it is enough to broadcast failure + // before new run or get dispatched. + std::atomic failureFlag_ = false; + +protected: + // main function of the worker thread + static void startWorkerThread_(TaskletRunner* taskletRunner, int workerThreadIndex); + + //! do the work until the queue received an end tasklet + void run_(); + + std::vector > threads_; + std::queue > taskletQueue_; + std::mutex taskletQueueMutex_; + std::condition_variable workAvailableCondition_; + + static thread_local TaskletRunner* taskletRunner_; + static thread_local int workerThreadIndex_; +}; + +} // end namespace Opm + +#endif // OPM_TASKLETS_HPP diff --git a/opm/models/parallel/threadmanager.cpp b/opm/models/parallel/threadmanager.cpp new file mode 100644 index 000000000..f9a77afd2 --- /dev/null +++ b/opm/models/parallel/threadmanager.cpp @@ -0,0 +1,93 @@ +// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- +// vi: set et ts=4 sw=4 sts=4: +/* + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . + + Consult the COPYING file in the top-level source directory of this + module for the precise wording of the license and the list of + copyright holders. +*/ +#include +#include + +#ifdef _OPENMP +#include +#endif + +#include +#include + +namespace Opm { + +int ThreadManager::numThreads_ = 1; + +void ThreadManager::registerParameters() +{ + Parameters::Register + ("The maximum number of threads to be instantiated per process " + "('-1' means 'automatic')"); +} + +void ThreadManager::init(bool queryCommandLineParameter) +{ + if (queryCommandLineParameter) { + numThreads_ = Parameters::Get(); + + // some safety checks. This is pretty ugly macro-magic, but so what? +#if !defined(_OPENMP) + if (numThreads_ != 1 && numThreads_ != -1) { + throw std::invalid_argument("OpenMP is not available. The only valid values for " + "threads-per-process is 1 and -1 but it is "+std::to_string(numThreads_)+"!"); + } + numThreads_ = 1; +#elif !defined NDEBUG && defined DUNE_INTERFACECHECK + if (numThreads_ != 1) { + throw std::invalid_argument("You explicitly enabled Barton-Nackman interface checking in Dune. " + "The Dune implementation of this is currently incompatible with " + "thread parallelism!"); + } + numThreads_ = 1; +#else + if (numThreads_ == 0) { + throw std::invalid_argument("Zero threads per process are not possible: It must be at least 1, " + "(or -1 for 'automatic')!"); + } +#endif + +#ifdef _OPENMP + // actually limit the number of threads + if (numThreads_ > 0) { + omp_set_num_threads(numThreads_); + } +#endif + } + +#ifdef _OPENMP + // get the number of threads which are used in the end. + numThreads_ = omp_get_max_threads(); +#endif +} + +unsigned ThreadManager::threadId() +{ +#ifdef _OPENMP + return static_cast(omp_get_thread_num()); +#else + return 0; +#endif +} + +} // namespace Opm diff --git a/opm/models/parallel/threadmanager.hh b/opm/models/parallel/threadmanager.hh deleted file mode 100644 index 2aa83d8fc..000000000 --- a/opm/models/parallel/threadmanager.hh +++ /dev/null @@ -1,141 +0,0 @@ -// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- -// vi: set et ts=4 sw=4 sts=4: -/* - This file is part of the Open Porous Media project (OPM). - - OPM is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 2 of the License, or - (at your option) any later version. - - OPM is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with OPM. If not, see . - - Consult the COPYING file in the top-level source directory of this - module for the precise wording of the license and the list of - copyright holders. -*/ -/*! - * \file - * \copydoc Opm::ThreadManager - */ -#ifndef EWOMS_THREAD_MANAGER_HH -#define EWOMS_THREAD_MANAGER_HH - -#ifdef _OPENMP -#include -#endif - -#include -#include -#include - -#include - -namespace Opm { - -/*! - * \brief Simplifies multi-threaded capabilities. - */ -template -class ThreadManager -{ -public: - enum { -#if defined(_OPENMP) || DOXYGEN - //! Specify whether OpenMP is really available or not - isFake = false -#else - isFake = true -#endif - }; - - /*! - * \brief Register all run-time parameters of the thread manager. - */ - static void registerParameters() - { - Parameters::Register - ("The maximum number of threads to be instantiated per process " - "('-1' means 'automatic')"); - } - - /*! - * \brief Initialize number of threads used thread manager. - * - * \param queryCommandLineParameter if set to true we will query ThreadsPerProcess - * and if set (disregard the environment variable OPM_NUM_THREADS). - * If false we will assume that the number of OpenMP threads is already set - * outside of this function (e.g. by OPM_NUM_THREADS or in the simulator by - * the ThreadsPerProcess parameter). - */ - static void init(bool queryCommandLineParameter = true) - { - if (queryCommandLineParameter) - { - numThreads_ = Parameters::Get(); - - // some safety checks. This is pretty ugly macro-magic, but so what? -#if !defined(_OPENMP) - if (numThreads_ != 1 && numThreads_ != -1) - throw std::invalid_argument("OpenMP is not available. The only valid values for " - "threads-per-process is 1 and -1 but it is "+std::to_string(numThreads_)+"!"); - numThreads_ = 1; -#elif !defined NDEBUG && defined DUNE_INTERFACECHECK - if (numThreads_ != 1) - throw std::invalid_argument("You explicitly enabled Barton-Nackman interface checking in Dune. " - "The Dune implementation of this is currently incompatible with " - "thread parallelism!"); - numThreads_ = 1; -#else - - if (numThreads_ == 0) - throw std::invalid_argument("Zero threads per process are not possible: It must be at least 1, " - "(or -1 for 'automatic')!"); -#endif - -#ifdef _OPENMP - // actually limit the number of threads - if (numThreads_ > 0) - omp_set_num_threads(numThreads_); -#endif - } - -#ifdef _OPENMP - // get the number of threads which are used in the end. - numThreads_ = omp_get_max_threads(); -#endif - } - - /*! - * \brief Return the maximum number of threads of the current process. - */ - static unsigned maxThreads() - { return static_cast(numThreads_); } - - /*! - * \brief Return the index of the current OpenMP thread - */ - static unsigned threadId() - { -#ifdef _OPENMP - return static_cast(omp_get_thread_num()); -#else - return 0; -#endif - } - -private: - static int numThreads_; -}; - -template -int ThreadManager::numThreads_ = 1; -} // namespace Opm - -#endif diff --git a/opm/models/parallel/threadmanager.hpp b/opm/models/parallel/threadmanager.hpp new file mode 100644 index 000000000..8b12345d8 --- /dev/null +++ b/opm/models/parallel/threadmanager.hpp @@ -0,0 +1,80 @@ +// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- +// vi: set et ts=4 sw=4 sts=4: +/* + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . + + Consult the COPYING file in the top-level source directory of this + module for the precise wording of the license and the list of + copyright holders. +*/ +/*! + * \file + * \copydoc Opm::ThreadManager + */ +#ifndef OPM_THREAD_MANAGER_HPP +#define OPM_THREAD_MANAGER_HPP + +namespace Opm { + +/*! + * \brief Simplifies multi-threaded capabilities. + */ +class ThreadManager +{ +public: + enum { +#if defined(_OPENMP) || DOXYGEN + //! Specify whether OpenMP is really available or not + isFake = false +#else + isFake = true +#endif + }; + + /*! + * \brief Register all run-time parameters of the thread manager. + */ + static void registerParameters(); + + /*! + * \brief Initialize number of threads used thread manager. + * + * \param queryCommandLineParameter if set to true we will query ThreadsPerProcess + * and if set (disregard the environment variable OPM_NUM_THREADS). + * If false we will assume that the number of OpenMP threads is already set + * outside of this function (e.g. by OPM_NUM_THREADS or in the simulator by + * the ThreadsPerProcess parameter). + */ + static void init(bool queryCommandLineParameter = true); + + /*! + * \brief Return the maximum number of threads of the current process. + */ + static unsigned maxThreads() + { return static_cast(numThreads_); } + + /*! + * \brief Return the index of the current OpenMP thread + */ + static unsigned threadId(); + +private: + static int numThreads_; +}; + +} // namespace Opm + +#endif // OPM_THREAD_MANAGER_HPP diff --git a/opm/models/utils/simulator.hh b/opm/models/utils/simulator.hh index 116da39c6..80341db08 100644 --- a/opm/models/utils/simulator.hh +++ b/opm/models/utils/simulator.hh @@ -33,9 +33,9 @@ #include #include -#include +#include #include -#include +#include #include #include diff --git a/opm/models/utils/start.hh b/opm/models/utils/start.hh index 62604c471..76ce1def9 100644 --- a/opm/models/utils/start.hh +++ b/opm/models/utils/start.hh @@ -35,7 +35,7 @@ #include "parametersystem.hh" #include -#include +#include #include @@ -76,7 +76,7 @@ template static inline void registerAllParameters_(bool finalizeRegistration = true) { using Simulator = GetPropType; - using ThreadManager = GetPropType; + using TM = GetPropType; Parameters::Register ("An .ini file which contains a set of run-time parameters"); @@ -84,7 +84,7 @@ static inline void registerAllParameters_(bool finalizeRegistration = true) ("Print the values of the run-time parameters at the " "start of the simulation"); - ThreadManager::registerParameters(); + TM::registerParameters(); Simulator::registerParameters(); if (finalizeRegistration) { @@ -279,7 +279,7 @@ static inline int start(int argc, char **argv, bool registerParams=true) using Scalar = GetPropType; using Simulator = GetPropType; using Problem = GetPropType; - using ThreadManager = GetPropType; + using TM = GetPropType; // set the signal handlers to reset the TTY to a well defined state on unexpected // program aborts @@ -304,7 +304,7 @@ static inline int start(int argc, char **argv, bool registerParams=true) if (paramStatus == 2) return 0; - ThreadManager::init(); + TM::init(); // initialize MPI, finalize is done automatically on exit #if HAVE_DUNE_FEM diff --git a/opm/models/utils/timer.cpp b/opm/models/utils/timer.cpp new file mode 100644 index 000000000..77f6f631d --- /dev/null +++ b/opm/models/utils/timer.cpp @@ -0,0 +1,148 @@ +// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- +// vi: set et ts=4 sw=4 sts=4: +/* + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . + + Consult the COPYING file in the top-level source directory of this + module for the precise wording of the license and the list of + copyright holders. +*/ + +#include +#include + +#if HAVE_MPI +#include +#endif + +namespace Opm { + +void Timer::TimeData::measure() +{ + // Note: On Linux -- or rather fully POSIX compliant systems -- using + // clock_gettime() would be more accurate for the CPU time. + realtimeData = std::chrono::high_resolution_clock::now(); + cputimeData = std::clock(); +} + +Timer::Timer() +{ + halt(); +} + +void Timer::start() +{ + isStopped_ = false; + startTime_.measure(); +} + +double Timer::stop() +{ + if (!isStopped_) { + TimeData stopTime; + + stopTime.measure(); + + const auto& t1 = startTime_.realtimeData; + const auto& t2 = stopTime.realtimeData; + std::chrono::duration dt = + std::chrono::duration_cast >(t2 - t1); + + realTimeElapsed_ += dt.count(); + cpuTimeElapsed_ += + static_cast(stopTime.cputimeData + - startTime_.cputimeData)/CLOCKS_PER_SEC; + } + + isStopped_ = true; + + return realTimeElapsed_; +} + +void Timer::halt() +{ + isStopped_ = true; + cpuTimeElapsed_ = 0.0; + realTimeElapsed_ = 0.0; +} + +void Timer::reset() +{ + cpuTimeElapsed_ = 0.0; + realTimeElapsed_ = 0.0; + + startTime_.measure(); +} + +double Timer::realTimeElapsed() const +{ + if (isStopped_) + return realTimeElapsed_; + + TimeData stopTime; + + stopTime.measure(); + + const auto& t1 = startTime_.realtimeData; + const auto& t2 = stopTime.realtimeData; + std::chrono::duration dt = + std::chrono::duration_cast >(t2 - t1); + + return realTimeElapsed_ + dt.count(); +} + +double Timer::cpuTimeElapsed() const +{ + if (isStopped_) + return cpuTimeElapsed_; + + TimeData stopTime; + + stopTime.measure(); + + const auto& t1 = startTime_.cputimeData; + const auto& t2 = stopTime.cputimeData; + + return cpuTimeElapsed_ + static_cast(t2 - t1)/CLOCKS_PER_SEC; +} + +double Timer::globalCpuTimeElapsed() const +{ + double val = cpuTimeElapsed(); + double globalVal = val; + +#if HAVE_MPI + MPI_Reduce(&val, + &globalVal, + /*count=*/1, + MPI_DOUBLE, + MPI_SUM, + /*rootRank=*/0, + MPI_COMM_WORLD); +#endif + + return globalVal; +} + +Timer& Timer::operator+=(const Timer& other) +{ + realTimeElapsed_ += other.realTimeElapsed(); + cpuTimeElapsed_ += other.cpuTimeElapsed(); + + return *this; +} + +} // namespace Opm diff --git a/opm/models/utils/timer.hh b/opm/models/utils/timer.hh deleted file mode 100644 index 855c88dd8..000000000 --- a/opm/models/utils/timer.hh +++ /dev/null @@ -1,217 +0,0 @@ -// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- -// vi: set et ts=4 sw=4 sts=4: -/* - This file is part of the Open Porous Media project (OPM). - - OPM is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 2 of the License, or - (at your option) any later version. - - OPM is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with OPM. If not, see . - - Consult the COPYING file in the top-level source directory of this - module for the precise wording of the license and the list of - copyright holders. -*/ -/*! - * \file - * - * \copydoc Opm::Timer - */ -#ifndef EWOMS_TIMER_HH -#define EWOMS_TIMER_HH - -#include - -#if HAVE_MPI -#include -#endif - -namespace Opm { -/*! - * \ingroup Common - * - * \brief Provides an encapsulation to measure the system time - * - * This means the wall clock time used by the simulation, the CPU time - * used by all threads of a single process and the CPU time used by - * the overall simulation. (i.e., the time used by all threads of all - * involved processes.) - */ -class Timer -{ - struct TimeData - { - std::chrono::high_resolution_clock::time_point realtimeData; - std::clock_t cputimeData; - }; -public: - Timer() - { halt(); } - - /*! - * \brief Start counting the time resources used by the simulation. - */ - void start() - { - isStopped_ = false; - measure_(startTime_); - } - - /*! - * \brief Stop counting the time resources. - * - * Returns the wall clock time the timer was active. - */ - double stop() - { - if (!isStopped_) { - TimeData stopTime; - - measure_(stopTime); - - const auto& t1 = startTime_.realtimeData; - const auto& t2 = stopTime.realtimeData; - std::chrono::duration dt = - std::chrono::duration_cast >(t2 - t1); - - realTimeElapsed_ += dt.count(); - cpuTimeElapsed_ += - static_cast(stopTime.cputimeData - - startTime_.cputimeData)/CLOCKS_PER_SEC; - } - - isStopped_ = true; - - return realTimeElapsed_; - } - - /*! - * \brief Stop the measurement reset all timing values - */ - void halt() - { - isStopped_ = true; - cpuTimeElapsed_ = 0.0; - realTimeElapsed_ = 0.0; - } - - /*! - * \brief Make the current point in time t=0 but do not change the status of the timer. - */ - void reset() - { - cpuTimeElapsed_ = 0.0; - realTimeElapsed_ = 0.0; - - measure_(startTime_); - } - - /*! - * \brief Return the real time [s] elapsed during the periods the timer was active - * since the last reset. - */ - double realTimeElapsed() const - { - if (isStopped_) - return realTimeElapsed_; - - TimeData stopTime; - - measure_(stopTime); - - const auto& t1 = startTime_.realtimeData; - const auto& t2 = stopTime.realtimeData; - std::chrono::duration dt = - std::chrono::duration_cast >(t2 - t1); - - return realTimeElapsed_ + dt.count(); - } - - /*! - * \brief This is an alias for realTimeElapsed() - * - * Its main purpose is to make the API of the class a superset of Dune::Timer - */ - double elapsed() const - { return realTimeElapsed(); } - - /*! - * \brief Return the CPU time [s] used by all threads of the local process for the - * periods the timer was active - */ - double cpuTimeElapsed() const - { - if (isStopped_) - return cpuTimeElapsed_; - - TimeData stopTime; - - measure_(stopTime); - - const auto& t1 = startTime_.cputimeData; - const auto& t2 = stopTime.cputimeData; - - return cpuTimeElapsed_ + static_cast(t2 - t1)/CLOCKS_PER_SEC; - } - - /*! - * \brief Return the CPU time [s] used by all threads of the all processes of program - * - * The value returned only differs from cpuTimeElapsed() if MPI is used. - */ - double globalCpuTimeElapsed() const - { - double val = cpuTimeElapsed(); - double globalVal = val; - -#if HAVE_MPI - MPI_Reduce(&val, - &globalVal, - /*count=*/1, - MPI_DOUBLE, - MPI_SUM, - /*rootRank=*/0, - MPI_COMM_WORLD); -#endif - - return globalVal; - } - - /*! - * \brief Adds the time of another timer to the current one - */ - Timer& operator+=(const Timer& other) - { - realTimeElapsed_ += other.realTimeElapsed(); - cpuTimeElapsed_ += other.cpuTimeElapsed(); - - return *this; - } - -private: - // measure the current time and put it into the object passed via - // the argument. - static void measure_(TimeData& timeData) - { - // Note: On Linux -- or rather fully POSIX compliant systems -- using - // clock_gettime() would be more accurate for the CPU time. - timeData.realtimeData = std::chrono::high_resolution_clock::now(); - timeData.cputimeData = std::clock(); - } - - bool isStopped_; - double cpuTimeElapsed_; - double realTimeElapsed_; - TimeData startTime_; -}; -} // namespace Opm - -#endif diff --git a/opm/models/utils/timer.hpp b/opm/models/utils/timer.hpp new file mode 100644 index 000000000..689209c66 --- /dev/null +++ b/opm/models/utils/timer.hpp @@ -0,0 +1,122 @@ +// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- +// vi: set et ts=4 sw=4 sts=4: +/* + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 2 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . + + Consult the COPYING file in the top-level source directory of this + module for the precise wording of the license and the list of + copyright holders. +*/ +/*! + * \file + * + * \copydoc Opm::Timer + */ +#ifndef OPM_TIMER_HPP +#define OPM_TIMER_HPP + +#include + +namespace Opm { + +/*! + * \ingroup Common + * + * \brief Provides an encapsulation to measure the system time + * + * This means the wall clock time used by the simulation, the CPU time + * used by all threads of a single process and the CPU time used by + * the overall simulation. (i.e., the time used by all threads of all + * involved processes.) + */ +class Timer +{ + struct TimeData + { + std::chrono::high_resolution_clock::time_point realtimeData; + std::clock_t cputimeData; + + // measure the current time and put it into the object. + void measure(); + }; + +public: + Timer(); + + /*! + * \brief Start counting the time resources used by the simulation. + */ + void start(); + + /*! + * \brief Stop counting the time resources. + * + * Returns the wall clock time the timer was active. + */ + double stop(); + + /*! + * \brief Stop the measurement reset all timing values + */ + void halt(); + + /*! + * \brief Make the current point in time t=0 but do not change the status of the timer. + */ + void reset(); + + /*! + * \brief Return the real time [s] elapsed during the periods the timer was active + * since the last reset. + */ + double realTimeElapsed() const; + + /*! + * \brief This is an alias for realTimeElapsed() + * + * Its main purpose is to make the API of the class a superset of Dune::Timer + */ + double elapsed() const + { return realTimeElapsed(); } + + /*! + * \brief Return the CPU time [s] used by all threads of the local process for the + * periods the timer was active + */ + double cpuTimeElapsed() const; + + /*! + * \brief Return the CPU time [s] used by all threads of the all processes of program + * + * The value returned only differs from cpuTimeElapsed() if MPI is used. + */ + double globalCpuTimeElapsed() const; + + /*! + * \brief Adds the time of another timer to the current one + */ + Timer& operator+=(const Timer& other); + +private: + bool isStopped_; + double cpuTimeElapsed_; + double realTimeElapsed_; + TimeData startTime_; +}; + +} // namespace Opm + +#endif // OPM_TIMER_HPP diff --git a/opm/models/utils/timerguard.hh b/opm/models/utils/timerguard.hh index 9b08bbc1e..c2d89e13a 100644 --- a/opm/models/utils/timerguard.hh +++ b/opm/models/utils/timerguard.hh @@ -28,7 +28,7 @@ #ifndef EWOMS_TIMER_GUARD_HH #define EWOMS_TIMER_GUARD_HH -#include "timer.hh" +#include namespace Opm { /*! diff --git a/opm/simulators/flow/EclGenericWriter.hpp b/opm/simulators/flow/EclGenericWriter.hpp index c6b39f9a3..fee0f0c65 100644 --- a/opm/simulators/flow/EclGenericWriter.hpp +++ b/opm/simulators/flow/EclGenericWriter.hpp @@ -28,7 +28,7 @@ #ifndef OPM_ECL_GENERIC_WRITER_HPP #define OPM_ECL_GENERIC_WRITER_HPP -#include +#include #include #include diff --git a/opm/simulators/flow/FlowMain.hpp b/opm/simulators/flow/FlowMain.hpp index f9a391ef4..b9727a85c 100644 --- a/opm/simulators/flow/FlowMain.hpp +++ b/opm/simulators/flow/FlowMain.hpp @@ -38,6 +38,10 @@ #include #endif +#ifdef _OPENMP +#include +#endif + #include #include #include @@ -97,7 +101,7 @@ namespace Opm { ("Developer option to see whether logging was on non-root processors. " "In that case it will be appended to the *.DBG or *.PRT files"); - ThreadManager::registerParameters(); + ThreadManager::registerParameters(); Simulator::registerParameters(); // register the base parameters @@ -305,8 +309,8 @@ namespace Opm { omp_set_num_threads(threads); #endif - using ThreadManager = GetPropType; - ThreadManager::init(false); + using TM = GetPropType; + TM::init(false); } void mergeParallelLogFiles() diff --git a/opm/simulators/linalg/bicgstabsolver.hh b/opm/simulators/linalg/bicgstabsolver.hh index 445e7faef..9b1147720 100644 --- a/opm/simulators/linalg/bicgstabsolver.hh +++ b/opm/simulators/linalg/bicgstabsolver.hh @@ -31,7 +31,7 @@ #include "residreductioncriterion.hh" #include "linearsolverreport.hh" -#include +#include #include #include diff --git a/opm/simulators/linalg/linearsolverreport.hh b/opm/simulators/linalg/linearsolverreport.hh index 63b10147c..1d7f34a54 100644 --- a/opm/simulators/linalg/linearsolverreport.hh +++ b/opm/simulators/linalg/linearsolverreport.hh @@ -29,7 +29,7 @@ #include "convergencecriterion.hh" -#include +#include #include namespace Opm { diff --git a/parallelUnitTests.cmake b/parallelUnitTests.cmake index 906b54f5d..4036bf7a1 100644 --- a/parallelUnitTests.cmake +++ b/parallelUnitTests.cmake @@ -184,6 +184,8 @@ opm_add_test(test_rstconv_parallel ) opm_add_test(test_mpiutil + DEPENDS "opmsimulators" + LIBRARIES opmsimulators CONDITION MPI_FOUND AND Boost_UNIT_TEST_FRAMEWORK_FOUND SOURCES diff --git a/tests/models/test_mpiutil.cpp b/tests/models/test_mpiutil.cpp index 327575b43..4a4f85da3 100644 --- a/tests/models/test_mpiutil.cpp +++ b/tests/models/test_mpiutil.cpp @@ -19,7 +19,7 @@ #include -#include +#include #include #include diff --git a/tests/models/test_tasklets.cpp b/tests/models/test_tasklets.cpp index be54ffa8b..838f7fccc 100644 --- a/tests/models/test_tasklets.cpp +++ b/tests/models/test_tasklets.cpp @@ -28,8 +28,9 @@ */ #include "config.h" -#include +#include +#include #include #include diff --git a/tests/models/test_tasklets_failure.cpp b/tests/models/test_tasklets_failure.cpp index c0eb6c7d0..f01793266 100644 --- a/tests/models/test_tasklets_failure.cpp +++ b/tests/models/test_tasklets_failure.cpp @@ -39,7 +39,7 @@ #include #include "config.h" -#include +#include std::mutex outputMutex; diff --git a/tests/test_RestartSerialization.cpp b/tests/test_RestartSerialization.cpp index 4d3b9ec28..c69a6b539 100644 --- a/tests/test_RestartSerialization.cpp +++ b/tests/test_RestartSerialization.cpp @@ -527,7 +527,7 @@ struct AquiferFixture { "test_RestartSerialization", "--ecl-deck-file-name=GLIFT1.DATA" }; - Opm::ThreadManager::registerParameters(); + Opm::ThreadManager::registerParameters(); AdaptiveTimeStepping::registerParameters(); BlackoilModelParameters::registerParameters(); Parameters::Register("Do *NOT* use!"); diff --git a/tests/test_equil.cpp b/tests/test_equil.cpp index fdc3157bb..1971e5293 100644 --- a/tests/test_equil.cpp +++ b/tests/test_equil.cpp @@ -232,7 +232,7 @@ struct EquilFixture { #endif using namespace Opm; FlowGenericVanguard::setCommunication(std::make_unique()); - Opm::ThreadManager::registerParameters(); + Opm::ThreadManager::registerParameters(); BlackoilModelParameters::registerParameters(); AdaptiveTimeStepping::registerParameters(); Parameters::Register("Dummy added for the well model to compile."); diff --git a/tests/test_outputdir.cpp b/tests/test_outputdir.cpp index cce63c26f..d0bf0d466 100644 --- a/tests/test_outputdir.cpp +++ b/tests/test_outputdir.cpp @@ -116,7 +116,7 @@ BOOST_FIXTURE_TEST_CASE(WithOutputDir, Fixture) Opm::Parameters::reset(); - Opm::ThreadManager::registerParameters(); + Opm::ThreadManager::registerParameters(); Opm::Main main(3, const_cast(no_param), false); BOOST_CHECK_EQUAL(main.justInitialize(), EXIT_SUCCESS); @@ -154,7 +154,7 @@ BOOST_FIXTURE_TEST_CASE(NoOutputDir, Fixture) const char* no_param[] = {"test_outputdir", input_file_path.c_str(), nullptr}; Opm::Parameters::reset(); - Opm::ThreadManager::registerParameters(); + Opm::ThreadManager::registerParameters(); Opm::Main main(2, const_cast(no_param), false);