[bugfix] 1) create asyncOutput object on all ranks to avoid deadlock in

MPI_Bcast call in writeTimeStepWithCellProperties.
2) ThreadHandle waits on destruction until all objects have been dealt with.
This commit is contained in:
Robert Kloefkorn 2017-05-29 18:08:18 +02:00
parent a61d8ab14a
commit e46810d2dc
2 changed files with 60 additions and 29 deletions

View File

@ -372,22 +372,6 @@ namespace Opm
// Ensure that output dir exists
ensureDirectoryExists(outputDir_);
// create output thread if enabled and rank is I/O rank
// async output is enabled by default if pthread are enabled
#if HAVE_PTHREAD
const bool asyncOutputDefault = false;
#else
const bool asyncOutputDefault = false;
#endif
if( param.getDefault("async_output", asyncOutputDefault ) )
{
#if HAVE_PTHREAD
asyncOutput_.reset( new ThreadHandle() );
#else
OPM_THROW(std::runtime_error,"Pthreads were not found, cannot enable async_output");
#endif
}
std::string backupfilename = param.getDefault("backupfile", std::string("") );
if( ! backupfilename.empty() )
{
@ -395,6 +379,23 @@ namespace Opm
}
}
}
// create output thread if enabled and rank is I/O rank
// async output is enabled by default if pthread are enabled
#if HAVE_PTHREAD
const bool asyncOutputDefault = true;
#else
const bool asyncOutputDefault = false;
#endif
if( param.getDefault("async_output", asyncOutputDefault ) )
{
#if HAVE_PTHREAD
asyncOutput_.reset( new ThreadHandle( parallelOutput_->isIORank() ) );
#else
OPM_THROW(std::runtime_error,"Pthreads were not found, cannot enable async_output");
#endif
}
}

View File

@ -53,6 +53,12 @@ namespace Opm
// no copying
ThreadHandleQueue( const ThreadHandleQueue& ) = delete;
// wait duration of 10 milli seconds
void wait() const
{
std::this_thread::sleep_for( std::chrono::milliseconds(10) );
}
public:
//! constructor creating object that is executed by thread
ThreadHandleQueue()
@ -60,6 +66,15 @@ namespace Opm
{
}
~ThreadHandleQueue()
{
// wait until all objects have been written.
while( ! objQueue_.empty() )
{
wait();
}
}
//! insert object into threads queue
void push_back( std::unique_ptr< ObjectInterface >&& obj )
{
@ -76,7 +91,7 @@ namespace Opm
while( objQueue_.empty() )
{
// sleep one second
std::this_thread::sleep_for( std::chrono::seconds(1) );
wait();
}
{
@ -119,38 +134,53 @@ namespace Opm
}
ThreadHandleQueue threadObjectQueue_;
std::thread thread_;
std::unique_ptr< std::thread > thread_;
private:
// prohibit copying
ThreadHandle( const ThreadHandle& ) = delete;
public:
//! default constructor
ThreadHandle()
//! constructor creating ThreadHandle
//! \param isIORank if true thread is created
ThreadHandle( const bool createThread )
: threadObjectQueue_(),
thread_( startThread, &threadObjectQueue_ )
thread_()
{
// detach thread into nirvana
thread_.detach();
if( createThread )
{
thread_.reset( new std::thread( startThread, &threadObjectQueue_ ) );
// detach thread into nirvana
thread_->detach();
}
} // end constructor
//! dispatch object to queue of separate thread
template <class Object>
void dispatch( Object&& obj )
{
typedef ObjectWrapper< Object > ObjectPointer;
ObjectInterface* objPtr = new ObjectPointer( std::move(obj) );
if( thread_ )
{
typedef ObjectWrapper< Object > ObjectPointer;
ObjectInterface* objPtr = new ObjectPointer( std::move(obj) );
// add object to queue of objects
threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (objPtr) );
// add object to queue of objects
threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (objPtr) );
}
else
{
OPM_THROW(std::logic_error,"ThreadHandle::dispatch called without thread being initialized (i.e. on non-ioRank)");
}
}
//! destructor terminating the thread
~ThreadHandle()
{
// dispatch end object which will terminate the thread
threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (new EndObject()) ) ;
if( thread_ )
{
// dispatch end object which will terminate the thread
threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (new EndObject()) ) ;
}
}
};