Merge pull request #5575 from akva2/parallel_tu

Add some more translation units for code from opm-models
This commit is contained in:
Bård Skaflestad 2024-09-04 12:46:05 +02:00 committed by GitHub
commit da1e7b1114
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 1129 additions and 969 deletions

View File

@ -407,6 +407,7 @@ if(QuadMath_FOUND)
co2injection_flash_ecfv
co2injection_flash_vcfv)
opm_add_test(${tapp}_quad
LIBRARIES opmsimulators opmcommon
EXE_NAME ${tapp}_quad
SOURCES
examples/${tapp}.cpp

View File

@ -61,6 +61,10 @@ list (APPEND MAIN_SOURCE_FILES
opm/models/blackoil/blackoilmicpparams.cpp
opm/models/blackoil/blackoilpolymerparams.cpp
opm/models/blackoil/blackoilsolventparams.cpp
opm/models/parallel/mpiutil.cpp
opm/models/parallel/tasklets.cpp
opm/models/parallel/threadmanager.cpp
opm/models/utils/timer.cpp
opm/simulators/flow/ActionHandler.cpp
opm/simulators/flow/Banners.cpp
opm/simulators/flow/BlackoilModelParameters.cpp
@ -668,10 +672,10 @@ list (APPEND PUBLIC_HEADER_FILES
opm/models/nonlinear/nullconvergencewriter.hh
opm/models/parallel/gridcommhandles.hh
opm/models/parallel/mpibuffer.hh
opm/models/parallel/mpiutil.hh
opm/models/parallel/tasklets.hh
opm/models/parallel/mpiutil.hpp
opm/models/parallel/tasklets.hpp
opm/models/parallel/threadedentityiterator.hh
opm/models/parallel/threadmanager.hh
opm/models/parallel/threadmanager.hpp
opm/models/ptflash/flashindices.hh
opm/models/ptflash/flashintensivequantities.hh
opm/models/ptflash/flashlocalresidual.hh
@ -711,7 +715,7 @@ list (APPEND PUBLIC_HEADER_FILES
opm/models/utils/signum.hh
opm/models/utils/simulator.hh
opm/models/utils/start.hh
opm/models/utils/timer.hh
opm/models/utils/timer.hpp
opm/models/utils/timerguard.hh
opm/simulators/flow/ActionHandler.hpp
opm/simulators/flow/AluGridCartesianIndexMapper.hpp

View File

@ -56,11 +56,11 @@
#include <opm/models/io/vtkprimaryvarsmodule.hh>
#include <opm/models/parallel/gridcommhandles.hh>
#include <opm/models/parallel/threadmanager.hh>
#include <opm/models/parallel/threadmanager.hpp>
#include <opm/models/utils/alignedallocator.hh>
#include <opm/models/utils/simulator.hh>
#include <opm/models/utils/timer.hh>
#include <opm/models/utils/timer.hpp>
#include <opm/models/utils/timerguard.hh>
#include <opm/simulators/linalg/linalgparameters.hh>
@ -219,7 +219,7 @@ struct ConstraintsContext<TypeTag, TTag::FvBaseDiscretization>
*/
template<class TypeTag>
struct ThreadManager<TypeTag, TTag::FvBaseDiscretization>
{ using type = ::Opm::ThreadManager<TypeTag>; };
{ using type = ::Opm::ThreadManager; };
template<class TypeTag>
struct UseLinearizationLock<TypeTag, TTag::FvBaseDiscretization>

View File

@ -36,7 +36,7 @@
#include <opm/grid/utility/SparseTable.hpp>
#include <opm/models/parallel/gridcommhandles.hh>
#include <opm/models/parallel/threadmanager.hh>
#include <opm/models/parallel/threadmanager.hpp>
#include <opm/models/parallel/threadedentityiterator.hh>
#include <opm/models/discretization/common/baseauxiliarymodule.hh>

View File

@ -33,7 +33,7 @@
#include "vtktensorfunction.hh"
#include <opm/models/io/baseoutputwriter.hh>
#include <opm/models/parallel/tasklets.hh>
#include <opm/models/parallel/tasklets.hpp>
#include <opm/material/common/Valgrind.hpp>

View File

@ -41,7 +41,7 @@
#include <opm/models/nonlinear/newtonmethodproperties.hh>
#include <opm/models/nonlinear/nullconvergencewriter.hh>
#include <opm/models/utils/timer.hh>
#include <opm/models/utils/timer.hpp>
#include <opm/models/utils/timerguard.hh>
#include <opm/simulators/linalg/linalgproperties.hh>

View File

@ -0,0 +1,188 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
#include <config.h>
#include <opm/models/parallel/mpiutil.hpp>
#include <cassert>
#include <numeric>
#include <string>
#include <vector>
#if HAVE_MPI
#include <mpi.h>
#include <dune/common/parallel/mpitraits.hh>
#endif
namespace {
template <typename T>
int packSize()
{
int pack_size;
MPI_Pack_size(1, Dune::MPITraits<T>::getType(), MPI_COMM_WORLD, &pack_size);
return pack_size;
}
// -------- Packer --------
template <typename T>
struct Packer
{
static int size(const T&)
{
return packSize<T>();
}
static void pack(const T& content, std::vector<char>& buf, int& offset)
{
MPI_Pack(&content, 1, Dune::MPITraits<T>::getType(), buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
}
static T unpack(const std::vector<char>& recv_buffer, int& offset)
{
T content;
auto* data = const_cast<char*>(recv_buffer.data());
MPI_Unpack(data, recv_buffer.size(), &offset, &content, 1, Dune::MPITraits<T>::getType(), MPI_COMM_WORLD);
return content;
}
};
// -------- Packer, string specialization --------
template <>
struct Packer<std::string>
{
static int size(const std::string& content)
{
return packSize<unsigned int>() + content.size()*packSize<char>();
}
static void pack(const std::string& content, std::vector<char>& buf, int& offset)
{
unsigned int size = content.size();
Packer<unsigned int>::pack(size, buf, offset);
if (size > 0) {
MPI_Pack(const_cast<char*>(content.c_str()), size, MPI_CHAR, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
}
}
static std::string unpack(const std::vector<char>& recv_buffer, int& offset)
{
unsigned int size = Packer<unsigned int>::unpack(recv_buffer, offset);
std::string text;
if (size > 0) {
auto* data = const_cast<char*>(recv_buffer.data());
std::vector<char> chars(size);
MPI_Unpack(data, recv_buffer.size(), &offset, chars.data(), size, MPI_CHAR, MPI_COMM_WORLD);
text = std::string(chars.data(), size);
}
return text;
}
};
// -------- Packer, vector partial specialization --------
template <typename T>
struct Packer<std::vector<T>>
{
static int size(const std::string& content)
{
int sz = 0;
sz += packSize<unsigned int>();
for (const T& elem : content) {
sz += Packer<T>::size(elem);
}
return sz;
}
static void pack(const std::vector<T>& content, std::vector<char>& buf, int& offset)
{
unsigned int size = content.size();
Packer<unsigned int>::pack(size, buf, offset);
for (const T& elem : content) {
Packer<T>::pack(elem);
}
}
static std::vector<T> unpack(const std::vector<char>& recv_buffer, int& offset)
{
unsigned int size = Packer<T>::unpack(recv_buffer, offset);
std::vector<T> content;
content.reserve(size);
for (unsigned int i = 0; i < size; ++i) {
content.push_back(Packer<T>::unpack(recv_buffer, offset));
}
return content;
}
};
} // anonymous namespace
namespace Opm {
/// From each rank, gather its string (if not empty) into a vector.
std::vector<std::string> gatherStrings(const std::string& local_string)
{
#if HAVE_MPI
using StringPacker = Packer<std::string>;
// Pack local messages.
const int message_size = StringPacker::size(local_string);
std::vector<char> buffer(message_size);
int offset = 0;
StringPacker::pack(local_string, buffer, offset);
assert(offset == message_size);
// Get message sizes and create offset/displacement array for gathering.
int num_processes = -1;
MPI_Comm_size(MPI_COMM_WORLD, &num_processes);
std::vector<int> message_sizes(num_processes);
MPI_Allgather(&message_size, 1, MPI_INT, message_sizes.data(), 1, MPI_INT, MPI_COMM_WORLD);
std::vector<int> displ(num_processes + 1, 0);
std::partial_sum(message_sizes.begin(), message_sizes.end(), displ.begin() + 1);
// Gather.
std::vector<char> recv_buffer(displ.back());
MPI_Allgatherv(buffer.data(), buffer.size(), MPI_PACKED,
const_cast<char*>(recv_buffer.data()), message_sizes.data(),
displ.data(), MPI_PACKED,
MPI_COMM_WORLD);
// Unpack and return.
std::vector<std::string> ret;
for (int process = 0; process < num_processes; ++process) {
offset = displ[process];
std::string s = StringPacker::unpack(recv_buffer, offset);
if (!s.empty()) {
ret.push_back(s);
}
assert(offset == displ[process + 1]);
}
return ret;
#else
if (local_string.empty()) {
return {};
} else {
return { local_string };
}
#endif
}
} // namespace Opm

View File

@ -1,211 +0,0 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
/*!
* \file
* \copydoc Opm::MpiBuffer
*/
#ifndef OPM_MATERIAL_MPIUTIL_HH
#define OPM_MATERIAL_MPIUTIL_HH
#include <dune/common/parallel/mpitraits.hh>
#include <cassert>
#include <numeric>
#include <string>
#include <vector>
#if HAVE_MPI
#include <mpi.h>
namespace mpiutil_details
{
template <typename T>
int packSize()
{
int pack_size;
MPI_Pack_size(1, Dune::MPITraits<T>::getType(), MPI_COMM_WORLD, &pack_size);
return pack_size;
}
// -------- Packer --------
template <typename T>
struct Packer
{
static int size(const T&)
{
return packSize<T>();
}
static void pack(const T& content, std::vector<char>& buf, int& offset)
{
MPI_Pack(&content, 1, Dune::MPITraits<T>::getType(), buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
}
static T unpack(const std::vector<char>& recv_buffer, int& offset)
{
T content;
auto* data = const_cast<char*>(recv_buffer.data());
MPI_Unpack(data, recv_buffer.size(), &offset, &content, 1, Dune::MPITraits<T>::getType(), MPI_COMM_WORLD);
return content;
}
};
// -------- Packer, string specialization --------
template <>
struct Packer<std::string>
{
static int size(const std::string& content)
{
return packSize<unsigned int>() + content.size()*packSize<char>();
}
static void pack(const std::string& content, std::vector<char>& buf, int& offset)
{
unsigned int size = content.size();
Packer<unsigned int>::pack(size, buf, offset);
if (size > 0) {
MPI_Pack(const_cast<char*>(content.c_str()), size, MPI_CHAR, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
}
}
static std::string unpack(const std::vector<char>& recv_buffer, int& offset)
{
unsigned int size = Packer<unsigned int>::unpack(recv_buffer, offset);
std::string text;
if (size > 0) {
auto* data = const_cast<char*>(recv_buffer.data());
std::vector<char> chars(size);
MPI_Unpack(data, recv_buffer.size(), &offset, chars.data(), size, MPI_CHAR, MPI_COMM_WORLD);
text = std::string(chars.data(), size);
}
return text;
}
};
// -------- Packer, vector partial specialization --------
template <typename T>
struct Packer<std::vector<T>>
{
static int size(const std::string& content)
{
int sz = 0;
sz += packSize<unsigned int>();
for (const T& elem : content) {
sz += Packer<T>::size(elem);
}
return sz;
}
static void pack(const std::vector<T>& content, std::vector<char>& buf, int& offset)
{
unsigned int size = content.size();
Packer<unsigned int>::pack(size, buf, offset);
for (const T& elem : content) {
Packer<T>::pack(elem);
}
}
static std::vector<T> unpack(const std::vector<char>& recv_buffer, int& offset)
{
unsigned int size = Packer<T>::unpack(recv_buffer, offset);
std::vector<T> content;
content.reserve(size);
for (unsigned int i = 0; i < size; ++i) {
content.push_back(Packer<T>::unpack(recv_buffer, offset));
}
return content;
}
};
} // anonymous namespace
namespace Opm
{
/// From each rank, gather its string (if not empty) into a vector.
inline std::vector<std::string> gatherStrings(const std::string& local_string)
{
using StringPacker = mpiutil_details::Packer<std::string>;
// Pack local messages.
const int message_size = StringPacker::size(local_string);
std::vector<char> buffer(message_size);
int offset = 0;
StringPacker::pack(local_string, buffer, offset);
assert(offset == message_size);
// Get message sizes and create offset/displacement array for gathering.
int num_processes = -1;
MPI_Comm_size(MPI_COMM_WORLD, &num_processes);
std::vector<int> message_sizes(num_processes);
MPI_Allgather(&message_size, 1, MPI_INT, message_sizes.data(), 1, MPI_INT, MPI_COMM_WORLD);
std::vector<int> displ(num_processes + 1, 0);
std::partial_sum(message_sizes.begin(), message_sizes.end(), displ.begin() + 1);
// Gather.
std::vector<char> recv_buffer(displ.back());
MPI_Allgatherv(buffer.data(), buffer.size(), MPI_PACKED,
const_cast<char*>(recv_buffer.data()), message_sizes.data(),
displ.data(), MPI_PACKED,
MPI_COMM_WORLD);
// Unpack and return.
std::vector<std::string> ret;
for (int process = 0; process < num_processes; ++process) {
offset = displ[process];
std::string s = StringPacker::unpack(recv_buffer, offset);
if (!s.empty()) {
ret.push_back(s);
}
assert(offset == displ[process + 1]);
}
return ret;
}
} // namespace Opm
#else // HAVE_MPI
namespace Opm
{
inline std::vector<std::string> gatherStrings(const std::string& local_string)
{
if (local_string.empty()) {
return {};
} else {
return { local_string };
}
}
} // namespace Opm
#endif // HAVE_MPI
#endif // OPM_MATERIAL_MPIUTIL_HH

View File

@ -0,0 +1,40 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
/*!
* \file
* \copydoc Opm::MpiBuffer
*/
#ifndef OPM_MPIUTIL_HPP
#define OPM_MPIUTIL_HPP
#include <string>
#include <vector>
namespace Opm {
std::vector<std::string> gatherStrings(const std::string& local_string);
} // namespace Opm
#endif // OPM_MPIUTIL_HPP

View File

@ -0,0 +1,205 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
#include <config.h>
#include <opm/models/parallel/tasklets.hpp>
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
namespace Opm {
thread_local TaskletRunner* TaskletRunner::taskletRunner_ = nullptr;
thread_local int TaskletRunner::workerThreadIndex_ = -1;
TaskletRunner::BarrierTasklet::BarrierTasklet(unsigned numWorkers)
: TaskletInterface(/*refCount=*/numWorkers)
{
numWorkers_ = numWorkers;
numWaiting_ = 0;
}
void TaskletRunner::BarrierTasklet::run()
{
wait();
}
void TaskletRunner::BarrierTasklet::wait()
{
std::unique_lock<std::mutex> lock(barrierMutex_);
numWaiting_ += 1;
if (numWaiting_ >= numWorkers_ + 1) {
lock.unlock();
barrierCondition_.notify_all();
}
else {
const auto& areAllWaiting =
[this]() -> bool
{ return this->numWaiting_ >= this->numWorkers_ + 1; };
barrierCondition_.wait(lock, /*predicate=*/areAllWaiting);
}
}
TaskletRunner::TaskletRunner(unsigned numWorkers)
{
threads_.resize(numWorkers);
for (unsigned i = 0; i < numWorkers; ++i)
// create a worker thread
threads_[i].reset(new std::thread(startWorkerThread_, this, i));
}
TaskletRunner::~TaskletRunner()
{
if (threads_.size() > 0) {
// dispatch a tasklet which will terminate the worker thread
dispatch(std::make_shared<TerminateThreadTasklet>());
// wait until all worker threads have terminated
for (auto& thread : threads_)
thread->join();
}
}
bool TaskletRunner::failure() const
{
return this->failureFlag_.load(std::memory_order_relaxed);
}
int TaskletRunner::workerThreadIndex() const
{
if (TaskletRunner::taskletRunner_ != this)
return -1;
return TaskletRunner::workerThreadIndex_;
}
void TaskletRunner::dispatch(std::shared_ptr<TaskletInterface> tasklet)
{
if (threads_.empty()) {
// run the tasklet immediately in synchronous mode.
while (tasklet->referenceCount() > 0) {
tasklet->dereference();
try {
tasklet->run();
}
catch (const std::exception& e) {
std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ". Trying to continue.\n";
failureFlag_.store(true, std::memory_order_relaxed);
}
catch (...) {
std::cerr << "ERROR: Uncaught exception (general type) when running tasklet. Trying to continue.\n";
failureFlag_.store(true, std::memory_order_relaxed);
}
}
}
else {
// lock mutex for the tasklet queue to make sure that nobody messes with the
// task queue
taskletQueueMutex_.lock();
// add the tasklet to the queue
taskletQueue_.push(tasklet);
taskletQueueMutex_.unlock();
workAvailableCondition_.notify_all();
}
}
void TaskletRunner::barrier()
{
unsigned numWorkers = threads_.size();
if (numWorkers == 0)
// nothing needs to be done to implement a barrier in synchronous mode
return;
// dispatch a barrier tasklet and wait until it has been run by the worker thread
auto barrierTasklet = std::make_shared<BarrierTasklet>(numWorkers);
dispatch(barrierTasklet);
barrierTasklet->wait();
}
void TaskletRunner::startWorkerThread_(TaskletRunner* taskletRunner, int workerThreadIndex)
{
TaskletRunner::taskletRunner_ = taskletRunner;
TaskletRunner::workerThreadIndex_ = workerThreadIndex;
taskletRunner->run_();
}
void TaskletRunner::run_()
{
while (true) {
// wait until tasklets have been pushed to the queue. first we need to lock
// mutex for access to taskletQueue_
std::unique_lock<std::mutex> lock(taskletQueueMutex_);
const auto& workIsAvailable =
[this]() -> bool
{ return !taskletQueue_.empty(); };
if (!workIsAvailable())
workAvailableCondition_.wait(lock, /*predicate=*/workIsAvailable);
// remove tasklet from queue
std::shared_ptr<TaskletInterface> tasklet = taskletQueue_.front();
// if tasklet is an end marker, terminate the thread and DO NOT remove the
// tasklet.
if (tasklet->isEndMarker()) {
if(taskletQueue_.size() > 1)
throw std::logic_error("TaskletRunner: Not all queued tasklets were executed");
taskletQueueMutex_.unlock();
return;
}
tasklet->dereference();
if (tasklet->referenceCount() == 0)
// remove tasklets from the queue as soon as their reference count
// reaches zero, i.e. the tasklet has been run often enough.
taskletQueue_.pop();
lock.unlock();
// execute tasklet
try {
tasklet->run();
}
catch (const std::exception& e) {
std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ".\n";
failureFlag_.store(true, std::memory_order_relaxed);
}
catch (...) {
std::cerr << "ERROR: Uncaught exception when running tasklet.\n";
failureFlag_.store(true, std::memory_order_relaxed);
}
}
}
} // end namespace Opm

View File

@ -1,369 +0,0 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
/*!
* \file
* \brief Provides a mechanism to dispatch work to separate threads
*/
#ifndef EWOMS_TASKLETS_HH
#define EWOMS_TASKLETS_HH
#include <atomic>
#include <stdexcept>
#include <cassert>
#include <thread>
#include <queue>
#include <mutex>
#include <iostream>
#include <condition_variable>
namespace Opm {
/*!
* \brief The base class for tasklets.
*
* Tasklets are a generic mechanism for potentially running work in a separate thread.
*/
class TaskletInterface
{
public:
TaskletInterface(int refCount = 1)
: referenceCount_(refCount)
{}
virtual ~TaskletInterface() {}
virtual void run() = 0;
virtual bool isEndMarker () const { return false; }
void dereference()
{ -- referenceCount_; }
int referenceCount() const
{ return referenceCount_; }
private:
int referenceCount_;
};
/*!
* \brief A simple tasklet that runs a function that returns void and does not take any
* arguments a given number of times.
*/
template <class Fn>
class FunctionRunnerTasklet : public TaskletInterface
{
public:
FunctionRunnerTasklet(const FunctionRunnerTasklet&) = default;
FunctionRunnerTasklet(int numInvocations, const Fn& fn)
: TaskletInterface(numInvocations)
, fn_(fn)
{}
void run() override
{ fn_(); }
private:
const Fn& fn_;
};
class TaskletRunner;
// this class stores the thread local static attributes for the TaskletRunner class. we
// cannot put them directly into TaskletRunner because defining static members for
// non-template classes in headers leads the linker to choke in case multiple compile
// units are used.
template <class Dummy = void>
struct TaskletRunnerHelper_
{
static thread_local TaskletRunner* taskletRunner_;
static thread_local int workerThreadIndex_;
};
template <class Dummy>
thread_local TaskletRunner* TaskletRunnerHelper_<Dummy>::taskletRunner_ = nullptr;
template <class Dummy>
thread_local int TaskletRunnerHelper_<Dummy>::workerThreadIndex_ = -1;
/*!
* \brief Handles where a given tasklet is run.
*
* Depending on the number of worker threads, a tasklet can either be run in a separate
* worker thread or by the main thread.
*/
class TaskletRunner
{
/// \brief Implements a barrier. This class can only be used in the asynchronous case.
class BarrierTasklet : public TaskletInterface
{
public:
BarrierTasklet(unsigned numWorkers)
: TaskletInterface(/*refCount=*/numWorkers)
{
numWorkers_ = numWorkers;
numWaiting_ = 0;
}
void run()
{ wait(); }
void wait()
{
std::unique_lock<std::mutex> lock(barrierMutex_);
numWaiting_ += 1;
if (numWaiting_ >= numWorkers_ + 1) {
lock.unlock();
barrierCondition_.notify_all();
}
else {
const auto& areAllWaiting =
[this]() -> bool
{ return this->numWaiting_ >= this->numWorkers_ + 1; };
barrierCondition_.wait(lock, /*predicate=*/areAllWaiting);
}
}
private:
unsigned numWorkers_;
unsigned numWaiting_;
std::condition_variable barrierCondition_;
std::mutex barrierMutex_;
};
/// \brief TerminateThreadTasklet class
/// Empty tasklet marking thread termination.
class TerminateThreadTasklet : public TaskletInterface
{
public:
void run()
{ }
bool isEndMarker() const
{ return true; }
};
public:
// prohibit copying of tasklet runners
TaskletRunner(const TaskletRunner&) = delete;
/*!
* \brief Creates a tasklet runner with numWorkers underling threads for doing work.
*
* The number of worker threads may be 0. In this case, all work is done by the main
* thread (synchronous mode).
*/
TaskletRunner(unsigned numWorkers)
{
threads_.resize(numWorkers);
for (unsigned i = 0; i < numWorkers; ++i)
// create a worker thread
threads_[i].reset(new std::thread(startWorkerThread_, this, i));
}
/*!
* \brief Destructor
*
* If worker threads were created to run the tasklets, this method waits until all
* worker threads have been terminated, i.e. all scheduled tasklets are guaranteed to
* be completed.
*/
~TaskletRunner()
{
if (threads_.size() > 0) {
// dispatch a tasklet which will terminate the worker thread
dispatch(std::make_shared<TerminateThreadTasklet>());
// wait until all worker threads have terminated
for (auto& thread : threads_)
thread->join();
}
}
bool failure() const
{
return this->failureFlag_.load(std::memory_order_relaxed);
}
/*!
* \brief Returns the index of the current worker thread.
*
* If the current thread is not a worker thread, -1 is returned.
*/
int workerThreadIndex() const
{
if (TaskletRunnerHelper_<void>::taskletRunner_ != this)
return -1;
return TaskletRunnerHelper_<void>::workerThreadIndex_;
}
/*!
* \brief Returns the number of worker threads for the tasklet runner.
*/
int numWorkerThreads() const
{ return threads_.size(); }
/*!
* \brief Add a new tasklet.
*
* The tasklet is either run immediately or deferred to a separate thread.
*/
void dispatch(std::shared_ptr<TaskletInterface> tasklet)
{
if (threads_.empty()) {
// run the tasklet immediately in synchronous mode.
while (tasklet->referenceCount() > 0) {
tasklet->dereference();
try {
tasklet->run();
}
catch (const std::exception& e) {
std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ". Trying to continue.\n";
failureFlag_.store(true, std::memory_order_relaxed);
}
catch (...) {
std::cerr << "ERROR: Uncaught exception (general type) when running tasklet. Trying to continue.\n";
failureFlag_.store(true, std::memory_order_relaxed);
}
}
}
else {
// lock mutex for the tasklet queue to make sure that nobody messes with the
// task queue
taskletQueueMutex_.lock();
// add the tasklet to the queue
taskletQueue_.push(tasklet);
taskletQueueMutex_.unlock();
workAvailableCondition_.notify_all();
}
}
/*!
* \brief Convenience method to construct a new function runner tasklet and dispatch it immediately.
*/
template <class Fn>
std::shared_ptr<FunctionRunnerTasklet<Fn> > dispatchFunction(Fn &fn, int numInvocations=1)
{
using Tasklet = FunctionRunnerTasklet<Fn>;
auto tasklet = std::make_shared<Tasklet>(numInvocations, fn);
this->dispatch(tasklet);
return tasklet;
}
/*!
* \brief Make sure that all tasklets have been completed after this method has been called
*/
void barrier()
{
unsigned numWorkers = threads_.size();
if (numWorkers == 0)
// nothing needs to be done to implement a barrier in synchronous mode
return;
// dispatch a barrier tasklet and wait until it has been run by the worker thread
auto barrierTasklet = std::make_shared<BarrierTasklet>(numWorkers);
dispatch(barrierTasklet);
barrierTasklet->wait();
}
private:
// Atomic flag that is set to failure if any of the tasklets run by the TaskletRunner fails.
// This flag is checked before new tasklets run or get dispatched and in case it is true, the
// thread execution will be stopped / no new tasklets will be started and the program will abort.
// To set the flag and load the flag, we use std::memory_order_relaxed.
// Atomic operations tagged memory_order_relaxed are not synchronization operations; they do not
// impose an order among concurrent memory accesses. They guarantee atomicity and modification order
// consistency. This is the right choice for the setting here, since it is enough to broadcast failure
// before new run or get dispatched.
std::atomic<bool> failureFlag_ = false;
protected:
// main function of the worker thread
static void startWorkerThread_(TaskletRunner* taskletRunner, int workerThreadIndex)
{
TaskletRunnerHelper_<void>::taskletRunner_ = taskletRunner;
TaskletRunnerHelper_<void>::workerThreadIndex_ = workerThreadIndex;
taskletRunner->run_();
}
//! do the work until the queue received an end tasklet
void run_()
{
while (true) {
// wait until tasklets have been pushed to the queue. first we need to lock
// mutex for access to taskletQueue_
std::unique_lock<std::mutex> lock(taskletQueueMutex_);
const auto& workIsAvailable =
[this]() -> bool
{ return !taskletQueue_.empty(); };
if (!workIsAvailable())
workAvailableCondition_.wait(lock, /*predicate=*/workIsAvailable);
// remove tasklet from queue
std::shared_ptr<TaskletInterface> tasklet = taskletQueue_.front();
// if tasklet is an end marker, terminate the thread and DO NOT remove the
// tasklet.
if (tasklet->isEndMarker()) {
if(taskletQueue_.size() > 1)
throw std::logic_error("TaskletRunner: Not all queued tasklets were executed");
taskletQueueMutex_.unlock();
return;
}
tasklet->dereference();
if (tasklet->referenceCount() == 0)
// remove tasklets from the queue as soon as their reference count
// reaches zero, i.e. the tasklet has been run often enough.
taskletQueue_.pop();
lock.unlock();
// execute tasklet
try {
tasklet->run();
}
catch (const std::exception& e) {
std::cerr << "ERROR: Uncaught std::exception when running tasklet: " << e.what() << ".\n";
failureFlag_.store(true, std::memory_order_relaxed);
}
catch (...) {
std::cerr << "ERROR: Uncaught exception when running tasklet.\n";
failureFlag_.store(true, std::memory_order_relaxed);
}
}
}
std::vector<std::unique_ptr<std::thread> > threads_;
std::queue<std::shared_ptr<TaskletInterface> > taskletQueue_;
std::mutex taskletQueueMutex_;
std::condition_variable workAvailableCondition_;
};
} // end namespace Opm
#endif

View File

@ -0,0 +1,210 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
/*!
* \file
* \brief Provides a mechanism to dispatch work to separate threads
*/
#ifndef OPM_TASKLETS_HPP
#define OPM_TASKLETS_HPP
#include <atomic>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
namespace Opm {
/*!
* \brief The base class for tasklets.
*
* Tasklets are a generic mechanism for potentially running work in a separate thread.
*/
class TaskletInterface
{
public:
TaskletInterface(int refCount = 1)
: referenceCount_(refCount)
{}
virtual ~TaskletInterface() {}
virtual void run() = 0;
virtual bool isEndMarker () const { return false; }
void dereference()
{ -- referenceCount_; }
int referenceCount() const
{ return referenceCount_; }
private:
int referenceCount_;
};
/*!
* \brief A simple tasklet that runs a function that returns void and does not take any
* arguments a given number of times.
*/
template <class Fn>
class FunctionRunnerTasklet : public TaskletInterface
{
public:
FunctionRunnerTasklet(const FunctionRunnerTasklet&) = default;
FunctionRunnerTasklet(int numInvocations, const Fn& fn)
: TaskletInterface(numInvocations)
, fn_(fn)
{}
void run() override
{ fn_(); }
private:
const Fn& fn_;
};
/*!
* \brief Handles where a given tasklet is run.
*
* Depending on the number of worker threads, a tasklet can either be run in a separate
* worker thread or by the main thread.
*/
class TaskletRunner
{
/// \brief Implements a barrier. This class can only be used in the asynchronous case.
class BarrierTasklet : public TaskletInterface
{
public:
BarrierTasklet(unsigned numWorkers);
void run();
void wait();
private:
unsigned numWorkers_;
unsigned numWaiting_;
std::condition_variable barrierCondition_;
std::mutex barrierMutex_;
};
/// \brief TerminateThreadTasklet class
/// Empty tasklet marking thread termination.
class TerminateThreadTasklet : public TaskletInterface
{
public:
void run()
{ }
bool isEndMarker() const
{ return true; }
};
public:
// prohibit copying of tasklet runners
TaskletRunner(const TaskletRunner&) = delete;
/*!
* \brief Creates a tasklet runner with numWorkers underling threads for doing work.
*
* The number of worker threads may be 0. In this case, all work is done by the main
* thread (synchronous mode).
*/
TaskletRunner(unsigned numWorkers);
/*!
* \brief Destructor
*
* If worker threads were created to run the tasklets, this method waits until all
* worker threads have been terminated, i.e. all scheduled tasklets are guaranteed to
* be completed.
*/
~TaskletRunner();
bool failure() const;
/*!
* \brief Returns the index of the current worker thread.
*
* If the current thread is not a worker thread, -1 is returned.
*/
int workerThreadIndex() const;
/*!
* \brief Returns the number of worker threads for the tasklet runner.
*/
int numWorkerThreads() const
{ return threads_.size(); }
/*!
* \brief Add a new tasklet.
*
* The tasklet is either run immediately or deferred to a separate thread.
*/
void dispatch(std::shared_ptr<TaskletInterface> tasklet);
/*!
* \brief Convenience method to construct a new function runner tasklet and dispatch it immediately.
*/
template <class Fn>
std::shared_ptr<FunctionRunnerTasklet<Fn> > dispatchFunction(Fn &fn, int numInvocations=1)
{
using Tasklet = FunctionRunnerTasklet<Fn>;
auto tasklet = std::make_shared<Tasklet>(numInvocations, fn);
this->dispatch(tasklet);
return tasklet;
}
/*!
* \brief Make sure that all tasklets have been completed after this method has been called
*/
void barrier();
private:
// Atomic flag that is set to failure if any of the tasklets run by the TaskletRunner fails.
// This flag is checked before new tasklets run or get dispatched and in case it is true, the
// thread execution will be stopped / no new tasklets will be started and the program will abort.
// To set the flag and load the flag, we use std::memory_order_relaxed.
// Atomic operations tagged memory_order_relaxed are not synchronization operations; they do not
// impose an order among concurrent memory accesses. They guarantee atomicity and modification order
// consistency. This is the right choice for the setting here, since it is enough to broadcast failure
// before new run or get dispatched.
std::atomic<bool> failureFlag_ = false;
protected:
// main function of the worker thread
static void startWorkerThread_(TaskletRunner* taskletRunner, int workerThreadIndex);
//! do the work until the queue received an end tasklet
void run_();
std::vector<std::unique_ptr<std::thread> > threads_;
std::queue<std::shared_ptr<TaskletInterface> > taskletQueue_;
std::mutex taskletQueueMutex_;
std::condition_variable workAvailableCondition_;
static thread_local TaskletRunner* taskletRunner_;
static thread_local int workerThreadIndex_;
};
} // end namespace Opm
#endif // OPM_TASKLETS_HPP

View File

@ -0,0 +1,93 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
#include <config.h>
#include <opm/models/parallel/threadmanager.hpp>
#ifdef _OPENMP
#include <omp.h>
#endif
#include <opm/models/discretization/common/fvbaseparameters.hh>
#include <opm/models/utils/parametersystem.hh>
namespace Opm {
int ThreadManager::numThreads_ = 1;
void ThreadManager::registerParameters()
{
Parameters::Register<Parameters::ThreadsPerProcess>
("The maximum number of threads to be instantiated per process "
"('-1' means 'automatic')");
}
void ThreadManager::init(bool queryCommandLineParameter)
{
if (queryCommandLineParameter) {
numThreads_ = Parameters::Get<Parameters::ThreadsPerProcess>();
// some safety checks. This is pretty ugly macro-magic, but so what?
#if !defined(_OPENMP)
if (numThreads_ != 1 && numThreads_ != -1) {
throw std::invalid_argument("OpenMP is not available. The only valid values for "
"threads-per-process is 1 and -1 but it is "+std::to_string(numThreads_)+"!");
}
numThreads_ = 1;
#elif !defined NDEBUG && defined DUNE_INTERFACECHECK
if (numThreads_ != 1) {
throw std::invalid_argument("You explicitly enabled Barton-Nackman interface checking in Dune. "
"The Dune implementation of this is currently incompatible with "
"thread parallelism!");
}
numThreads_ = 1;
#else
if (numThreads_ == 0) {
throw std::invalid_argument("Zero threads per process are not possible: It must be at least 1, "
"(or -1 for 'automatic')!");
}
#endif
#ifdef _OPENMP
// actually limit the number of threads
if (numThreads_ > 0) {
omp_set_num_threads(numThreads_);
}
#endif
}
#ifdef _OPENMP
// get the number of threads which are used in the end.
numThreads_ = omp_get_max_threads();
#endif
}
unsigned ThreadManager::threadId()
{
#ifdef _OPENMP
return static_cast<unsigned>(omp_get_thread_num());
#else
return 0;
#endif
}
} // namespace Opm

View File

@ -1,141 +0,0 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
/*!
* \file
* \copydoc Opm::ThreadManager
*/
#ifndef EWOMS_THREAD_MANAGER_HH
#define EWOMS_THREAD_MANAGER_HH
#ifdef _OPENMP
#include <omp.h>
#endif
#include <opm/models/discretization/common/fvbaseparameters.hh>
#include <opm/models/utils/parametersystem.hh>
#include <opm/models/utils/propertysystem.hh>
#include <dune/common/version.hh>
namespace Opm {
/*!
* \brief Simplifies multi-threaded capabilities.
*/
template <class TypeTag>
class ThreadManager
{
public:
enum {
#if defined(_OPENMP) || DOXYGEN
//! Specify whether OpenMP is really available or not
isFake = false
#else
isFake = true
#endif
};
/*!
* \brief Register all run-time parameters of the thread manager.
*/
static void registerParameters()
{
Parameters::Register<Parameters::ThreadsPerProcess>
("The maximum number of threads to be instantiated per process "
"('-1' means 'automatic')");
}
/*!
* \brief Initialize number of threads used thread manager.
*
* \param queryCommandLineParameter if set to true we will query ThreadsPerProcess
* and if set (disregard the environment variable OPM_NUM_THREADS).
* If false we will assume that the number of OpenMP threads is already set
* outside of this function (e.g. by OPM_NUM_THREADS or in the simulator by
* the ThreadsPerProcess parameter).
*/
static void init(bool queryCommandLineParameter = true)
{
if (queryCommandLineParameter)
{
numThreads_ = Parameters::Get<Parameters::ThreadsPerProcess>();
// some safety checks. This is pretty ugly macro-magic, but so what?
#if !defined(_OPENMP)
if (numThreads_ != 1 && numThreads_ != -1)
throw std::invalid_argument("OpenMP is not available. The only valid values for "
"threads-per-process is 1 and -1 but it is "+std::to_string(numThreads_)+"!");
numThreads_ = 1;
#elif !defined NDEBUG && defined DUNE_INTERFACECHECK
if (numThreads_ != 1)
throw std::invalid_argument("You explicitly enabled Barton-Nackman interface checking in Dune. "
"The Dune implementation of this is currently incompatible with "
"thread parallelism!");
numThreads_ = 1;
#else
if (numThreads_ == 0)
throw std::invalid_argument("Zero threads per process are not possible: It must be at least 1, "
"(or -1 for 'automatic')!");
#endif
#ifdef _OPENMP
// actually limit the number of threads
if (numThreads_ > 0)
omp_set_num_threads(numThreads_);
#endif
}
#ifdef _OPENMP
// get the number of threads which are used in the end.
numThreads_ = omp_get_max_threads();
#endif
}
/*!
* \brief Return the maximum number of threads of the current process.
*/
static unsigned maxThreads()
{ return static_cast<unsigned>(numThreads_); }
/*!
* \brief Return the index of the current OpenMP thread
*/
static unsigned threadId()
{
#ifdef _OPENMP
return static_cast<unsigned>(omp_get_thread_num());
#else
return 0;
#endif
}
private:
static int numThreads_;
};
template <class TypeTag>
int ThreadManager<TypeTag>::numThreads_ = 1;
} // namespace Opm
#endif

View File

@ -0,0 +1,80 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
/*!
* \file
* \copydoc Opm::ThreadManager
*/
#ifndef OPM_THREAD_MANAGER_HPP
#define OPM_THREAD_MANAGER_HPP
namespace Opm {
/*!
* \brief Simplifies multi-threaded capabilities.
*/
class ThreadManager
{
public:
enum {
#if defined(_OPENMP) || DOXYGEN
//! Specify whether OpenMP is really available or not
isFake = false
#else
isFake = true
#endif
};
/*!
* \brief Register all run-time parameters of the thread manager.
*/
static void registerParameters();
/*!
* \brief Initialize number of threads used thread manager.
*
* \param queryCommandLineParameter if set to true we will query ThreadsPerProcess
* and if set (disregard the environment variable OPM_NUM_THREADS).
* If false we will assume that the number of OpenMP threads is already set
* outside of this function (e.g. by OPM_NUM_THREADS or in the simulator by
* the ThreadsPerProcess parameter).
*/
static void init(bool queryCommandLineParameter = true);
/*!
* \brief Return the maximum number of threads of the current process.
*/
static unsigned maxThreads()
{ return static_cast<unsigned>(numThreads_); }
/*!
* \brief Return the index of the current OpenMP thread
*/
static unsigned threadId();
private:
static int numThreads_;
};
} // namespace Opm
#endif // OPM_THREAD_MANAGER_HPP

View File

@ -33,9 +33,9 @@
#include <opm/models/utils/basicproperties.hh>
#include <opm/models/utils/propertysystem.hh>
#include <opm/models/utils/timer.hh>
#include <opm/models/utils/timer.hpp>
#include <opm/models/utils/timerguard.hh>
#include <opm/models/parallel/mpiutil.hh>
#include <opm/models/parallel/mpiutil.hpp>
#include <opm/models/discretization/common/fvbaseproperties.hh>
#include <dune/common/parallel/mpihelper.hh>

View File

@ -35,7 +35,7 @@
#include "parametersystem.hh"
#include <opm/models/utils/simulator.hh>
#include <opm/models/utils/timer.hh>
#include <opm/models/utils/timer.hpp>
#include <opm/material/common/Valgrind.hpp>
@ -76,7 +76,7 @@ template <class TypeTag>
static inline void registerAllParameters_(bool finalizeRegistration = true)
{
using Simulator = GetPropType<TypeTag, Properties::Simulator>;
using ThreadManager = GetPropType<TypeTag, Properties::ThreadManager>;
using TM = GetPropType<TypeTag, Properties::ThreadManager>;
Parameters::Register<Parameters::ParameterFile>
("An .ini file which contains a set of run-time parameters");
@ -84,7 +84,7 @@ static inline void registerAllParameters_(bool finalizeRegistration = true)
("Print the values of the run-time parameters at the "
"start of the simulation");
ThreadManager::registerParameters();
TM::registerParameters();
Simulator::registerParameters();
if (finalizeRegistration) {
@ -279,7 +279,7 @@ static inline int start(int argc, char **argv, bool registerParams=true)
using Scalar = GetPropType<TypeTag, Properties::Scalar>;
using Simulator = GetPropType<TypeTag, Properties::Simulator>;
using Problem = GetPropType<TypeTag, Properties::Problem>;
using ThreadManager = GetPropType<TypeTag, Properties::ThreadManager>;
using TM = GetPropType<TypeTag, Properties::ThreadManager>;
// set the signal handlers to reset the TTY to a well defined state on unexpected
// program aborts
@ -304,7 +304,7 @@ static inline int start(int argc, char **argv, bool registerParams=true)
if (paramStatus == 2)
return 0;
ThreadManager::init();
TM::init();
// initialize MPI, finalize is done automatically on exit
#if HAVE_DUNE_FEM

148
opm/models/utils/timer.cpp Normal file
View File

@ -0,0 +1,148 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
#include <config.h>
#include <opm/models/utils/timer.hpp>
#if HAVE_MPI
#include <mpi.h>
#endif
namespace Opm {
void Timer::TimeData::measure()
{
// Note: On Linux -- or rather fully POSIX compliant systems -- using
// clock_gettime() would be more accurate for the CPU time.
realtimeData = std::chrono::high_resolution_clock::now();
cputimeData = std::clock();
}
Timer::Timer()
{
halt();
}
void Timer::start()
{
isStopped_ = false;
startTime_.measure();
}
double Timer::stop()
{
if (!isStopped_) {
TimeData stopTime;
stopTime.measure();
const auto& t1 = startTime_.realtimeData;
const auto& t2 = stopTime.realtimeData;
std::chrono::duration<double> dt =
std::chrono::duration_cast<std::chrono::duration<double> >(t2 - t1);
realTimeElapsed_ += dt.count();
cpuTimeElapsed_ +=
static_cast<double>(stopTime.cputimeData
- startTime_.cputimeData)/CLOCKS_PER_SEC;
}
isStopped_ = true;
return realTimeElapsed_;
}
void Timer::halt()
{
isStopped_ = true;
cpuTimeElapsed_ = 0.0;
realTimeElapsed_ = 0.0;
}
void Timer::reset()
{
cpuTimeElapsed_ = 0.0;
realTimeElapsed_ = 0.0;
startTime_.measure();
}
double Timer::realTimeElapsed() const
{
if (isStopped_)
return realTimeElapsed_;
TimeData stopTime;
stopTime.measure();
const auto& t1 = startTime_.realtimeData;
const auto& t2 = stopTime.realtimeData;
std::chrono::duration<double> dt =
std::chrono::duration_cast<std::chrono::duration<double> >(t2 - t1);
return realTimeElapsed_ + dt.count();
}
double Timer::cpuTimeElapsed() const
{
if (isStopped_)
return cpuTimeElapsed_;
TimeData stopTime;
stopTime.measure();
const auto& t1 = startTime_.cputimeData;
const auto& t2 = stopTime.cputimeData;
return cpuTimeElapsed_ + static_cast<double>(t2 - t1)/CLOCKS_PER_SEC;
}
double Timer::globalCpuTimeElapsed() const
{
double val = cpuTimeElapsed();
double globalVal = val;
#if HAVE_MPI
MPI_Reduce(&val,
&globalVal,
/*count=*/1,
MPI_DOUBLE,
MPI_SUM,
/*rootRank=*/0,
MPI_COMM_WORLD);
#endif
return globalVal;
}
Timer& Timer::operator+=(const Timer& other)
{
realTimeElapsed_ += other.realTimeElapsed();
cpuTimeElapsed_ += other.cpuTimeElapsed();
return *this;
}
} // namespace Opm

View File

@ -1,217 +0,0 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
/*!
* \file
*
* \copydoc Opm::Timer
*/
#ifndef EWOMS_TIMER_HH
#define EWOMS_TIMER_HH
#include <chrono>
#if HAVE_MPI
#include <mpi.h>
#endif
namespace Opm {
/*!
* \ingroup Common
*
* \brief Provides an encapsulation to measure the system time
*
* This means the wall clock time used by the simulation, the CPU time
* used by all threads of a single process and the CPU time used by
* the overall simulation. (i.e., the time used by all threads of all
* involved processes.)
*/
class Timer
{
struct TimeData
{
std::chrono::high_resolution_clock::time_point realtimeData;
std::clock_t cputimeData;
};
public:
Timer()
{ halt(); }
/*!
* \brief Start counting the time resources used by the simulation.
*/
void start()
{
isStopped_ = false;
measure_(startTime_);
}
/*!
* \brief Stop counting the time resources.
*
* Returns the wall clock time the timer was active.
*/
double stop()
{
if (!isStopped_) {
TimeData stopTime;
measure_(stopTime);
const auto& t1 = startTime_.realtimeData;
const auto& t2 = stopTime.realtimeData;
std::chrono::duration<double> dt =
std::chrono::duration_cast<std::chrono::duration<double> >(t2 - t1);
realTimeElapsed_ += dt.count();
cpuTimeElapsed_ +=
static_cast<double>(stopTime.cputimeData
- startTime_.cputimeData)/CLOCKS_PER_SEC;
}
isStopped_ = true;
return realTimeElapsed_;
}
/*!
* \brief Stop the measurement reset all timing values
*/
void halt()
{
isStopped_ = true;
cpuTimeElapsed_ = 0.0;
realTimeElapsed_ = 0.0;
}
/*!
* \brief Make the current point in time t=0 but do not change the status of the timer.
*/
void reset()
{
cpuTimeElapsed_ = 0.0;
realTimeElapsed_ = 0.0;
measure_(startTime_);
}
/*!
* \brief Return the real time [s] elapsed during the periods the timer was active
* since the last reset.
*/
double realTimeElapsed() const
{
if (isStopped_)
return realTimeElapsed_;
TimeData stopTime;
measure_(stopTime);
const auto& t1 = startTime_.realtimeData;
const auto& t2 = stopTime.realtimeData;
std::chrono::duration<double> dt =
std::chrono::duration_cast<std::chrono::duration<double> >(t2 - t1);
return realTimeElapsed_ + dt.count();
}
/*!
* \brief This is an alias for realTimeElapsed()
*
* Its main purpose is to make the API of the class a superset of Dune::Timer
*/
double elapsed() const
{ return realTimeElapsed(); }
/*!
* \brief Return the CPU time [s] used by all threads of the local process for the
* periods the timer was active
*/
double cpuTimeElapsed() const
{
if (isStopped_)
return cpuTimeElapsed_;
TimeData stopTime;
measure_(stopTime);
const auto& t1 = startTime_.cputimeData;
const auto& t2 = stopTime.cputimeData;
return cpuTimeElapsed_ + static_cast<double>(t2 - t1)/CLOCKS_PER_SEC;
}
/*!
* \brief Return the CPU time [s] used by all threads of the all processes of program
*
* The value returned only differs from cpuTimeElapsed() if MPI is used.
*/
double globalCpuTimeElapsed() const
{
double val = cpuTimeElapsed();
double globalVal = val;
#if HAVE_MPI
MPI_Reduce(&val,
&globalVal,
/*count=*/1,
MPI_DOUBLE,
MPI_SUM,
/*rootRank=*/0,
MPI_COMM_WORLD);
#endif
return globalVal;
}
/*!
* \brief Adds the time of another timer to the current one
*/
Timer& operator+=(const Timer& other)
{
realTimeElapsed_ += other.realTimeElapsed();
cpuTimeElapsed_ += other.cpuTimeElapsed();
return *this;
}
private:
// measure the current time and put it into the object passed via
// the argument.
static void measure_(TimeData& timeData)
{
// Note: On Linux -- or rather fully POSIX compliant systems -- using
// clock_gettime() would be more accurate for the CPU time.
timeData.realtimeData = std::chrono::high_resolution_clock::now();
timeData.cputimeData = std::clock();
}
bool isStopped_;
double cpuTimeElapsed_;
double realTimeElapsed_;
TimeData startTime_;
};
} // namespace Opm
#endif

122
opm/models/utils/timer.hpp Normal file
View File

@ -0,0 +1,122 @@
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
// vi: set et ts=4 sw=4 sts=4:
/*
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
Consult the COPYING file in the top-level source directory of this
module for the precise wording of the license and the list of
copyright holders.
*/
/*!
* \file
*
* \copydoc Opm::Timer
*/
#ifndef OPM_TIMER_HPP
#define OPM_TIMER_HPP
#include <chrono>
namespace Opm {
/*!
* \ingroup Common
*
* \brief Provides an encapsulation to measure the system time
*
* This means the wall clock time used by the simulation, the CPU time
* used by all threads of a single process and the CPU time used by
* the overall simulation. (i.e., the time used by all threads of all
* involved processes.)
*/
class Timer
{
struct TimeData
{
std::chrono::high_resolution_clock::time_point realtimeData;
std::clock_t cputimeData;
// measure the current time and put it into the object.
void measure();
};
public:
Timer();
/*!
* \brief Start counting the time resources used by the simulation.
*/
void start();
/*!
* \brief Stop counting the time resources.
*
* Returns the wall clock time the timer was active.
*/
double stop();
/*!
* \brief Stop the measurement reset all timing values
*/
void halt();
/*!
* \brief Make the current point in time t=0 but do not change the status of the timer.
*/
void reset();
/*!
* \brief Return the real time [s] elapsed during the periods the timer was active
* since the last reset.
*/
double realTimeElapsed() const;
/*!
* \brief This is an alias for realTimeElapsed()
*
* Its main purpose is to make the API of the class a superset of Dune::Timer
*/
double elapsed() const
{ return realTimeElapsed(); }
/*!
* \brief Return the CPU time [s] used by all threads of the local process for the
* periods the timer was active
*/
double cpuTimeElapsed() const;
/*!
* \brief Return the CPU time [s] used by all threads of the all processes of program
*
* The value returned only differs from cpuTimeElapsed() if MPI is used.
*/
double globalCpuTimeElapsed() const;
/*!
* \brief Adds the time of another timer to the current one
*/
Timer& operator+=(const Timer& other);
private:
bool isStopped_;
double cpuTimeElapsed_;
double realTimeElapsed_;
TimeData startTime_;
};
} // namespace Opm
#endif // OPM_TIMER_HPP

View File

@ -28,7 +28,7 @@
#ifndef EWOMS_TIMER_GUARD_HH
#define EWOMS_TIMER_GUARD_HH
#include "timer.hh"
#include <opm/models/utils/timer.hpp>
namespace Opm {
/*!

View File

@ -28,7 +28,7 @@
#ifndef OPM_ECL_GENERIC_WRITER_HPP
#define OPM_ECL_GENERIC_WRITER_HPP
#include <opm/models/parallel/tasklets.hh>
#include <opm/models/parallel/tasklets.hpp>
#include <opm/simulators/flow/CollectDataOnIORank.hpp>
#include <opm/simulators/flow/Transmissibility.hpp>

View File

@ -38,6 +38,10 @@
#include <dune/common/parallel/mpihelper.hh>
#endif
#ifdef _OPENMP
#include <omp.h>
#endif
#include <charconv>
#include <cstddef>
#include <memory>
@ -97,7 +101,7 @@ namespace Opm {
("Developer option to see whether logging was on non-root processors. "
"In that case it will be appended to the *.DBG or *.PRT files");
ThreadManager<TypeTag>::registerParameters();
ThreadManager::registerParameters();
Simulator::registerParameters();
// register the base parameters
@ -305,8 +309,8 @@ namespace Opm {
omp_set_num_threads(threads);
#endif
using ThreadManager = GetPropType<TypeTag, Properties::ThreadManager>;
ThreadManager::init(false);
using TM = GetPropType<TypeTag, Properties::ThreadManager>;
TM::init(false);
}
void mergeParallelLogFiles()

View File

@ -31,7 +31,7 @@
#include "residreductioncriterion.hh"
#include "linearsolverreport.hh"
#include <opm/models/utils/timer.hh>
#include <opm/models/utils/timer.hpp>
#include <opm/models/utils/timerguard.hh>
#include <opm/common/Exceptions.hpp>

View File

@ -29,7 +29,7 @@
#include "convergencecriterion.hh"
#include <opm/models/utils/timer.hh>
#include <opm/models/utils/timer.hpp>
#include <opm/models/utils/timerguard.hh>
namespace Opm {

View File

@ -184,6 +184,8 @@ opm_add_test(test_rstconv_parallel
)
opm_add_test(test_mpiutil
DEPENDS "opmsimulators"
LIBRARIES opmsimulators
CONDITION
MPI_FOUND AND Boost_UNIT_TEST_FRAMEWORK_FOUND
SOURCES

View File

@ -19,7 +19,7 @@
#include <config.h>
#include <opm/models/parallel/mpiutil.hh>
#include <opm/models/parallel/mpiutil.hpp>
#include <dune/common/parallel/mpihelper.hh>
#include <cassert>

View File

@ -28,8 +28,9 @@
*/
#include "config.h"
#include <opm/models/parallel/tasklets.hh>
#include <opm/models/parallel/tasklets.hpp>
#include <cassert>
#include <chrono>
#include <iostream>

View File

@ -39,7 +39,7 @@
#include <cassert>
#include "config.h"
#include <opm/models/parallel/tasklets.hh>
#include <opm/models/parallel/tasklets.hpp>
std::mutex outputMutex;

View File

@ -527,7 +527,7 @@ struct AquiferFixture {
"test_RestartSerialization",
"--ecl-deck-file-name=GLIFT1.DATA"
};
Opm::ThreadManager<TT>::registerParameters();
Opm::ThreadManager::registerParameters();
AdaptiveTimeStepping<TT>::registerParameters();
BlackoilModelParameters<double>::registerParameters();
Parameters::Register<Parameters::EnableTerminalOutput>("Do *NOT* use!");

View File

@ -232,7 +232,7 @@ struct EquilFixture {
#endif
using namespace Opm;
FlowGenericVanguard::setCommunication(std::make_unique<Opm::Parallel::Communication>());
Opm::ThreadManager<TypeTag>::registerParameters();
Opm::ThreadManager::registerParameters();
BlackoilModelParameters<double>::registerParameters();
AdaptiveTimeStepping<TypeTag>::registerParameters();
Parameters::Register<Parameters::EnableTerminalOutput>("Dummy added for the well model to compile.");

View File

@ -116,7 +116,7 @@ BOOST_FIXTURE_TEST_CASE(WithOutputDir, Fixture)
Opm::Parameters::reset();
Opm::ThreadManager<int>::registerParameters();
Opm::ThreadManager::registerParameters();
Opm::Main main(3, const_cast<char**>(no_param), false);
BOOST_CHECK_EQUAL(main.justInitialize(), EXIT_SUCCESS);
@ -154,7 +154,7 @@ BOOST_FIXTURE_TEST_CASE(NoOutputDir, Fixture)
const char* no_param[] = {"test_outputdir", input_file_path.c_str(), nullptr};
Opm::Parameters::reset();
Opm::ThreadManager<int>::registerParameters();
Opm::ThreadManager::registerParameters();
Opm::Main main(2, const_cast<char**>(no_param), false);