From 54ea243c5fe216a32eb18d523ddd77564f3a1ab9 Mon Sep 17 00:00:00 2001 From: Robert Kloefkorn Date: Fri, 1 Apr 2016 15:28:14 +0200 Subject: [PATCH 1/4] SimulatorFullyImplicitBlackoilOutput: added threaded asynchronous output. --- CMakeLists_files.cmake | 7 +- .../SimulatorFullyImplicitBlackoilOutput.cpp | 145 +++++++++++------- .../SimulatorFullyImplicitBlackoilOutput.hpp | 21 ++- opm/autodiff/ThreadHandle.hpp | 142 +++++++++++++++++ 4 files changed, 258 insertions(+), 57 deletions(-) create mode 100644 opm/autodiff/ThreadHandle.hpp diff --git a/CMakeLists_files.cmake b/CMakeLists_files.cmake index 862359eb5..c38ee732a 100644 --- a/CMakeLists_files.cmake +++ b/CMakeLists_files.cmake @@ -47,9 +47,10 @@ list (APPEND MAIN_SOURCE_FILES opm/autodiff/VFPProdProperties.cpp opm/autodiff/VFPInjProperties.cpp opm/autodiff/WellMultiSegment.cpp - opm/autodiff/BlackoilSolventState.cpp - opm/polymer/PolymerState.cpp - opm/polymer/PolymerBlackoilState.cpp + opm//autodiff/ThreadHandle.hpp + opm/autodiff/BlackoilSolventState.cpp + opm/polymer/PolymerState.cpp + opm/polymer/PolymerBlackoilState.cpp opm/polymer/CompressibleTpfaPolymer.cpp opm/polymer/IncompTpfaPolymer.cpp opm/polymer/PolymerInflow.cpp diff --git a/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp b/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp index adc8485f8..762143dc3 100644 --- a/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp +++ b/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp @@ -1,6 +1,6 @@ /* Copyright (c) 2014 SINTEF ICT, Applied Mathematics. - Copyright (c) 2015 IRIS AS + Copyright (c) 2015-2016 IRIS AS This file is part of the Open Porous Media project (OPM). @@ -38,6 +38,7 @@ #include #include #include +#include #include @@ -253,6 +254,40 @@ namespace Opm } } + + namespace detail { + + struct WriterCall : public ThreadHandle :: ObjectIF + { + BlackoilOutputWriter& writer_; + std::unique_ptr< SimulatorTimerInterface > timer_; + const SimulationDataContainer state_; + const WellState wellState_; + const bool substep_; + + explicit WriterCall( BlackoilOutputWriter& writer, + const SimulatorTimerInterface& timer, + const SimulationDataContainer& state, + const WellState& wellState, + bool substep ) + : writer_( writer ), + timer_( timer.clone() ), + state_( state ), + wellState_( wellState ), + substep_( substep ) + { + } + + // callback to writer's serial writeTimeStep method + void run () + { + // write data + writer_.writeTimeStepSerial( *timer_, state_, wellState_, substep_ ); + } + }; + } + + void BlackoilOutputWriter:: writeTimeStep(const SimulatorTimerInterface& timer, @@ -265,7 +300,7 @@ namespace Opm vtkWriter_->writeTimeStep( timer, localState, localWellState, false ); } - bool isIORank = true ; + bool isIORank = output_ ; if( parallelOutput_ && parallelOutput_->isParallel() ) { // collect all solutions to I/O rank @@ -275,65 +310,71 @@ namespace Opm const SimulationDataContainer& state = (parallelOutput_ && parallelOutput_->isParallel() ) ? parallelOutput_->globalReservoirState() : localState; const WellState& wellState = (parallelOutput_ && parallelOutput_->isParallel() ) ? parallelOutput_->globalWellState() : localWellState; - // output is only done on I/O rank + // serial output is only done on I/O rank if( isIORank ) { - // Matlab output - if( matlabWriter_ ) { - matlabWriter_->writeTimeStep( timer, state, wellState, substep ); + if( asyncOutput_ ) { + // dispatch the write call to the extra thread + asyncOutput_->dispatch( new detail::WriterCall( *this, timer, state, wellState, substep ) ); } - // ECL output - if ( eclWriter_ ) { - const auto initConfig = eclipseState_->getInitConfig(); - if (initConfig->getRestartInitiated() && ((initConfig->getRestartStep()) == (timer.currentStepNum()))) { - std::cout << "Skipping restart write in start of step " << timer.currentStepNum() << std::endl; - } else { - eclWriter_->writeTimeStep(timer, state, wellState, substep ); - } + else { + // just write the data to disk + writeTimeStepSerial( timer, state, wellState, substep ); } + } + } - // write backup file - if( backupfile_ ) + void + BlackoilOutputWriter:: + writeTimeStepSerial(const SimulatorTimerInterface& timer, + const SimulationDataContainer& state, + const WellState& wellState, + bool substep) + { + // Matlab output + if( matlabWriter_ ) { + matlabWriter_->writeTimeStep( timer, state, wellState, substep ); + } + + // ECL output + if ( eclWriter_ ) + { + const auto initConfig = eclipseState_->getInitConfig(); + if (initConfig->getRestartInitiated() && ((initConfig->getRestartStep()) == (timer.currentStepNum()))) { + std::cout << "Skipping restart write in start of step " << timer.currentStepNum() << std::endl; + } else { + eclWriter_->writeTimeStep(timer, state, wellState, substep ); + } + } + + // write backup file + if( backupfile_.is_open() ) + { + int reportStep = timer.reportStepNum(); + int currentTimeStep = timer.currentStepNum(); + if( (reportStep == currentTimeStep || // true for SimulatorTimer + currentTimeStep == 0 || // true for AdaptiveSimulatorTimer at reportStep + timer.done() ) // true for AdaptiveSimulatorTimer at reportStep + && lastBackupReportStep_ != reportStep ) // only backup report step once { - int reportStep = timer.reportStepNum(); - int currentTimeStep = timer.currentStepNum(); - if( (reportStep == currentTimeStep || // true for SimulatorTimer - currentTimeStep == 0 || // true for AdaptiveSimulatorTimer at reportStep - timer.done() ) // true for AdaptiveSimulatorTimer at reportStep - && lastBackupReportStep_ != reportStep ) // only backup report step once - { - // store report step - lastBackupReportStep_ = reportStep; - // write resport step number - backupfile_.write( (const char *) &reportStep, sizeof(int) ); + // store report step + lastBackupReportStep_ = reportStep; + // write resport step number + backupfile_.write( (const char *) &reportStep, sizeof(int) ); - /* - try { - const BlackoilState& boState = dynamic_cast< const BlackoilState& > (state); - backupfile_ << boState; + try { + backupfile_ << state; - const WellStateFullyImplicitBlackoil& boWellState = static_cast< const WellStateFullyImplicitBlackoil& > (wellState); - backupfile_ << boWellState; - } - catch ( const std::bad_cast& e ) - { - - } - */ - - /* - const WellStateFullyImplicitBlackoil* boWellState = - dynamic_cast< const WellStateFullyImplicitBlackoil* > (&wellState); - if( boWellState ) { - backupfile_ << (*boWellState); - } - else - OPM_THROW(std::logic_error,"cast to WellStateFullyImplicitBlackoil failed"); - */ - backupfile_ << std::flush; + const WellStateFullyImplicitBlackoil& boWellState = static_cast< const WellStateFullyImplicitBlackoil& > (wellState); + backupfile_ << boWellState; } - } // end backup - } // end isIORank + catch ( const std::bad_cast& e ) + { + } + + backupfile_ << std::flush; + } + } // end backup } void diff --git a/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.hpp b/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.hpp index 122b12eaf..6b66bb66d 100644 --- a/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.hpp +++ b/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.hpp @@ -34,6 +34,7 @@ #include #include +#include #include #include @@ -43,6 +44,7 @@ #include #include #include +#include #include @@ -85,7 +87,7 @@ namespace Opm Opm::DataMap dm; dm["saturation"] = &state.saturation(); dm["pressure"] = &state.pressure(); - for (const auto& pair : state.cellData()) + for (const auto& pair : state.cellData()) { const std::string& name = pair.first; std::string key; @@ -220,6 +222,12 @@ namespace Opm const Opm::WellState& wellState, bool substep = false); + /** \copydoc Opm::OutputWriter::writeTimeStep */ + void writeTimeStepSerial(const SimulatorTimerInterface& timer, + const SimulationDataContainer& reservoirState, + const Opm::WellState& wellState, + bool substep); + /** \brief return output directory */ const std::string& outputDirectory() const { return outputDir_; } @@ -257,6 +265,8 @@ namespace Opm std::unique_ptr< OutputWriter > matlabWriter_; std::unique_ptr< EclipseWriter > eclWriter_; EclipseStateConstPtr eclipseState_; + + std::unique_ptr< ThreadHandle > asyncOutput_; }; @@ -289,8 +299,15 @@ namespace Opm parallelOutput_->numCells(), parallelOutput_->globalCell() ) : 0 ), - eclipseState_(eclipseState) + eclipseState_(eclipseState), + asyncOutput_() { + // create output thread if needed + if( output_ && param.getDefault("async_output", bool( false ) ) ) + { + asyncOutput_.reset( new ThreadHandle() ); + } + // For output. if (output_ && parallelOutput_->isIORank() ) { // Ensure that output dir exists diff --git a/opm/autodiff/ThreadHandle.hpp b/opm/autodiff/ThreadHandle.hpp new file mode 100644 index 000000000..5a838e02b --- /dev/null +++ b/opm/autodiff/ThreadHandle.hpp @@ -0,0 +1,142 @@ +#ifndef OPM_THREADHANDLE_HPP +#define OPM_THREADHANDLE_HPP + +#include +#include + +#include +#include +#include + +namespace Opm +{ + + class ThreadHandle + { + public: + class ObjectIF + { + protected: + ObjectIF() {} + public: + virtual ~ObjectIF() {} + virtual void run() = 0; + virtual bool isEndMarker () const { return false; } + }; + + protected: + class EndObject : public ObjectIF + { + public: + void run () { } + bool isEndMarker () const { return true; } + }; + + //////////////////////////////////////////// + // class ThreadHandleObject + //////////////////////////////////////////// + class ThreadHandleObject + { + std::queue< ObjectIF* > objPtr_; + std::mutex mutex_; + + // no copying + ThreadHandleObject( const ThreadHandleObject& ); + + public: + // constructor creating thread with given thread number + ThreadHandleObject() + : objPtr_(), mutex_() + { + } + + //! insert object into queue + void push_back( ObjectIF* obj ) + { + // lock mutex to make sure objPtr is not used + mutex_.lock(); + objPtr_.emplace( obj ); + mutex_.unlock(); + } + + //! return 1 of thread is stoped, 0 otherwise + int stoped() const + { + return ( objPtr_.empty() ) ? 1 : 0; + } + + // do the work + void run() + { + while( objPtr_.empty() ) + { + sleep( 1 ); + } + + { + // lock mutex for access to objPtr_ + mutex_.lock(); + + // get next object from queue + std::unique_ptr< ObjectIF > obj( objPtr_.front() ); + objPtr_.pop(); + + // unlock mutex for access to objPtr_ + mutex_.unlock(); + + // if object is end marker terminate thread + if( obj->isEndMarker() ){ + return; + } + + // execute object action + obj->run(); + } + + // keep thread running + run(); + } + }; // end ThreadHandleObject + + //////////////////////////////////////////////////// + // end ThreadHandleObject + //////////////////////////////////////////////////// + + static void startThread( ThreadHandleObject* obj ) + { + obj->run(); + } + + ThreadHandleObject threadObject_; + std::thread thread_; + + private: + // prohibit copying + ThreadHandle( const ThreadHandle& ); + + public: + // default constructor + ThreadHandle() + : threadObject_(), + thread_( startThread, &threadObject_ ) + { + // detach thread into nirvana + thread_.detach(); + } // end constructor + + //! dispatch object to separate thread + void dispatch( ObjectIF* obj ) + { + // add object to queue of objects + threadObject_.push_back( obj ) ; + } + + ~ThreadHandle() + { + // dispatch end object which will terminate the thread + threadObject_.push_back( new EndObject() ) ; + } + }; + +} // end namespace Opm +#endif From 7e6a6fb20a82437be7cb7ff06b90d9644eab3ac2 Mon Sep 17 00:00:00 2001 From: Robert Kloefkorn Date: Wed, 6 Apr 2016 15:26:47 +0200 Subject: [PATCH 2/4] ThreadHandle: cleanup and docu. --- .../SimulatorFullyImplicitBlackoilOutput.cpp | 2 +- opm/autodiff/ThreadHandle.hpp | 35 +++++++++---------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp b/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp index 762143dc3..cf8380813 100644 --- a/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp +++ b/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp @@ -257,7 +257,7 @@ namespace Opm namespace detail { - struct WriterCall : public ThreadHandle :: ObjectIF + struct WriterCall : public ThreadHandle :: ObjectInterface { BlackoilOutputWriter& writer_; std::unique_ptr< SimulatorTimerInterface > timer_; diff --git a/opm/autodiff/ThreadHandle.hpp b/opm/autodiff/ThreadHandle.hpp index 5a838e02b..d0fe7f126 100644 --- a/opm/autodiff/ThreadHandle.hpp +++ b/opm/autodiff/ThreadHandle.hpp @@ -14,18 +14,18 @@ namespace Opm class ThreadHandle { public: - class ObjectIF + class ObjectInterface { protected: - ObjectIF() {} + ObjectInterface() {} public: - virtual ~ObjectIF() {} + virtual ~ObjectInterface() {} virtual void run() = 0; virtual bool isEndMarker () const { return false; } }; protected: - class EndObject : public ObjectIF + class EndObject : public ObjectInterface { public: void run () { } @@ -37,21 +37,22 @@ namespace Opm //////////////////////////////////////////// class ThreadHandleObject { - std::queue< ObjectIF* > objPtr_; + protected: + std::queue< ObjectInterface* > objPtr_; std::mutex mutex_; // no copying - ThreadHandleObject( const ThreadHandleObject& ); + ThreadHandleObject( const ThreadHandleObject& ) = delete; public: - // constructor creating thread with given thread number + // constructor creating object that is executed by thread ThreadHandleObject() : objPtr_(), mutex_() { } - //! insert object into queue - void push_back( ObjectIF* obj ) + //! insert object into threads queue + void push_back( ObjectInterface* obj ) { // lock mutex to make sure objPtr is not used mutex_.lock(); @@ -59,18 +60,14 @@ namespace Opm mutex_.unlock(); } - //! return 1 of thread is stoped, 0 otherwise - int stoped() const - { - return ( objPtr_.empty() ) ? 1 : 0; - } - // do the work void run() { + // wait until objects have been pushed to the queue while( objPtr_.empty() ) { - sleep( 1 ); + // sleep one second + std::this_thread::sleep_for( std::chrono::seconds(1) ); } { @@ -78,7 +75,7 @@ namespace Opm mutex_.lock(); // get next object from queue - std::unique_ptr< ObjectIF > obj( objPtr_.front() ); + std::unique_ptr< ObjectInterface > obj( objPtr_.front() ); objPtr_.pop(); // unlock mutex for access to objPtr_ @@ -112,7 +109,7 @@ namespace Opm private: // prohibit copying - ThreadHandle( const ThreadHandle& ); + ThreadHandle( const ThreadHandle& ) = delete; public: // default constructor @@ -125,7 +122,7 @@ namespace Opm } // end constructor //! dispatch object to separate thread - void dispatch( ObjectIF* obj ) + void dispatch( ObjectInterface* obj ) { // add object to queue of objects threadObject_.push_back( obj ) ; From ec45b5547d413ceb1f1e4096636583d273c04308 Mon Sep 17 00:00:00 2001 From: Robert Kloefkorn Date: Wed, 6 Apr 2016 15:39:57 +0200 Subject: [PATCH 3/4] cleanup. --- CMakeLists_files.cmake | 2 +- opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/CMakeLists_files.cmake b/CMakeLists_files.cmake index c38ee732a..e3c1e6933 100644 --- a/CMakeLists_files.cmake +++ b/CMakeLists_files.cmake @@ -47,7 +47,7 @@ list (APPEND MAIN_SOURCE_FILES opm/autodiff/VFPProdProperties.cpp opm/autodiff/VFPInjProperties.cpp opm/autodiff/WellMultiSegment.cpp - opm//autodiff/ThreadHandle.hpp + opm/autodiff/ThreadHandle.hpp opm/autodiff/BlackoilSolventState.cpp opm/polymer/PolymerState.cpp opm/polymer/PolymerBlackoilState.cpp diff --git a/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp b/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp index cf8380813..6033aa631 100644 --- a/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp +++ b/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp @@ -38,7 +38,6 @@ #include #include #include -#include #include From b3a313bfecb46c49c3c4aaccea5b18eb8af5a710 Mon Sep 17 00:00:00 2001 From: Robert Kloefkorn Date: Fri, 8 Apr 2016 11:12:09 +0200 Subject: [PATCH 4/4] ThreadHandle: added documentation and use std::unique_ptr to store object pointers. --- .../SimulatorFullyImplicitBlackoilOutput.cpp | 2 +- opm/autodiff/ThreadHandle.hpp | 71 ++++++++++++------- 2 files changed, 46 insertions(+), 27 deletions(-) diff --git a/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp b/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp index 6033aa631..26caea29c 100644 --- a/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp +++ b/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.cpp @@ -314,7 +314,7 @@ namespace Opm { if( asyncOutput_ ) { // dispatch the write call to the extra thread - asyncOutput_->dispatch( new detail::WriterCall( *this, timer, state, wellState, substep ) ); + asyncOutput_->dispatch( detail::WriterCall( *this, timer, state, wellState, substep ) ); } else { // just write the data to disk diff --git a/opm/autodiff/ThreadHandle.hpp b/opm/autodiff/ThreadHandle.hpp index d0fe7f126..2888e054a 100644 --- a/opm/autodiff/ThreadHandle.hpp +++ b/opm/autodiff/ThreadHandle.hpp @@ -24,6 +24,15 @@ namespace Opm virtual bool isEndMarker () const { return false; } }; + template + class ObjectWrapper : public ObjectInterface + { + Object obj_; + public: + ObjectWrapper( Object&& obj ) : obj_( std::move( obj ) ) {} + void run() { obj_.run(); } + }; + protected: class EndObject : public ObjectInterface { @@ -33,56 +42,60 @@ namespace Opm }; //////////////////////////////////////////// - // class ThreadHandleObject + // class ThreadHandleQueue //////////////////////////////////////////// - class ThreadHandleObject + class ThreadHandleQueue { protected: - std::queue< ObjectInterface* > objPtr_; + std::queue< std::unique_ptr< ObjectInterface > > objQueue_; std::mutex mutex_; // no copying - ThreadHandleObject( const ThreadHandleObject& ) = delete; + ThreadHandleQueue( const ThreadHandleQueue& ) = delete; public: - // constructor creating object that is executed by thread - ThreadHandleObject() - : objPtr_(), mutex_() + //! constructor creating object that is executed by thread + ThreadHandleQueue() + : objQueue_(), mutex_() { } //! insert object into threads queue - void push_back( ObjectInterface* obj ) + void push_back( std::unique_ptr< ObjectInterface >&& obj ) { // lock mutex to make sure objPtr is not used mutex_.lock(); - objPtr_.emplace( obj ); + objQueue_.emplace( std::move(obj) ); mutex_.unlock(); } - // do the work + //! do the work until the queue received an end object void run() { // wait until objects have been pushed to the queue - while( objPtr_.empty() ) + while( objQueue_.empty() ) { // sleep one second std::this_thread::sleep_for( std::chrono::seconds(1) ); } { - // lock mutex for access to objPtr_ + // lock mutex for access to objQueue_ mutex_.lock(); // get next object from queue - std::unique_ptr< ObjectInterface > obj( objPtr_.front() ); - objPtr_.pop(); + std::unique_ptr< ObjectInterface > obj( objQueue_.front().release() ); + // remove object from queue + objQueue_.pop(); - // unlock mutex for access to objPtr_ + // unlock mutex for access to objQueue_ mutex_.unlock(); // if object is end marker terminate thread if( obj->isEndMarker() ){ + if( ! objQueue_.empty() ) { + OPM_THROW(std::logic_error,"ThreadHandleQueue: not all queued objects were executed"); + } return; } @@ -93,18 +106,19 @@ namespace Opm // keep thread running run(); } - }; // end ThreadHandleObject + }; // end ThreadHandleQueue //////////////////////////////////////////////////// - // end ThreadHandleObject + // end ThreadHandleQueue //////////////////////////////////////////////////// - static void startThread( ThreadHandleObject* obj ) + // start the thread by calling method run + static void startThread( ThreadHandleQueue* obj ) { obj->run(); } - ThreadHandleObject threadObject_; + ThreadHandleQueue threadObjectQueue_; std::thread thread_; private: @@ -112,26 +126,31 @@ namespace Opm ThreadHandle( const ThreadHandle& ) = delete; public: - // default constructor + //! default constructor ThreadHandle() - : threadObject_(), - thread_( startThread, &threadObject_ ) + : threadObjectQueue_(), + thread_( startThread, &threadObjectQueue_ ) { // detach thread into nirvana thread_.detach(); } // end constructor - //! dispatch object to separate thread - void dispatch( ObjectInterface* obj ) + //! dispatch object to queue of separate thread + template + void dispatch( Object&& obj ) { + typedef ObjectWrapper< Object > ObjectPointer; + ObjectInterface* objPtr = new ObjectPointer( std::move(obj) ); + // add object to queue of objects - threadObject_.push_back( obj ) ; + threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (objPtr) ); } + //! destructor terminating the thread ~ThreadHandle() { // dispatch end object which will terminate the thread - threadObject_.push_back( new EndObject() ) ; + threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (new EndObject()) ) ; } };