ThreadHandle: added documentation and use std::unique_ptr to store object pointers.

This commit is contained in:
Robert Kloefkorn 2016-04-08 11:12:09 +02:00
parent ec45b5547d
commit b3a313bfec
2 changed files with 46 additions and 27 deletions

View File

@ -314,7 +314,7 @@ namespace Opm
{ {
if( asyncOutput_ ) { if( asyncOutput_ ) {
// dispatch the write call to the extra thread // dispatch the write call to the extra thread
asyncOutput_->dispatch( new detail::WriterCall( *this, timer, state, wellState, substep ) ); asyncOutput_->dispatch( detail::WriterCall( *this, timer, state, wellState, substep ) );
} }
else { else {
// just write the data to disk // just write the data to disk

View File

@ -24,6 +24,15 @@ namespace Opm
virtual bool isEndMarker () const { return false; } virtual bool isEndMarker () const { return false; }
}; };
template <class Object>
class ObjectWrapper : public ObjectInterface
{
Object obj_;
public:
ObjectWrapper( Object&& obj ) : obj_( std::move( obj ) ) {}
void run() { obj_.run(); }
};
protected: protected:
class EndObject : public ObjectInterface class EndObject : public ObjectInterface
{ {
@ -33,56 +42,60 @@ namespace Opm
}; };
//////////////////////////////////////////// ////////////////////////////////////////////
// class ThreadHandleObject // class ThreadHandleQueue
//////////////////////////////////////////// ////////////////////////////////////////////
class ThreadHandleObject class ThreadHandleQueue
{ {
protected: protected:
std::queue< ObjectInterface* > objPtr_; std::queue< std::unique_ptr< ObjectInterface > > objQueue_;
std::mutex mutex_; std::mutex mutex_;
// no copying // no copying
ThreadHandleObject( const ThreadHandleObject& ) = delete; ThreadHandleQueue( const ThreadHandleQueue& ) = delete;
public: public:
// constructor creating object that is executed by thread //! constructor creating object that is executed by thread
ThreadHandleObject() ThreadHandleQueue()
: objPtr_(), mutex_() : objQueue_(), mutex_()
{ {
} }
//! insert object into threads queue //! insert object into threads queue
void push_back( ObjectInterface* obj ) void push_back( std::unique_ptr< ObjectInterface >&& obj )
{ {
// lock mutex to make sure objPtr is not used // lock mutex to make sure objPtr is not used
mutex_.lock(); mutex_.lock();
objPtr_.emplace( obj ); objQueue_.emplace( std::move(obj) );
mutex_.unlock(); mutex_.unlock();
} }
// do the work //! do the work until the queue received an end object
void run() void run()
{ {
// wait until objects have been pushed to the queue // wait until objects have been pushed to the queue
while( objPtr_.empty() ) while( objQueue_.empty() )
{ {
// sleep one second // sleep one second
std::this_thread::sleep_for( std::chrono::seconds(1) ); std::this_thread::sleep_for( std::chrono::seconds(1) );
} }
{ {
// lock mutex for access to objPtr_ // lock mutex for access to objQueue_
mutex_.lock(); mutex_.lock();
// get next object from queue // get next object from queue
std::unique_ptr< ObjectInterface > obj( objPtr_.front() ); std::unique_ptr< ObjectInterface > obj( objQueue_.front().release() );
objPtr_.pop(); // remove object from queue
objQueue_.pop();
// unlock mutex for access to objPtr_ // unlock mutex for access to objQueue_
mutex_.unlock(); mutex_.unlock();
// if object is end marker terminate thread // if object is end marker terminate thread
if( obj->isEndMarker() ){ if( obj->isEndMarker() ){
if( ! objQueue_.empty() ) {
OPM_THROW(std::logic_error,"ThreadHandleQueue: not all queued objects were executed");
}
return; return;
} }
@ -93,18 +106,19 @@ namespace Opm
// keep thread running // keep thread running
run(); run();
} }
}; // end ThreadHandleObject }; // end ThreadHandleQueue
//////////////////////////////////////////////////// ////////////////////////////////////////////////////
// end ThreadHandleObject // end ThreadHandleQueue
//////////////////////////////////////////////////// ////////////////////////////////////////////////////
static void startThread( ThreadHandleObject* obj ) // start the thread by calling method run
static void startThread( ThreadHandleQueue* obj )
{ {
obj->run(); obj->run();
} }
ThreadHandleObject threadObject_; ThreadHandleQueue threadObjectQueue_;
std::thread thread_; std::thread thread_;
private: private:
@ -112,26 +126,31 @@ namespace Opm
ThreadHandle( const ThreadHandle& ) = delete; ThreadHandle( const ThreadHandle& ) = delete;
public: public:
// default constructor //! default constructor
ThreadHandle() ThreadHandle()
: threadObject_(), : threadObjectQueue_(),
thread_( startThread, &threadObject_ ) thread_( startThread, &threadObjectQueue_ )
{ {
// detach thread into nirvana // detach thread into nirvana
thread_.detach(); thread_.detach();
} // end constructor } // end constructor
//! dispatch object to separate thread //! dispatch object to queue of separate thread
void dispatch( ObjectInterface* obj ) template <class Object>
void dispatch( Object&& obj )
{ {
typedef ObjectWrapper< Object > ObjectPointer;
ObjectInterface* objPtr = new ObjectPointer( std::move(obj) );
// add object to queue of objects // add object to queue of objects
threadObject_.push_back( obj ) ; threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (objPtr) );
} }
//! destructor terminating the thread
~ThreadHandle() ~ThreadHandle()
{ {
// dispatch end object which will terminate the thread // dispatch end object which will terminate the thread
threadObject_.push_back( new EndObject() ) ; threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (new EndObject()) ) ;
} }
}; };