mirror of
https://github.com/OPM/opm-simulators.git
synced 2024-12-25 08:41:00 -06:00
Some more cleaning in the output code in opm-output, ewoms and opm-
simulator 1) Don't depend on legacy code for communicating the data::wells 2) Bugfix. Store globalIdx instead localIdx in data::wells::complitions 3) Move ThreadHandle to ebos
This commit is contained in:
parent
024344600e
commit
a89a6af854
@ -330,7 +330,6 @@ list (APPEND PUBLIC_HEADER_FILES
|
|||||||
opm/autodiff/BlackoilWellModel.hpp
|
opm/autodiff/BlackoilWellModel.hpp
|
||||||
opm/autodiff/BlackoilWellModel_impl.hpp
|
opm/autodiff/BlackoilWellModel_impl.hpp
|
||||||
opm/autodiff/MissingFeatures.hpp
|
opm/autodiff/MissingFeatures.hpp
|
||||||
opm/autodiff/ThreadHandle.hpp
|
|
||||||
opm/core/flowdiagnostics/AnisotropicEikonal.hpp
|
opm/core/flowdiagnostics/AnisotropicEikonal.hpp
|
||||||
opm/core/flowdiagnostics/DGBasis.hpp
|
opm/core/flowdiagnostics/DGBasis.hpp
|
||||||
opm/core/flowdiagnostics/FlowDiagnostics.hpp
|
opm/core/flowdiagnostics/FlowDiagnostics.hpp
|
||||||
|
@ -32,16 +32,13 @@
|
|||||||
#include <opm/common/utility/parameters/ParameterGroup.hpp>
|
#include <opm/common/utility/parameters/ParameterGroup.hpp>
|
||||||
#include <opm/core/wells/DynamicListEconLimited.hpp>
|
#include <opm/core/wells/DynamicListEconLimited.hpp>
|
||||||
#include <opm/core/simulator/SimulatorReport.hpp>
|
#include <opm/core/simulator/SimulatorReport.hpp>
|
||||||
|
#include <opm/core/wells/WellsManager.hpp>
|
||||||
#include <opm/output/data/Cells.hpp>
|
#include <opm/output/data/Cells.hpp>
|
||||||
#include <opm/output/data/Solution.hpp>
|
#include <opm/output/data/Solution.hpp>
|
||||||
|
|
||||||
#include <opm/autodiff/GridHelpers.hpp>
|
|
||||||
#include <opm/autodiff/ParallelDebugOutput.hpp>
|
|
||||||
#include <opm/autodiff/Compat.hpp>
|
#include <opm/autodiff/Compat.hpp>
|
||||||
|
|
||||||
#include <opm/autodiff/WellStateFullyImplicitBlackoil.hpp>
|
#include <opm/autodiff/WellStateFullyImplicitBlackoil.hpp>
|
||||||
#include <opm/autodiff/ThreadHandle.hpp>
|
|
||||||
|
|
||||||
#include <opm/parser/eclipse/EclipseState/EclipseState.hpp>
|
#include <opm/parser/eclipse/EclipseState/EclipseState.hpp>
|
||||||
#include <opm/parser/eclipse/EclipseState/SummaryConfig/SummaryConfig.hpp>
|
#include <opm/parser/eclipse/EclipseState/SummaryConfig/SummaryConfig.hpp>
|
||||||
@ -90,36 +87,8 @@ namespace Opm
|
|||||||
}()
|
}()
|
||||||
),
|
),
|
||||||
ebosSimulator_(ebosSimulator),
|
ebosSimulator_(ebosSimulator),
|
||||||
phaseUsage_(phaseUsageFromDeck(eclState())),
|
phaseUsage_(phaseUsageFromDeck(eclState()))
|
||||||
parallelOutput_( output_ ? new ParallelDebugOutput< Grid >( grid(), eclState(), schedule(), phaseUsage_.num_phases, phaseUsage_ ) : 0 ),
|
{}
|
||||||
restart_double_si_( output_ ? param.getDefault("restart_double_si", false) : false ),
|
|
||||||
asyncOutput_()
|
|
||||||
{
|
|
||||||
// For output.
|
|
||||||
if ( output_ )
|
|
||||||
{
|
|
||||||
|
|
||||||
// create output thread if enabled and rank is I/O rank
|
|
||||||
// async output is enabled by default if pthread are enabled
|
|
||||||
#if HAVE_PTHREAD
|
|
||||||
const bool asyncOutputDefault = true;
|
|
||||||
#else
|
|
||||||
const bool asyncOutputDefault = false;
|
|
||||||
#endif
|
|
||||||
if( param.getDefault("async_output", asyncOutputDefault ) )
|
|
||||||
{
|
|
||||||
const bool isIORank = parallelOutput_ ? parallelOutput_->isIORank() : true;
|
|
||||||
#if HAVE_PTHREAD
|
|
||||||
asyncOutput_.reset( new ThreadHandle( isIORank ) );
|
|
||||||
#else
|
|
||||||
OPM_THROW(std::runtime_error,"Pthreads were not found, cannot enable async_output");
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief Write a blackoil reservoir state to disk for later inspection with
|
* \brief Write a blackoil reservoir state to disk for later inspection with
|
||||||
@ -129,7 +98,7 @@ namespace Opm
|
|||||||
*/
|
*/
|
||||||
template<class SimulationDataContainer, class Model>
|
template<class SimulationDataContainer, class Model>
|
||||||
void writeTimeStep(const SimulatorTimerInterface& timer,
|
void writeTimeStep(const SimulatorTimerInterface& timer,
|
||||||
const SimulationDataContainer& reservoirStateDummy,
|
const SimulationDataContainer& /*reservoirStateDummy*/,
|
||||||
const Opm::WellStateFullyImplicitBlackoil& /*wellStateDummy*/,
|
const Opm::WellStateFullyImplicitBlackoil& /*wellStateDummy*/,
|
||||||
const Model& physicalModel,
|
const Model& physicalModel,
|
||||||
const bool substep = false,
|
const bool substep = false,
|
||||||
@ -143,28 +112,9 @@ namespace Opm
|
|||||||
|
|
||||||
const Opm::WellStateFullyImplicitBlackoil& localWellState = physicalModel.wellModel().wellState();
|
const Opm::WellStateFullyImplicitBlackoil& localWellState = physicalModel.wellModel().wellState();
|
||||||
|
|
||||||
if( parallelOutput_ && parallelOutput_->isParallel() )
|
// The writeOutput expects a local data::solution vector and a local data::well vector.
|
||||||
{
|
auto localWellData = localWellState.report(phaseUsage_, Opm::UgGridHelpers::globalCell(grid()) );
|
||||||
// If this is not the initial write and no substep, then the well
|
ebosSimulator_.problem().writeOutput(localWellData, timer.simulationTimeElapsed(), substep, totalSolverTime, nextstep);
|
||||||
// state used in the computation is actually the one of the last
|
|
||||||
// step. We need that well state for the gathering. Otherwise
|
|
||||||
// It an exception with a message like "global state does not
|
|
||||||
// contain well ..." might be thrown.
|
|
||||||
// The distribution of data::solution is not done here
|
|
||||||
data::Solution localCellDataDummy{};
|
|
||||||
int wellStateStepNumber = ( ! substep && timer.reportStepNum() > 0) ?
|
|
||||||
(timer.reportStepNum() - 1) : timer.reportStepNum();
|
|
||||||
// collect all solutions to I/O rank
|
|
||||||
parallelOutput_->collectToIORank( reservoirStateDummy, localWellState,
|
|
||||||
localCellDataDummy,
|
|
||||||
wellStateStepNumber );
|
|
||||||
// Note that at this point the extraData are assumed to be global, i.e. identical across all processes.
|
|
||||||
}
|
|
||||||
|
|
||||||
const WellStateFullyImplicitBlackoil& wellState = (parallelOutput_ && parallelOutput_->isParallel() ) ? parallelOutput_->globalWellState() : localWellState;
|
|
||||||
|
|
||||||
// The writeOutput expects a local data::solution vector and a global data::well vector.
|
|
||||||
ebosSimulator_.problem().writeOutput( wellState.report(phaseUsage_), timer.simulationTimeElapsed(), substep, totalSolverTime, nextstep);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -235,9 +185,6 @@ namespace Opm
|
|||||||
const bool output_;
|
const bool output_;
|
||||||
Simulator& ebosSimulator_;
|
Simulator& ebosSimulator_;
|
||||||
Opm::PhaseUsage phaseUsage_;
|
Opm::PhaseUsage phaseUsage_;
|
||||||
std::unique_ptr< ParallelDebugOutputInterface > parallelOutput_;
|
|
||||||
const bool restart_double_si_;
|
|
||||||
std::unique_ptr< ThreadHandle > asyncOutput_;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -339,7 +339,7 @@ namespace Opm
|
|||||||
substep,
|
substep,
|
||||||
timer.simulationTimeElapsed(),
|
timer.simulationTimeElapsed(),
|
||||||
simProps,
|
simProps,
|
||||||
wellState.report(phaseUsage_),
|
wellState.report(phaseUsage_, globalCellIdxMap_),
|
||||||
miscSummaryData,
|
miscSummaryData,
|
||||||
{}, //regionData
|
{}, //regionData
|
||||||
{}, //blockData
|
{}, //blockData
|
||||||
|
@ -40,7 +40,7 @@
|
|||||||
#include <opm/autodiff/ParallelDebugOutput.hpp>
|
#include <opm/autodiff/ParallelDebugOutput.hpp>
|
||||||
|
|
||||||
#include <opm/autodiff/WellStateFullyImplicitBlackoil.hpp>
|
#include <opm/autodiff/WellStateFullyImplicitBlackoil.hpp>
|
||||||
#include <opm/autodiff/ThreadHandle.hpp>
|
#include <ebos/threadhandle.hh>
|
||||||
#include <opm/autodiff/AutoDiffBlock.hpp>
|
#include <opm/autodiff/AutoDiffBlock.hpp>
|
||||||
|
|
||||||
#include <opm/parser/eclipse/EclipseState/EclipseState.hpp>
|
#include <opm/parser/eclipse/EclipseState/EclipseState.hpp>
|
||||||
@ -317,6 +317,7 @@ namespace Opm
|
|||||||
const SummaryConfig& summaryConfig_;
|
const SummaryConfig& summaryConfig_;
|
||||||
|
|
||||||
std::unique_ptr< ThreadHandle > asyncOutput_;
|
std::unique_ptr< ThreadHandle > asyncOutput_;
|
||||||
|
const int* globalCellIdxMap_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@ -348,7 +349,8 @@ namespace Opm
|
|||||||
eclipseState_(eclipseState),
|
eclipseState_(eclipseState),
|
||||||
schedule_(schedule),
|
schedule_(schedule),
|
||||||
summaryConfig_(summaryConfig),
|
summaryConfig_(summaryConfig),
|
||||||
asyncOutput_()
|
asyncOutput_(),
|
||||||
|
globalCellIdxMap_(Opm::UgGridHelpers::globalCell(grid))
|
||||||
{
|
{
|
||||||
// For output.
|
// For output.
|
||||||
if ( output_ )
|
if ( output_ )
|
||||||
|
@ -1,188 +0,0 @@
|
|||||||
#ifndef OPM_THREADHANDLE_HPP
|
|
||||||
#define OPM_THREADHANDLE_HPP
|
|
||||||
|
|
||||||
#include <cassert>
|
|
||||||
#include <dune/common/exceptions.hh>
|
|
||||||
|
|
||||||
#include <thread>
|
|
||||||
#include <mutex>
|
|
||||||
#include <queue>
|
|
||||||
|
|
||||||
namespace Opm
|
|
||||||
{
|
|
||||||
|
|
||||||
class ThreadHandle
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
class ObjectInterface
|
|
||||||
{
|
|
||||||
protected:
|
|
||||||
ObjectInterface() {}
|
|
||||||
public:
|
|
||||||
virtual ~ObjectInterface() {}
|
|
||||||
virtual void run() = 0;
|
|
||||||
virtual bool isEndMarker () const { return false; }
|
|
||||||
};
|
|
||||||
|
|
||||||
template <class Object>
|
|
||||||
class ObjectWrapper : public ObjectInterface
|
|
||||||
{
|
|
||||||
Object obj_;
|
|
||||||
public:
|
|
||||||
ObjectWrapper( Object&& obj ) : obj_( std::move( obj ) ) {}
|
|
||||||
void run() { obj_.run(); }
|
|
||||||
};
|
|
||||||
|
|
||||||
protected:
|
|
||||||
class EndObject : public ObjectInterface
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
void run () { }
|
|
||||||
bool isEndMarker () const { return true; }
|
|
||||||
};
|
|
||||||
|
|
||||||
////////////////////////////////////////////
|
|
||||||
// class ThreadHandleQueue
|
|
||||||
////////////////////////////////////////////
|
|
||||||
class ThreadHandleQueue
|
|
||||||
{
|
|
||||||
protected:
|
|
||||||
std::queue< std::unique_ptr< ObjectInterface > > objQueue_;
|
|
||||||
std::mutex mutex_;
|
|
||||||
|
|
||||||
// no copying
|
|
||||||
ThreadHandleQueue( const ThreadHandleQueue& ) = delete;
|
|
||||||
|
|
||||||
// wait duration of 10 milli seconds
|
|
||||||
void wait() const
|
|
||||||
{
|
|
||||||
std::this_thread::sleep_for( std::chrono::milliseconds(10) );
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
|
||||||
//! constructor creating object that is executed by thread
|
|
||||||
ThreadHandleQueue()
|
|
||||||
: objQueue_(), mutex_()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
~ThreadHandleQueue()
|
|
||||||
{
|
|
||||||
// wait until all objects have been written.
|
|
||||||
while( ! objQueue_.empty() )
|
|
||||||
{
|
|
||||||
wait();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//! insert object into threads queue
|
|
||||||
void push_back( std::unique_ptr< ObjectInterface >&& obj )
|
|
||||||
{
|
|
||||||
// lock mutex to make sure objPtr is not used
|
|
||||||
mutex_.lock();
|
|
||||||
objQueue_.emplace( std::move(obj) );
|
|
||||||
mutex_.unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
//! do the work until the queue received an end object
|
|
||||||
void run()
|
|
||||||
{
|
|
||||||
// wait until objects have been pushed to the queue
|
|
||||||
while( objQueue_.empty() )
|
|
||||||
{
|
|
||||||
// sleep one second
|
|
||||||
wait();
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
// lock mutex for access to objQueue_
|
|
||||||
mutex_.lock();
|
|
||||||
|
|
||||||
// get next object from queue
|
|
||||||
std::unique_ptr< ObjectInterface > obj( objQueue_.front().release() );
|
|
||||||
// remove object from queue
|
|
||||||
objQueue_.pop();
|
|
||||||
|
|
||||||
// unlock mutex for access to objQueue_
|
|
||||||
mutex_.unlock();
|
|
||||||
|
|
||||||
// if object is end marker terminate thread
|
|
||||||
if( obj->isEndMarker() ){
|
|
||||||
if( ! objQueue_.empty() ) {
|
|
||||||
OPM_THROW(std::logic_error,"ThreadHandleQueue: not all queued objects were executed");
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// execute object action
|
|
||||||
obj->run();
|
|
||||||
}
|
|
||||||
|
|
||||||
// keep thread running
|
|
||||||
run();
|
|
||||||
}
|
|
||||||
}; // end ThreadHandleQueue
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////
|
|
||||||
// end ThreadHandleQueue
|
|
||||||
////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
// start the thread by calling method run
|
|
||||||
static void startThread( ThreadHandleQueue* obj )
|
|
||||||
{
|
|
||||||
obj->run();
|
|
||||||
}
|
|
||||||
|
|
||||||
ThreadHandleQueue threadObjectQueue_;
|
|
||||||
std::unique_ptr< std::thread > thread_;
|
|
||||||
|
|
||||||
private:
|
|
||||||
// prohibit copying
|
|
||||||
ThreadHandle( const ThreadHandle& ) = delete;
|
|
||||||
|
|
||||||
public:
|
|
||||||
//! constructor creating ThreadHandle
|
|
||||||
//! \param isIORank if true thread is created
|
|
||||||
ThreadHandle( const bool createThread )
|
|
||||||
: threadObjectQueue_(),
|
|
||||||
thread_()
|
|
||||||
{
|
|
||||||
if( createThread )
|
|
||||||
{
|
|
||||||
thread_.reset( new std::thread( startThread, &threadObjectQueue_ ) );
|
|
||||||
// detach thread into nirvana
|
|
||||||
thread_->detach();
|
|
||||||
}
|
|
||||||
} // end constructor
|
|
||||||
|
|
||||||
//! dispatch object to queue of separate thread
|
|
||||||
template <class Object>
|
|
||||||
void dispatch( Object&& obj )
|
|
||||||
{
|
|
||||||
if( thread_ )
|
|
||||||
{
|
|
||||||
typedef ObjectWrapper< Object > ObjectPointer;
|
|
||||||
ObjectInterface* objPtr = new ObjectPointer( std::move(obj) );
|
|
||||||
|
|
||||||
// add object to queue of objects
|
|
||||||
threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (objPtr) );
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
OPM_THROW(std::logic_error,"ThreadHandle::dispatch called without thread being initialized (i.e. on non-ioRank)");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//! destructor terminating the thread
|
|
||||||
~ThreadHandle()
|
|
||||||
{
|
|
||||||
if( thread_ )
|
|
||||||
{
|
|
||||||
// dispatch end object which will terminate the thread
|
|
||||||
threadObjectQueue_.push_back( std::unique_ptr< ObjectInterface > (new EndObject()) ) ;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
} // end namespace Opm
|
|
||||||
#endif
|
|
@ -238,8 +238,8 @@ namespace Opm
|
|||||||
std::vector<int>& currentControls() { return current_controls_; }
|
std::vector<int>& currentControls() { return current_controls_; }
|
||||||
const std::vector<int>& currentControls() const { return current_controls_; }
|
const std::vector<int>& currentControls() const { return current_controls_; }
|
||||||
|
|
||||||
data::Wells report(const PhaseUsage &pu) const override {
|
data::Wells report(const PhaseUsage &pu, const int* globalCellIdxMap) const override {
|
||||||
data::Wells res = WellState::report(pu);
|
data::Wells res = WellState::report(pu, globalCellIdxMap);
|
||||||
|
|
||||||
const int nw = this->numWells();
|
const int nw = this->numWells();
|
||||||
if( nw == 0 ) return res;
|
if( nw == 0 ) return res;
|
||||||
|
@ -224,7 +224,7 @@ namespace Opm
|
|||||||
return wellRates().size() / numWells();
|
return wellRates().size() / numWells();
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual data::Wells report(const PhaseUsage& pu) const
|
virtual data::Wells report(const PhaseUsage& pu, const int* globalCellIdxMap) const
|
||||||
{
|
{
|
||||||
using rt = data::Rates::opt;
|
using rt = data::Rates::opt;
|
||||||
|
|
||||||
@ -260,7 +260,7 @@ namespace Opm
|
|||||||
const auto active_index = this->wells_->well_cells[ wi ];
|
const auto active_index = this->wells_->well_cells[ wi ];
|
||||||
|
|
||||||
auto& completion = well.completions[ i ];
|
auto& completion = well.completions[ i ];
|
||||||
completion.index = active_index;
|
completion.index = globalCellIdxMap[active_index];
|
||||||
completion.pressure = this->perfPress()[ itr.second[1] + i ];
|
completion.pressure = this->perfPress()[ itr.second[1] + i ];
|
||||||
completion.reservoir_rate = this->perfRates()[ itr.second[1] + i ];
|
completion.reservoir_rate = this->perfRates()[ itr.second[1] + i ];
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user