mirror of
https://github.com/OPM/opm-simulators.git
synced 2025-02-25 18:55:30 -06:00
move the old "ThreadHandle" mechanism back to opm-simulators
this class is only used by the legacy simulators, `flow` uses the `EclWriter` class provided by eWoms. In turn, this class uses the new-and-shiny "tasklet" mechanism.
This commit is contained in:
parent
79bc265de8
commit
71d353326a
@ -323,6 +323,7 @@ list (APPEND PUBLIC_HEADER_FILES
|
||||
opm/autodiff/WellStateFullyImplicitBlackoil.hpp
|
||||
opm/autodiff/SimulatorFullyImplicitBlackoilOutput.hpp
|
||||
opm/autodiff/BlackoilOutputEbos.hpp
|
||||
opm/autodiff/ThreadHandle.hpp
|
||||
opm/autodiff/VFPProperties.hpp
|
||||
opm/autodiff/VFPHelpers.hpp
|
||||
opm/autodiff/VFPProdProperties.hpp
|
||||
|
@ -40,7 +40,7 @@
|
||||
#include <opm/autodiff/ParallelDebugOutput.hpp>
|
||||
|
||||
#include <opm/autodiff/WellStateFullyImplicitBlackoil.hpp>
|
||||
#include <ebos/threadhandle.hh>
|
||||
#include <opm/autodiff/ThreadHandle.hpp>
|
||||
#include <opm/autodiff/AutoDiffBlock.hpp>
|
||||
|
||||
#include <opm/parser/eclipse/EclipseState/EclipseState.hpp>
|
||||
|
220
opm/autodiff/ThreadHandle.hpp
Normal file
220
opm/autodiff/ThreadHandle.hpp
Normal file
@ -0,0 +1,220 @@
|
||||
// -*- mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
|
||||
// vi: set et ts=4 sw=4 sts=4:
|
||||
/*
|
||||
This file is part of the Open Porous Media project (OPM).
|
||||
|
||||
OPM is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 2 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
OPM is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with OPM. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
Consult the COPYING file in the top-level source directory of this
|
||||
module for the precise wording of the license and the list of
|
||||
copyright holders.
|
||||
*/
|
||||
|
||||
#ifndef OPM_THREADHANDLE_HPP
|
||||
#define OPM_THREADHANDLE_HPP
|
||||
|
||||
#include <cassert>
|
||||
#include <dune/common/exceptions.hh>
|
||||
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
|
||||
namespace Opm
|
||||
{
|
||||
|
||||
class ThreadHandle
|
||||
{
|
||||
public:
|
||||
|
||||
/// \brief ObjectInterface class
|
||||
/// Virtual interface for code to be run in a seperate thread.
|
||||
class ObjectInterface
|
||||
{
|
||||
protected:
|
||||
ObjectInterface() {}
|
||||
public:
|
||||
virtual ~ObjectInterface() {}
|
||||
virtual void run() = 0;
|
||||
virtual bool isEndMarker () const { return false; }
|
||||
};
|
||||
|
||||
/// \brief ObjectWrapper class
|
||||
/// Implementation of virtualization of template argument fullfilling
|
||||
/// the virtual object interface.
|
||||
template <class Object>
|
||||
class ObjectWrapper : public ObjectInterface
|
||||
{
|
||||
Object obj_;
|
||||
public:
|
||||
ObjectWrapper( Object&& obj ) : obj_( std::move( obj ) ) {}
|
||||
void run() { obj_.run(); }
|
||||
};
|
||||
|
||||
protected:
|
||||
|
||||
/// \brief EndObject class
|
||||
/// Empthy object marking thread termination.
|
||||
class EndObject : public ObjectInterface
|
||||
{
|
||||
public:
|
||||
void run () { }
|
||||
bool isEndMarker () const { return true; }
|
||||
};
|
||||
|
||||
|
||||
/// \brief The ThreadHandleQueue class
|
||||
/// Queue of objects to be handled by this thread.
|
||||
class ThreadHandleQueue
|
||||
{
|
||||
public:
|
||||
//! constructor creating object that is executed by thread
|
||||
ThreadHandleQueue()
|
||||
: objQueue_(), mutex_()
|
||||
{
|
||||
}
|
||||
|
||||
~ThreadHandleQueue()
|
||||
{
|
||||
// wait until all objects have been written.
|
||||
while( ! objQueue_.empty() )
|
||||
{
|
||||
wait();
|
||||
}
|
||||
}
|
||||
|
||||
//! insert object into threads queue
|
||||
void push_back( std::unique_ptr< ObjectInterface >&& obj )
|
||||
{
|
||||
// lock mutex to make sure objPtr is not used
|
||||
mutex_.lock();
|
||||
objQueue_.emplace( std::move(obj) );
|
||||
mutex_.unlock();
|
||||
}
|
||||
|
||||
//! do the work until the queue received an end object
|
||||
void run()
|
||||
{
|
||||
// wait until objects have been pushed to the queue
|
||||
while( objQueue_.empty() )
|
||||
{
|
||||
// sleep one second
|
||||
wait();
|
||||
}
|
||||
|
||||
{
|
||||
// lock mutex for access to objQueue_
|
||||
mutex_.lock();
|
||||
|
||||
// get next object from queue
|
||||
std::unique_ptr< ObjectInterface > obj( objQueue_.front().release() );
|
||||
// remove object from queue
|
||||
objQueue_.pop();
|
||||
|
||||
// unlock mutex for access to objQueue_
|
||||
mutex_.unlock();
|
||||
|
||||
// if object is end marker terminate thread
|
||||
if( obj->isEndMarker() ){
|
||||
if( ! objQueue_.empty() ) {
|
||||
throw std::logic_error("ThreadHandleQueue: not all queued objects were executed");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// execute object action
|
||||
obj->run();
|
||||
}
|
||||
|
||||
// keep thread running
|
||||
run();
|
||||
}
|
||||
|
||||
protected:
|
||||
std::queue< std::unique_ptr< ObjectInterface > > objQueue_;
|
||||
std::mutex mutex_;
|
||||
|
||||
// no copying
|
||||
ThreadHandleQueue( const ThreadHandleQueue& ) = delete;
|
||||
|
||||
// wait duration of 10 milli seconds
|
||||
void wait() const
|
||||
{
|
||||
std::this_thread::sleep_for( std::chrono::milliseconds(10) );
|
||||
}
|
||||
}; // end ThreadHandleQueue
|
||||
|
||||
////////////////////////////////////////////////////
|
||||
// end ThreadHandleQueue
|
||||
////////////////////////////////////////////////////
|
||||
|
||||
// start the thread by calling method run
|
||||
static void startThread( ThreadHandleQueue* obj )
|
||||
{
|
||||
obj->run();
|
||||
}
|
||||
|
||||
ThreadHandleQueue threadObjectQueue_;
|
||||
std::unique_ptr< std::thread > thread_;
|
||||
|
||||
private:
|
||||
// prohibit copying
|
||||
ThreadHandle( const ThreadHandle& ) = delete;
|
||||
|
||||
public:
|
||||
//! constructor creating ThreadHandle
|
||||
//! \param isIORank if true thread is created
|
||||
ThreadHandle( const bool createThread )
|
||||
: threadObjectQueue_(),
|
||||
thread_()
|
||||
{
|
||||
if( createThread )
|
||||
{
|
||||
thread_.reset( new std::thread( startThread, &threadObjectQueue_ ) );
|
||||
// detach thread into nirvana
|
||||
thread_->detach();
|
||||
}
|
||||
} // end constructor
|
||||
|
||||
//! dispatch object to queue of separate thread
|
||||
template <class Object>
|
||||
void dispatch( Object&& obj )
|
||||
{
|
||||
if( thread_ )
|
||||
{
|
||||
typedef ObjectWrapper< Object > ObjectPointer;
|
||||
ObjectInterface* objPtr = new ObjectPointer( std::move(obj) );
|
||||
|
||||
// add object to queue of objects
|
||||
threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (objPtr) );
|
||||
}
|
||||
else
|
||||
{
|
||||
throw std::logic_error("ThreadHandle::dispatch called without thread being initialized (i.e. on non-ioRank)");
|
||||
}
|
||||
}
|
||||
|
||||
//! destructor terminating the thread
|
||||
~ThreadHandle()
|
||||
{
|
||||
if( thread_ )
|
||||
{
|
||||
// dispatch end object which will terminate the thread
|
||||
threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (new EndObject()) ) ;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // end namespace Opm
|
||||
#endif
|
Loading…
Reference in New Issue
Block a user