From e46810d2dc75cdd2527b142ecf04b61c859aee86 Mon Sep 17 00:00:00 2001 From: Robert Kloefkorn Date: Mon, 29 May 2017 18:08:18 +0200 Subject: [PATCH] [bugfix] 1) create asyncOutput object on all ranks to avoid deadlock in MPI_Bcast call in writeTimeStepWithCellProperties. 2) ThreadHandle waits on destruction until all objects have been dealt with. --- .../SimulatorFullyImplicitBlackoilOutput.hpp | 33 +++++------ opm/autodiff/ThreadHandle.hpp | 56 ++++++++++++++----- 2 files changed, 60 insertions(+), 29 deletions(-) diff --git a/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.hpp b/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.hpp index 91b89c72f..871bc9e98 100644 --- a/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.hpp +++ b/opm/autodiff/SimulatorFullyImplicitBlackoilOutput.hpp @@ -372,22 +372,6 @@ namespace Opm // Ensure that output dir exists ensureDirectoryExists(outputDir_); - // create output thread if enabled and rank is I/O rank - // async output is enabled by default if pthread are enabled -#if HAVE_PTHREAD - const bool asyncOutputDefault = false; -#else - const bool asyncOutputDefault = false; -#endif - if( param.getDefault("async_output", asyncOutputDefault ) ) - { -#if HAVE_PTHREAD - asyncOutput_.reset( new ThreadHandle() ); -#else - OPM_THROW(std::runtime_error,"Pthreads were not found, cannot enable async_output"); -#endif - } - std::string backupfilename = param.getDefault("backupfile", std::string("") ); if( ! backupfilename.empty() ) { @@ -395,6 +379,23 @@ namespace Opm } } } + + // create output thread if enabled and rank is I/O rank + // async output is enabled by default if pthread are enabled +#if HAVE_PTHREAD + const bool asyncOutputDefault = true; +#else + const bool asyncOutputDefault = false; +#endif + if( param.getDefault("async_output", asyncOutputDefault ) ) + { +#if HAVE_PTHREAD + asyncOutput_.reset( new ThreadHandle( parallelOutput_->isIORank() ) ); +#else + OPM_THROW(std::runtime_error,"Pthreads were not found, cannot enable async_output"); +#endif + } + } diff --git a/opm/autodiff/ThreadHandle.hpp b/opm/autodiff/ThreadHandle.hpp index 2888e054a..d0eda062a 100644 --- a/opm/autodiff/ThreadHandle.hpp +++ b/opm/autodiff/ThreadHandle.hpp @@ -53,6 +53,12 @@ namespace Opm // no copying ThreadHandleQueue( const ThreadHandleQueue& ) = delete; + // wait duration of 10 milli seconds + void wait() const + { + std::this_thread::sleep_for( std::chrono::milliseconds(10) ); + } + public: //! constructor creating object that is executed by thread ThreadHandleQueue() @@ -60,6 +66,15 @@ namespace Opm { } + ~ThreadHandleQueue() + { + // wait until all objects have been written. + while( ! objQueue_.empty() ) + { + wait(); + } + } + //! insert object into threads queue void push_back( std::unique_ptr< ObjectInterface >&& obj ) { @@ -76,7 +91,7 @@ namespace Opm while( objQueue_.empty() ) { // sleep one second - std::this_thread::sleep_for( std::chrono::seconds(1) ); + wait(); } { @@ -119,38 +134,53 @@ namespace Opm } ThreadHandleQueue threadObjectQueue_; - std::thread thread_; + std::unique_ptr< std::thread > thread_; private: // prohibit copying ThreadHandle( const ThreadHandle& ) = delete; public: - //! default constructor - ThreadHandle() + //! constructor creating ThreadHandle + //! \param isIORank if true thread is created + ThreadHandle( const bool createThread ) : threadObjectQueue_(), - thread_( startThread, &threadObjectQueue_ ) + thread_() { - // detach thread into nirvana - thread_.detach(); + if( createThread ) + { + thread_.reset( new std::thread( startThread, &threadObjectQueue_ ) ); + // detach thread into nirvana + thread_->detach(); + } } // end constructor //! dispatch object to queue of separate thread template void dispatch( Object&& obj ) { - typedef ObjectWrapper< Object > ObjectPointer; - ObjectInterface* objPtr = new ObjectPointer( std::move(obj) ); + if( thread_ ) + { + typedef ObjectWrapper< Object > ObjectPointer; + ObjectInterface* objPtr = new ObjectPointer( std::move(obj) ); - // add object to queue of objects - threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (objPtr) ); + // add object to queue of objects + threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (objPtr) ); + } + else + { + OPM_THROW(std::logic_error,"ThreadHandle::dispatch called without thread being initialized (i.e. on non-ioRank)"); + } } //! destructor terminating the thread ~ThreadHandle() { - // dispatch end object which will terminate the thread - threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (new EndObject()) ) ; + if( thread_ ) + { + // dispatch end object which will terminate the thread + threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (new EndObject()) ) ; + } } };