Merge pull request #1193 from dr-robertk/PR/fix-mpi-deadlock-in-output

Bugfix: fix MPI deadlock in output when asyncOutput is enabled.
This commit is contained in:
Markus Blatt 2017-05-30 11:33:54 +02:00 committed by GitHub
commit 138eb2c91b
2 changed files with 60 additions and 29 deletions

View File

@ -372,22 +372,6 @@ namespace Opm
// Ensure that output dir exists // Ensure that output dir exists
ensureDirectoryExists(outputDir_); ensureDirectoryExists(outputDir_);
// create output thread if enabled and rank is I/O rank
// async output is enabled by default if pthread are enabled
#if HAVE_PTHREAD
const bool asyncOutputDefault = false;
#else
const bool asyncOutputDefault = false;
#endif
if( param.getDefault("async_output", asyncOutputDefault ) )
{
#if HAVE_PTHREAD
asyncOutput_.reset( new ThreadHandle() );
#else
OPM_THROW(std::runtime_error,"Pthreads were not found, cannot enable async_output");
#endif
}
std::string backupfilename = param.getDefault("backupfile", std::string("") ); std::string backupfilename = param.getDefault("backupfile", std::string("") );
if( ! backupfilename.empty() ) if( ! backupfilename.empty() )
{ {
@ -395,6 +379,23 @@ namespace Opm
} }
} }
} }
// create output thread if enabled and rank is I/O rank
// async output is enabled by default if pthread are enabled
#if HAVE_PTHREAD
const bool asyncOutputDefault = true;
#else
const bool asyncOutputDefault = false;
#endif
if( param.getDefault("async_output", asyncOutputDefault ) )
{
#if HAVE_PTHREAD
asyncOutput_.reset( new ThreadHandle( parallelOutput_->isIORank() ) );
#else
OPM_THROW(std::runtime_error,"Pthreads were not found, cannot enable async_output");
#endif
}
} }

View File

@ -53,6 +53,12 @@ namespace Opm
// no copying // no copying
ThreadHandleQueue( const ThreadHandleQueue& ) = delete; ThreadHandleQueue( const ThreadHandleQueue& ) = delete;
// wait duration of 10 milli seconds
void wait() const
{
std::this_thread::sleep_for( std::chrono::milliseconds(10) );
}
public: public:
//! constructor creating object that is executed by thread //! constructor creating object that is executed by thread
ThreadHandleQueue() ThreadHandleQueue()
@ -60,6 +66,15 @@ namespace Opm
{ {
} }
~ThreadHandleQueue()
{
// wait until all objects have been written.
while( ! objQueue_.empty() )
{
wait();
}
}
//! insert object into threads queue //! insert object into threads queue
void push_back( std::unique_ptr< ObjectInterface >&& obj ) void push_back( std::unique_ptr< ObjectInterface >&& obj )
{ {
@ -76,7 +91,7 @@ namespace Opm
while( objQueue_.empty() ) while( objQueue_.empty() )
{ {
// sleep one second // sleep one second
std::this_thread::sleep_for( std::chrono::seconds(1) ); wait();
} }
{ {
@ -119,38 +134,53 @@ namespace Opm
} }
ThreadHandleQueue threadObjectQueue_; ThreadHandleQueue threadObjectQueue_;
std::thread thread_; std::unique_ptr< std::thread > thread_;
private: private:
// prohibit copying // prohibit copying
ThreadHandle( const ThreadHandle& ) = delete; ThreadHandle( const ThreadHandle& ) = delete;
public: public:
//! default constructor //! constructor creating ThreadHandle
ThreadHandle() //! \param isIORank if true thread is created
ThreadHandle( const bool createThread )
: threadObjectQueue_(), : threadObjectQueue_(),
thread_( startThread, &threadObjectQueue_ ) thread_()
{ {
// detach thread into nirvana if( createThread )
thread_.detach(); {
thread_.reset( new std::thread( startThread, &threadObjectQueue_ ) );
// detach thread into nirvana
thread_->detach();
}
} // end constructor } // end constructor
//! dispatch object to queue of separate thread //! dispatch object to queue of separate thread
template <class Object> template <class Object>
void dispatch( Object&& obj ) void dispatch( Object&& obj )
{ {
typedef ObjectWrapper< Object > ObjectPointer; if( thread_ )
ObjectInterface* objPtr = new ObjectPointer( std::move(obj) ); {
typedef ObjectWrapper< Object > ObjectPointer;
ObjectInterface* objPtr = new ObjectPointer( std::move(obj) );
// add object to queue of objects // add object to queue of objects
threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (objPtr) ); threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (objPtr) );
}
else
{
OPM_THROW(std::logic_error,"ThreadHandle::dispatch called without thread being initialized (i.e. on non-ioRank)");
}
} }
//! destructor terminating the thread //! destructor terminating the thread
~ThreadHandle() ~ThreadHandle()
{ {
// dispatch end object which will terminate the thread if( thread_ )
threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (new EndObject()) ) ; {
// dispatch end object which will terminate the thread
threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (new EndObject()) ) ;
}
} }
}; };