mirror of
https://github.com/OPM/opm-simulators.git
synced 2025-02-25 18:55:30 -06:00
SimulatorFullyImplicitBlackoilOutput: added threaded asynchronous output.
This commit is contained in:
parent
b406be3839
commit
54ea243c5f
@ -47,9 +47,10 @@ list (APPEND MAIN_SOURCE_FILES
|
|||||||
opm/autodiff/VFPProdProperties.cpp
|
opm/autodiff/VFPProdProperties.cpp
|
||||||
opm/autodiff/VFPInjProperties.cpp
|
opm/autodiff/VFPInjProperties.cpp
|
||||||
opm/autodiff/WellMultiSegment.cpp
|
opm/autodiff/WellMultiSegment.cpp
|
||||||
opm/autodiff/BlackoilSolventState.cpp
|
opm//autodiff/ThreadHandle.hpp
|
||||||
opm/polymer/PolymerState.cpp
|
opm/autodiff/BlackoilSolventState.cpp
|
||||||
opm/polymer/PolymerBlackoilState.cpp
|
opm/polymer/PolymerState.cpp
|
||||||
|
opm/polymer/PolymerBlackoilState.cpp
|
||||||
opm/polymer/CompressibleTpfaPolymer.cpp
|
opm/polymer/CompressibleTpfaPolymer.cpp
|
||||||
opm/polymer/IncompTpfaPolymer.cpp
|
opm/polymer/IncompTpfaPolymer.cpp
|
||||||
opm/polymer/PolymerInflow.cpp
|
opm/polymer/PolymerInflow.cpp
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
/*
|
/*
|
||||||
Copyright (c) 2014 SINTEF ICT, Applied Mathematics.
|
Copyright (c) 2014 SINTEF ICT, Applied Mathematics.
|
||||||
Copyright (c) 2015 IRIS AS
|
Copyright (c) 2015-2016 IRIS AS
|
||||||
|
|
||||||
This file is part of the Open Porous Media project (OPM).
|
This file is part of the Open Porous Media project (OPM).
|
||||||
|
|
||||||
@ -38,6 +38,7 @@
|
|||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <iomanip>
|
#include <iomanip>
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
#include <boost/filesystem.hpp>
|
#include <boost/filesystem.hpp>
|
||||||
|
|
||||||
@ -253,6 +254,40 @@ namespace Opm
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
namespace detail {
|
||||||
|
|
||||||
|
struct WriterCall : public ThreadHandle :: ObjectIF
|
||||||
|
{
|
||||||
|
BlackoilOutputWriter& writer_;
|
||||||
|
std::unique_ptr< SimulatorTimerInterface > timer_;
|
||||||
|
const SimulationDataContainer state_;
|
||||||
|
const WellState wellState_;
|
||||||
|
const bool substep_;
|
||||||
|
|
||||||
|
explicit WriterCall( BlackoilOutputWriter& writer,
|
||||||
|
const SimulatorTimerInterface& timer,
|
||||||
|
const SimulationDataContainer& state,
|
||||||
|
const WellState& wellState,
|
||||||
|
bool substep )
|
||||||
|
: writer_( writer ),
|
||||||
|
timer_( timer.clone() ),
|
||||||
|
state_( state ),
|
||||||
|
wellState_( wellState ),
|
||||||
|
substep_( substep )
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
// callback to writer's serial writeTimeStep method
|
||||||
|
void run ()
|
||||||
|
{
|
||||||
|
// write data
|
||||||
|
writer_.writeTimeStepSerial( *timer_, state_, wellState_, substep_ );
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
BlackoilOutputWriter::
|
BlackoilOutputWriter::
|
||||||
writeTimeStep(const SimulatorTimerInterface& timer,
|
writeTimeStep(const SimulatorTimerInterface& timer,
|
||||||
@ -265,7 +300,7 @@ namespace Opm
|
|||||||
vtkWriter_->writeTimeStep( timer, localState, localWellState, false );
|
vtkWriter_->writeTimeStep( timer, localState, localWellState, false );
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isIORank = true ;
|
bool isIORank = output_ ;
|
||||||
if( parallelOutput_ && parallelOutput_->isParallel() )
|
if( parallelOutput_ && parallelOutput_->isParallel() )
|
||||||
{
|
{
|
||||||
// collect all solutions to I/O rank
|
// collect all solutions to I/O rank
|
||||||
@ -275,65 +310,71 @@ namespace Opm
|
|||||||
const SimulationDataContainer& state = (parallelOutput_ && parallelOutput_->isParallel() ) ? parallelOutput_->globalReservoirState() : localState;
|
const SimulationDataContainer& state = (parallelOutput_ && parallelOutput_->isParallel() ) ? parallelOutput_->globalReservoirState() : localState;
|
||||||
const WellState& wellState = (parallelOutput_ && parallelOutput_->isParallel() ) ? parallelOutput_->globalWellState() : localWellState;
|
const WellState& wellState = (parallelOutput_ && parallelOutput_->isParallel() ) ? parallelOutput_->globalWellState() : localWellState;
|
||||||
|
|
||||||
// output is only done on I/O rank
|
// serial output is only done on I/O rank
|
||||||
if( isIORank )
|
if( isIORank )
|
||||||
{
|
{
|
||||||
// Matlab output
|
if( asyncOutput_ ) {
|
||||||
if( matlabWriter_ ) {
|
// dispatch the write call to the extra thread
|
||||||
matlabWriter_->writeTimeStep( timer, state, wellState, substep );
|
asyncOutput_->dispatch( new detail::WriterCall( *this, timer, state, wellState, substep ) );
|
||||||
}
|
}
|
||||||
// ECL output
|
else {
|
||||||
if ( eclWriter_ ) {
|
// just write the data to disk
|
||||||
const auto initConfig = eclipseState_->getInitConfig();
|
writeTimeStepSerial( timer, state, wellState, substep );
|
||||||
if (initConfig->getRestartInitiated() && ((initConfig->getRestartStep()) == (timer.currentStepNum()))) {
|
|
||||||
std::cout << "Skipping restart write in start of step " << timer.currentStepNum() << std::endl;
|
|
||||||
} else {
|
|
||||||
eclWriter_->writeTimeStep(timer, state, wellState, substep );
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// write backup file
|
void
|
||||||
if( backupfile_ )
|
BlackoilOutputWriter::
|
||||||
|
writeTimeStepSerial(const SimulatorTimerInterface& timer,
|
||||||
|
const SimulationDataContainer& state,
|
||||||
|
const WellState& wellState,
|
||||||
|
bool substep)
|
||||||
|
{
|
||||||
|
// Matlab output
|
||||||
|
if( matlabWriter_ ) {
|
||||||
|
matlabWriter_->writeTimeStep( timer, state, wellState, substep );
|
||||||
|
}
|
||||||
|
|
||||||
|
// ECL output
|
||||||
|
if ( eclWriter_ )
|
||||||
|
{
|
||||||
|
const auto initConfig = eclipseState_->getInitConfig();
|
||||||
|
if (initConfig->getRestartInitiated() && ((initConfig->getRestartStep()) == (timer.currentStepNum()))) {
|
||||||
|
std::cout << "Skipping restart write in start of step " << timer.currentStepNum() << std::endl;
|
||||||
|
} else {
|
||||||
|
eclWriter_->writeTimeStep(timer, state, wellState, substep );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// write backup file
|
||||||
|
if( backupfile_.is_open() )
|
||||||
|
{
|
||||||
|
int reportStep = timer.reportStepNum();
|
||||||
|
int currentTimeStep = timer.currentStepNum();
|
||||||
|
if( (reportStep == currentTimeStep || // true for SimulatorTimer
|
||||||
|
currentTimeStep == 0 || // true for AdaptiveSimulatorTimer at reportStep
|
||||||
|
timer.done() ) // true for AdaptiveSimulatorTimer at reportStep
|
||||||
|
&& lastBackupReportStep_ != reportStep ) // only backup report step once
|
||||||
{
|
{
|
||||||
int reportStep = timer.reportStepNum();
|
// store report step
|
||||||
int currentTimeStep = timer.currentStepNum();
|
lastBackupReportStep_ = reportStep;
|
||||||
if( (reportStep == currentTimeStep || // true for SimulatorTimer
|
// write resport step number
|
||||||
currentTimeStep == 0 || // true for AdaptiveSimulatorTimer at reportStep
|
backupfile_.write( (const char *) &reportStep, sizeof(int) );
|
||||||
timer.done() ) // true for AdaptiveSimulatorTimer at reportStep
|
|
||||||
&& lastBackupReportStep_ != reportStep ) // only backup report step once
|
|
||||||
{
|
|
||||||
// store report step
|
|
||||||
lastBackupReportStep_ = reportStep;
|
|
||||||
// write resport step number
|
|
||||||
backupfile_.write( (const char *) &reportStep, sizeof(int) );
|
|
||||||
|
|
||||||
/*
|
try {
|
||||||
try {
|
backupfile_ << state;
|
||||||
const BlackoilState& boState = dynamic_cast< const BlackoilState& > (state);
|
|
||||||
backupfile_ << boState;
|
|
||||||
|
|
||||||
const WellStateFullyImplicitBlackoil& boWellState = static_cast< const WellStateFullyImplicitBlackoil& > (wellState);
|
const WellStateFullyImplicitBlackoil& boWellState = static_cast< const WellStateFullyImplicitBlackoil& > (wellState);
|
||||||
backupfile_ << boWellState;
|
backupfile_ << boWellState;
|
||||||
}
|
|
||||||
catch ( const std::bad_cast& e )
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
/*
|
|
||||||
const WellStateFullyImplicitBlackoil* boWellState =
|
|
||||||
dynamic_cast< const WellStateFullyImplicitBlackoil* > (&wellState);
|
|
||||||
if( boWellState ) {
|
|
||||||
backupfile_ << (*boWellState);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
OPM_THROW(std::logic_error,"cast to WellStateFullyImplicitBlackoil failed");
|
|
||||||
*/
|
|
||||||
backupfile_ << std::flush;
|
|
||||||
}
|
}
|
||||||
} // end backup
|
catch ( const std::bad_cast& e )
|
||||||
} // end isIORank
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
backupfile_ << std::flush;
|
||||||
|
}
|
||||||
|
} // end backup
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
@ -34,6 +34,7 @@
|
|||||||
#include <opm/autodiff/ParallelDebugOutput.hpp>
|
#include <opm/autodiff/ParallelDebugOutput.hpp>
|
||||||
|
|
||||||
#include <opm/autodiff/WellStateFullyImplicitBlackoil.hpp>
|
#include <opm/autodiff/WellStateFullyImplicitBlackoil.hpp>
|
||||||
|
#include <opm/autodiff/ThreadHandle.hpp>
|
||||||
|
|
||||||
#include <opm/parser/eclipse/EclipseState/EclipseState.hpp>
|
#include <opm/parser/eclipse/EclipseState/EclipseState.hpp>
|
||||||
#include <opm/parser/eclipse/EclipseState/InitConfig/InitConfig.hpp>
|
#include <opm/parser/eclipse/EclipseState/InitConfig/InitConfig.hpp>
|
||||||
@ -43,6 +44,7 @@
|
|||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <iomanip>
|
#include <iomanip>
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
#include <boost/filesystem.hpp>
|
#include <boost/filesystem.hpp>
|
||||||
|
|
||||||
@ -85,7 +87,7 @@ namespace Opm
|
|||||||
Opm::DataMap dm;
|
Opm::DataMap dm;
|
||||||
dm["saturation"] = &state.saturation();
|
dm["saturation"] = &state.saturation();
|
||||||
dm["pressure"] = &state.pressure();
|
dm["pressure"] = &state.pressure();
|
||||||
for (const auto& pair : state.cellData())
|
for (const auto& pair : state.cellData())
|
||||||
{
|
{
|
||||||
const std::string& name = pair.first;
|
const std::string& name = pair.first;
|
||||||
std::string key;
|
std::string key;
|
||||||
@ -220,6 +222,12 @@ namespace Opm
|
|||||||
const Opm::WellState& wellState,
|
const Opm::WellState& wellState,
|
||||||
bool substep = false);
|
bool substep = false);
|
||||||
|
|
||||||
|
/** \copydoc Opm::OutputWriter::writeTimeStep */
|
||||||
|
void writeTimeStepSerial(const SimulatorTimerInterface& timer,
|
||||||
|
const SimulationDataContainer& reservoirState,
|
||||||
|
const Opm::WellState& wellState,
|
||||||
|
bool substep);
|
||||||
|
|
||||||
/** \brief return output directory */
|
/** \brief return output directory */
|
||||||
const std::string& outputDirectory() const { return outputDir_; }
|
const std::string& outputDirectory() const { return outputDir_; }
|
||||||
|
|
||||||
@ -257,6 +265,8 @@ namespace Opm
|
|||||||
std::unique_ptr< OutputWriter > matlabWriter_;
|
std::unique_ptr< OutputWriter > matlabWriter_;
|
||||||
std::unique_ptr< EclipseWriter > eclWriter_;
|
std::unique_ptr< EclipseWriter > eclWriter_;
|
||||||
EclipseStateConstPtr eclipseState_;
|
EclipseStateConstPtr eclipseState_;
|
||||||
|
|
||||||
|
std::unique_ptr< ThreadHandle > asyncOutput_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@ -289,8 +299,15 @@ namespace Opm
|
|||||||
parallelOutput_->numCells(),
|
parallelOutput_->numCells(),
|
||||||
parallelOutput_->globalCell() )
|
parallelOutput_->globalCell() )
|
||||||
: 0 ),
|
: 0 ),
|
||||||
eclipseState_(eclipseState)
|
eclipseState_(eclipseState),
|
||||||
|
asyncOutput_()
|
||||||
{
|
{
|
||||||
|
// create output thread if needed
|
||||||
|
if( output_ && param.getDefault("async_output", bool( false ) ) )
|
||||||
|
{
|
||||||
|
asyncOutput_.reset( new ThreadHandle() );
|
||||||
|
}
|
||||||
|
|
||||||
// For output.
|
// For output.
|
||||||
if (output_ && parallelOutput_->isIORank() ) {
|
if (output_ && parallelOutput_->isIORank() ) {
|
||||||
// Ensure that output dir exists
|
// Ensure that output dir exists
|
||||||
|
142
opm/autodiff/ThreadHandle.hpp
Normal file
142
opm/autodiff/ThreadHandle.hpp
Normal file
@ -0,0 +1,142 @@
|
|||||||
|
#ifndef OPM_THREADHANDLE_HPP
|
||||||
|
#define OPM_THREADHANDLE_HPP
|
||||||
|
|
||||||
|
#include <cassert>
|
||||||
|
#include <dune/common/exceptions.hh>
|
||||||
|
|
||||||
|
#include <thread>
|
||||||
|
#include <mutex>
|
||||||
|
#include <queue>
|
||||||
|
|
||||||
|
namespace Opm
|
||||||
|
{
|
||||||
|
|
||||||
|
class ThreadHandle
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
class ObjectIF
|
||||||
|
{
|
||||||
|
protected:
|
||||||
|
ObjectIF() {}
|
||||||
|
public:
|
||||||
|
virtual ~ObjectIF() {}
|
||||||
|
virtual void run() = 0;
|
||||||
|
virtual bool isEndMarker () const { return false; }
|
||||||
|
};
|
||||||
|
|
||||||
|
protected:
|
||||||
|
class EndObject : public ObjectIF
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
void run () { }
|
||||||
|
bool isEndMarker () const { return true; }
|
||||||
|
};
|
||||||
|
|
||||||
|
////////////////////////////////////////////
|
||||||
|
// class ThreadHandleObject
|
||||||
|
////////////////////////////////////////////
|
||||||
|
class ThreadHandleObject
|
||||||
|
{
|
||||||
|
std::queue< ObjectIF* > objPtr_;
|
||||||
|
std::mutex mutex_;
|
||||||
|
|
||||||
|
// no copying
|
||||||
|
ThreadHandleObject( const ThreadHandleObject& );
|
||||||
|
|
||||||
|
public:
|
||||||
|
// constructor creating thread with given thread number
|
||||||
|
ThreadHandleObject()
|
||||||
|
: objPtr_(), mutex_()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
//! insert object into queue
|
||||||
|
void push_back( ObjectIF* obj )
|
||||||
|
{
|
||||||
|
// lock mutex to make sure objPtr is not used
|
||||||
|
mutex_.lock();
|
||||||
|
objPtr_.emplace( obj );
|
||||||
|
mutex_.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
//! return 1 of thread is stoped, 0 otherwise
|
||||||
|
int stoped() const
|
||||||
|
{
|
||||||
|
return ( objPtr_.empty() ) ? 1 : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// do the work
|
||||||
|
void run()
|
||||||
|
{
|
||||||
|
while( objPtr_.empty() )
|
||||||
|
{
|
||||||
|
sleep( 1 );
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// lock mutex for access to objPtr_
|
||||||
|
mutex_.lock();
|
||||||
|
|
||||||
|
// get next object from queue
|
||||||
|
std::unique_ptr< ObjectIF > obj( objPtr_.front() );
|
||||||
|
objPtr_.pop();
|
||||||
|
|
||||||
|
// unlock mutex for access to objPtr_
|
||||||
|
mutex_.unlock();
|
||||||
|
|
||||||
|
// if object is end marker terminate thread
|
||||||
|
if( obj->isEndMarker() ){
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// execute object action
|
||||||
|
obj->run();
|
||||||
|
}
|
||||||
|
|
||||||
|
// keep thread running
|
||||||
|
run();
|
||||||
|
}
|
||||||
|
}; // end ThreadHandleObject
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////
|
||||||
|
// end ThreadHandleObject
|
||||||
|
////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
static void startThread( ThreadHandleObject* obj )
|
||||||
|
{
|
||||||
|
obj->run();
|
||||||
|
}
|
||||||
|
|
||||||
|
ThreadHandleObject threadObject_;
|
||||||
|
std::thread thread_;
|
||||||
|
|
||||||
|
private:
|
||||||
|
// prohibit copying
|
||||||
|
ThreadHandle( const ThreadHandle& );
|
||||||
|
|
||||||
|
public:
|
||||||
|
// default constructor
|
||||||
|
ThreadHandle()
|
||||||
|
: threadObject_(),
|
||||||
|
thread_( startThread, &threadObject_ )
|
||||||
|
{
|
||||||
|
// detach thread into nirvana
|
||||||
|
thread_.detach();
|
||||||
|
} // end constructor
|
||||||
|
|
||||||
|
//! dispatch object to separate thread
|
||||||
|
void dispatch( ObjectIF* obj )
|
||||||
|
{
|
||||||
|
// add object to queue of objects
|
||||||
|
threadObject_.push_back( obj ) ;
|
||||||
|
}
|
||||||
|
|
||||||
|
~ThreadHandle()
|
||||||
|
{
|
||||||
|
// dispatch end object which will terminate the thread
|
||||||
|
threadObject_.push_back( new EndObject() ) ;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
} // end namespace Opm
|
||||||
|
#endif
|
Loading…
Reference in New Issue
Block a user