Merge pull request #4323 from bska/output-convergence-history

Output Non-Linear Convergence to Separate File if Requested
This commit is contained in:
Atgeirr Flø Rasmussen 2022-12-20 11:17:30 +01:00 committed by GitHub
commit 012c69f75b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 627 additions and 9 deletions

View File

@ -42,6 +42,7 @@ list (APPEND MAIN_SOURCE_FILES
opm/simulators/timestepping/SimulatorReport.cpp
opm/simulators/flow/countGlobalCells.cpp
opm/simulators/flow/ConvergenceOutputConfiguration.cpp
opm/simulators/flow/ExtraConvergenceOutputThread.cpp
opm/simulators/flow/KeywordValidation.cpp
opm/simulators/flow/SimulatorFullyImplicitBlackoilEbos.cpp
opm/simulators/flow/ValidationFunctions.cpp
@ -276,6 +277,7 @@ list (APPEND PUBLIC_HEADER_FILES
opm/simulators/flow/BlackoilModelEbos.hpp
opm/simulators/flow/BlackoilModelParametersEbos.hpp
opm/simulators/flow/ConvergenceOutputConfiguration.hpp
opm/simulators/flow/ExtraConvergenceOutputThread.hpp
opm/simulators/flow/FlowMainEbos.hpp
opm/simulators/flow/Main.hpp
opm/simulators/flow/NonlinearSolverEbos.hpp

View File

@ -1062,6 +1062,11 @@ namespace Opm {
return convergence_reports_;
}
std::vector<StepReport> getStepReportsDestructively() const
{
return std::move(this->convergence_reports_);
}
protected:
// --------- Data members ---------

View File

@ -0,0 +1,324 @@
/*
Copyright 2022 Equinor ASA.
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
*/
#include <opm/simulators/flow/ExtraConvergenceOutputThread.hpp>
#include <opm/simulators/flow/ConvergenceOutputConfiguration.hpp>
#include <opm/simulators/timestepping/ConvergenceReport.hpp>
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <filesystem>
#include <fstream>
#include <functional>
#include <iomanip>
#include <iterator>
#include <mutex>
#include <numeric>
#include <optional>
#include <ostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>
namespace {
std::string to_string(const Opm::ConvergenceReport::ReservoirFailure::Type t)
{
using Type = Opm::ConvergenceReport::ReservoirFailure::Type;
const auto type_strings = std::unordered_map<Type, std::string> {
{ Type::Invalid , "Invalid" },
{ Type::MassBalance, "MB" },
{ Type::Cnv , "CNV" },
};
auto strPos = type_strings.find(t);
assert ((strPos != type_strings.end()) &&
"Unsupported convergence metric type");
return strPos->second;
}
std::string
formatMetricColumn(const Opm::ConvergenceOutputThread::ComponentToPhaseName& getPhaseName,
const Opm::ConvergenceReport::ReservoirConvergenceMetric& metric)
{
std::ostringstream os;
os << to_string(metric.type()) << '_' << getPhaseName(metric.phase());
return os.str();
}
std::string::size_type
maxColHeaderSize(const std::string::size_type minColSize,
const Opm::ConvergenceOutputThread::ComponentToPhaseName& getPhaseName,
const std::vector<Opm::ConvergenceReport::ReservoirConvergenceMetric>& cols)
{
return std::accumulate(cols.begin(), cols.end(), minColSize,
[&getPhaseName](const std::string::size_type maxChar,
const Opm::ConvergenceReport::ReservoirConvergenceMetric& metric)
{
return std::max(maxChar, formatMetricColumn(getPhaseName, metric).size());
});
}
std::string::size_type
writeConvergenceHeader(std::ostream& os,
const Opm::ConvergenceOutputThread::ComponentToPhaseName& getPhaseName,
const Opm::ConvergenceReportQueue::OutputRequest& firstRequest)
{
const auto minColSize = std::string::size_type{11};
os << std::right << std::setw(minColSize) << "ReportStep" << ' '
<< std::right << std::setw(minColSize) << "TimeStep" << ' '
<< std::right << std::setw(minColSize) << "Time" << ' '
<< std::right << std::setw(minColSize) << "Iteration";
const auto& metrics = firstRequest.reports.front().reservoirConvergence();
const auto maxChar = maxColHeaderSize(minColSize, getPhaseName, metrics);
for (const auto& metric : metrics) {
os << std::right << std::setw(maxChar + 1)
<< formatMetricColumn(getPhaseName, metric);
}
// Note: Newline character intentionally placed in separate output
// request to not influence right-justification of column header.
os << std::right << std::setw(maxChar + 1) << "WellStatus" << '\n';
return maxChar;
}
void writeConvergenceRequest(std::ostream& os,
const Opm::ConvergenceOutputThread::ConvertToTimeUnits& convertTime,
std::string::size_type colSize,
const Opm::ConvergenceReportQueue::OutputRequest& request)
{
const auto firstColSize = std::string::size_type{11};
os.setf(std::ios_base::scientific);
auto iter = 0;
for (const auto& report : request.reports) {
os << std::setw(firstColSize) << request.reportStep << ' '
<< std::setw(firstColSize) << request.currentStep << ' '
<< std::setprecision(4) << std::setw(firstColSize)
<< convertTime(report.reportTime()) << ' '
<< std::setw(firstColSize) << iter;
for (const auto& metric : report.reservoirConvergence()) {
os << std::setprecision(4) << std::setw(colSize + 1) << metric.value();
}
os << std::right << std::setw(colSize + 1)
<< (report.wellFailed() ? "FAIL" : "CONV") << '\n';
++iter;
}
}
} // Anonymous namespace
// ---------------------------------------------------------------------------
class Opm::ConvergenceOutputThread::Impl
{
public:
explicit Impl(std::string_view outputDir,
std::string_view baseName,
ComponentToPhaseName getPhaseName,
ConvertToTimeUnits convertTime,
ConvergenceOutputConfiguration config,
ConvergenceReportQueue& queue);
ConvergenceReportQueue& queue()
{
return this->queue_;
}
void write(const std::vector<ConvergenceReportQueue::OutputRequest>& requests);
bool finalRequestWritten() const
{
return this->finalRequestWritten_;
}
private:
std::reference_wrapper<ConvergenceReportQueue> queue_;
ComponentToPhaseName getPhaseName_{};
ConvertToTimeUnits convertTime_{};
std::optional<std::ofstream> infoIter_{};
std::string::size_type colSize_{0};
bool haveOutputIterHeader_{false};
bool finalRequestWritten_{false};
void writeIterInfo(const std::vector<ConvergenceReportQueue::OutputRequest>& requests);
};
Opm::ConvergenceOutputThread::Impl::Impl(std::string_view outputDir,
std::string_view baseName,
ComponentToPhaseName getPhaseName,
ConvertToTimeUnits convertTime,
ConvergenceOutputConfiguration config,
ConvergenceReportQueue& queue)
: queue_ { std::ref(queue) }
, getPhaseName_ { std::move(getPhaseName) }
, convertTime_ { std::move(convertTime) }
{
if (config.want(ConvergenceOutputConfiguration::Option::Iterations)) {
this->infoIter_.emplace
(std::filesystem::path { outputDir } /
std::filesystem::path { baseName }.concat(".INFOITER"));
}
}
void
Opm::ConvergenceOutputThread::Impl::
write(const std::vector<ConvergenceReportQueue::OutputRequest>& requests)
{
assert (! requests.empty() &&
"Internal logic error in forming convergence output request");
this->writeIterInfo(requests);
}
void
Opm::ConvergenceOutputThread::Impl::
writeIterInfo(const std::vector<ConvergenceReportQueue::OutputRequest>& requests)
{
if (! this->infoIter_.has_value()) {
return;
}
if (! this->haveOutputIterHeader_) {
this->colSize_ =
writeConvergenceHeader(this->infoIter_.value(),
this->getPhaseName_,
requests.front());
this->haveOutputIterHeader_ = true;
}
for (const auto& request : requests) {
writeConvergenceRequest(this->infoIter_.value(),
this->convertTime_,
this->colSize_,
request);
if (request.reports.empty()) {
this->finalRequestWritten_ = true;
break;
}
}
this->infoIter_.value().flush();
}
// ===========================================================================
// Public Interface Below Separator
// ===========================================================================
void Opm::ConvergenceReportQueue::enqueue(std::vector<OutputRequest>&& requests)
{
// Signal output thread if we're going from "no work" to "some work".
// We don't need to signal if we're going from "some work" to "more
// work".
auto must_notify = false;
{
std::lock_guard<std::mutex> guard{ this->mtx_ };
must_notify = this->requests_.empty();
this->requests_.insert(this->requests_.end(),
std::make_move_iterator(requests.begin()),
std::make_move_iterator(requests.end()));
}
if (must_notify) {
this->cv_.notify_one();
}
}
void Opm::ConvergenceReportQueue::signalLastOutputRequest()
{
// Empty request signals end of production.
this->enqueue(std::vector<OutputRequest>(1));
}
// ---------------------------------------------------------------------------
Opm::ConvergenceOutputThread::
ConvergenceOutputThread(std::string_view outputDir,
std::string_view baseName,
ComponentToPhaseName getPhaseName,
ConvertToTimeUnits convertTime,
ConvergenceOutputConfiguration config,
ConvergenceReportQueue& queue)
: pImpl_ { std::make_unique<Impl>(outputDir,
baseName,
getPhaseName,
convertTime,
config,
queue) }
{}
Opm::ConvergenceOutputThread::~ConvergenceOutputThread() = default;
Opm::ConvergenceOutputThread::ConvergenceOutputThread(ConvergenceOutputThread&& src)
: pImpl_ { std::move(src.pImpl_) }
{}
void
Opm::ConvergenceOutputThread::
writeSynchronous(std::vector<ConvergenceReportQueue::OutputRequest>&& requests)
{
this->pImpl_->write(requests);
}
void Opm::ConvergenceOutputThread::writeASynchronous()
{
// This is the main function of the convergence output thread. It runs
// for the duration of the process, although mostly in an idle/waiting
// state. Implementation from Microsoft's "GoingNative" show, episode
// 53, on threading and parallelism in the STL.
auto& queue = this->pImpl_->queue();
// Note: Loop terminates only when final request written.
for (auto localReq = std::vector<ConvergenceReportQueue::OutputRequest>{} ; ; localReq.clear()) {
std::unique_lock<std::mutex> lock { queue.mtx_ };
queue.cv_.wait(lock, [&queue]() { return !queue.requests_.empty(); });
// Capture all pending output requests, relinquish lock and write
// file output outside of the critical section.
queue.requests_.swap(localReq);
lock.unlock();
this->pImpl_->write(localReq);
if (this->pImpl_->finalRequestWritten()) {
// No more output coming. Shut down thread.
return;
}
}
}

View File

@ -0,0 +1,191 @@
/*
Copyright 2022 Equinor ASA.
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef EXTRA_CONVERGENCE_OUTPUT_THREAD_HPP
#define EXTRA_CONVERGENCE_OUTPUT_THREAD_HPP
#include <opm/simulators/timestepping/ConvergenceReport.hpp>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <string_view>
#include <vector>
namespace Opm {
class ConvergenceOutputConfiguration;
} // namespace Opm
/// \file System for outputting additional convergence information, such as
/// material balance and CNV values, at each non-linear iteration.
///
/// Supports an asynchronous protocol that assumes there is a single thread
/// dedicated to per-iteration file output. Synchronous file output is
/// available for debugging and development purposes.
namespace Opm
{
/// Forward declaration so that Queue may declare this type its 'friend'.
class ConvergenceOutputThread;
/// Communication channel between thread creating output requests and
/// consumer thread writing those requests to a file.
///
/// Output thread has access to internal state. Producer thread uses public
/// interface. Producer thread creates an object of this type and launches
/// the output thread with a reference to that queue object.
class ConvergenceReportQueue
{
public:
/// Single output request.
///
/// Associates non-linear iteration convergence information to single
/// report and timestep.
struct OutputRequest
{
/// Current report step
int reportStep{-1};
/// Current timestep within \c reportStep. Expected to be a small
/// integer.
int currentStep{-1};
/// Convergence metrics for each non-linear ieration in the \c
/// currentStep.
std::vector<ConvergenceReport> reports{};
};
/// Push sequence of output requests, typically all substeps whether
/// converged or not, of a single report step.
///
/// \param[in] requests Output request sequence. Queue takes ownership.
void enqueue(std::vector<OutputRequest>&& requests);
/// Signal end of output request stream.
///
/// No additional requests should be added to queue following a call to
/// this member function. Output thread detects this signal, completes
/// any pending output requests, and shuts down afterwards.
void signalLastOutputRequest();
friend class ConvergenceOutputThread;
private:
/// Mutex for critical sections protecting 'requests_'.
std::mutex mtx_{};
/// Condition variable for threads waiting on changes to 'requests_'.
std::condition_variable cv_{};
/// Pending convergence output requests.
std::vector<OutputRequest> requests_{};
};
/// Encapsulating object for thread processing producer's convergence output
/// requests.
class ConvergenceOutputThread
{
public:
/// Protocol for converting a phase/component ID into a human readable
/// phase/component name.
using ComponentToPhaseName = std::function<std::string_view(int)>;
/// Protocol for converting an SI elapsed time value into an equivalent
/// time value in the run's output conventions.
///
/// Will typically use \code UnitSystem::from_si() \endcode.
using ConvertToTimeUnits = std::function<double(double)>;
/// Constructor.
///
/// \param[in] outputDir -- Name of run's output directory. Any file
/// output will be written to this directory.
///
/// \param[in] baseName -- Run's base name. Output files will have this
/// name and a type-specific file extension.
///
/// \param[in] getPhaseName -- Callable object for converting component
/// indices into human readable component names.
///
/// \param[in] convertTime -- Callable object for converting SI elapsed
/// time values into equivalent elapsed time
/// values using run's time conventions.
///
/// \param[in] config -- Convergence output configuration options.
/// Determines whether to output additional
/// convergence information and, if so, what
/// information to output.
///
/// \param[in] queue -- Communication channel between producer thread
/// and this output thread. User must form a
/// valid queue prior to creating the output
/// thread object.
explicit ConvergenceOutputThread(std::string_view outputDir,
std::string_view baseName,
ComponentToPhaseName getPhaseName,
ConvertToTimeUnits convertTime,
ConvergenceOutputConfiguration config,
ConvergenceReportQueue& queue);
/// Deleted copy constructor.
ConvergenceOutputThread(const ConvergenceOutputThread& src) = delete;
/// Move constructor.
ConvergenceOutputThread(ConvergenceOutputThread&& src);
/// Deleted assignment operator.
ConvergenceOutputThread& operator=(const ConvergenceOutputThread& src) = delete;
/// Deleted move-assignment operator.
ConvergenceOutputThread& operator=(ConvergenceOutputThread&& src) = delete;
/// Destructor.
///
/// Needed for pimpl idiom.
~ConvergenceOutputThread();
/// Perform synchronous file output of a sequence of requests.
///
/// Mostly for development and debugging purposes.
///
/// \param[in] requests Output request sequence. Thread takes ownership.
void writeSynchronous(std::vector<ConvergenceReportQueue::OutputRequest>&& requests);
/// Output thread worker function
///
/// This is the endpoint that users should associate to a \code
/// std::thread \endcode object.
///
/// Returns once last pending output request is written (cf. \code
/// ConvergenceReportQueue::signalLastOutputRequest() \endcode.)
void writeASynchronous();
private:
/// Private implementation class.
class Impl;
/// Pointer to implementation.
std::unique_ptr<Impl> pImpl_;
};
} // namespace Opm
#endif // EXTRA_CONVERGENCE_OUTPUT_THREAD_HPP

View File

@ -22,17 +22,30 @@
#ifndef OPM_SIMULATORFULLYIMPLICITBLACKOILEBOS_HEADER_INCLUDED
#define OPM_SIMULATORFULLYIMPLICITBLACKOILEBOS_HEADER_INCLUDED
#include <opm/simulators/flow/NonlinearSolverEbos.hpp>
#include <opm/simulators/flow/BlackoilModelEbos.hpp>
#include <opm/simulators/flow/BlackoilModelParametersEbos.hpp>
#include <opm/simulators/wells/WellState.hpp>
#include <opm/simulators/flow/ConvergenceOutputConfiguration.hpp>
#include <opm/simulators/flow/ExtraConvergenceOutputThread.hpp>
#include <opm/simulators/flow/NonlinearSolverEbos.hpp>
#include <opm/simulators/aquifers/BlackoilAquiferModel.hpp>
#include <opm/simulators/utils/moduleVersion.hpp>
#include <opm/simulators/timestepping/AdaptiveTimeSteppingEbos.hpp>
#include <opm/simulators/utils/moduleVersion.hpp>
#include <opm/simulators/wells/WellState.hpp>
#include <opm/grid/utility/StopWatch.hpp>
#include <opm/input/eclipse/Units/UnitSystem.hpp>
#include <opm/common/ErrorMacros.hpp>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>
namespace Opm::Properties {
template<class TypeTag, class MyTypeTag>
@ -131,10 +144,21 @@ public:
{
phaseUsage_ = phaseUsageFromDeck(eclState());
// Only rank 0 does print to std::cout
const auto& comm = grid().comm();
terminalOutput_ = EWOMS_GET_PARAM(TypeTag, bool, EnableTerminalOutput);
terminalOutput_ = terminalOutput_ && (comm.rank() == 0);
// Only rank 0 does print to std::cout, and only if specifically requested.
this->terminalOutput_ = false;
if (this->grid().comm().rank() == 0) {
this->terminalOutput_ = EWOMS_GET_PARAM(TypeTag, bool, EnableTerminalOutput);
this->startConvergenceOutputThread(EWOMS_GET_PARAM(TypeTag, std::string,
ExtraConvergenceOutput),
R"(ExtraConvergenceOutput (--extra-convergence-output))");
}
}
~SimulatorFullyImplicitBlackoilEbos()
{
// Safe to call on all ranks, not just the I/O rank.
this->endConvergenceOutputThread();
}
static void registerParameters()
@ -310,6 +334,13 @@ public:
// update timing.
report_.success.solver_time += solverTimer_->secsSinceStart();
if (this->grid().comm().rank() == 0) {
// Destructively grab the step convergence reports. The solver
// object and the model object contained therein are about to go
// out of scope.
this->writeConvergenceOutput(solver->model().getStepReportsDestructively());
}
// Increment timer, remember well state.
++timer;
@ -318,14 +349,13 @@ public:
const std::string version = moduleVersionName();
outputTimestampFIP(timer, eclState().getTitle(), version);
}
}
if (terminalOutput_) {
std::string msg =
"Time step took " + std::to_string(solverTimer_->secsSinceStart()) + " seconds; "
"total solver time " + std::to_string(report_.success.solver_time) + " seconds.";
OpmLog::debug(msg);
}
return true;
}
@ -382,6 +412,65 @@ protected:
const WellModel& wellModel_() const
{ return ebosSimulator_.problem().wellModel(); }
void startConvergenceOutputThread(std::string_view convOutputOptions,
std::string_view optionName)
{
const auto config = ConvergenceOutputConfiguration {
convOutputOptions, optionName
};
if (! config.want(ConvergenceOutputConfiguration::Option::Iterations)) {
return;
}
auto getPhaseName = ConvergenceOutputThread::ComponentToPhaseName {
[compNames = typename Model::ComponentName{}](const int compIdx)
{ return std::string_view { compNames.name(compIdx) }; }
};
auto convertTime = ConvergenceOutputThread::ConvertToTimeUnits {
[usys = this->eclState().getUnits()](const double time)
{ return usys.from_si(UnitSystem::measure::time, time); }
};
this->convergenceOutputQueue_.emplace();
this->convergenceOutputObject_.emplace
(this->eclState().getIOConfig().getOutputDir(),
this->eclState().getIOConfig().getBaseName(),
std::move(getPhaseName),
std::move(convertTime),
config, *this->convergenceOutputQueue_);
this->convergenceOutputThread_
.emplace(&ConvergenceOutputThread::writeASynchronous,
&this->convergenceOutputObject_.value());
}
void writeConvergenceOutput(std::vector<typename Model::StepReport>&& reports)
{
if (! this->convergenceOutputThread_.has_value()) {
return;
}
auto requests = std::vector<ConvergenceReportQueue::OutputRequest>{};
requests.reserve(reports.size());
for (auto&& report : reports) {
requests.push_back({ report.report_step, report.current_step, std::move(report.report) });
}
this->convergenceOutputQueue_->enqueue(std::move(requests));
}
void endConvergenceOutputThread()
{
if (! this->convergenceOutputThread_.has_value()) {
return;
}
this->convergenceOutputQueue_->signalLastOutputRequest();
this->convergenceOutputThread_->join();
}
// Data.
Simulator& ebosSimulator_;
std::unique_ptr<WellConnectionAuxiliaryModule<TypeTag>> wellAuxMod_;
@ -398,6 +487,10 @@ protected:
std::unique_ptr<time::StopWatch> solverTimer_;
std::unique_ptr<time::StopWatch> totalTimer_;
std::unique_ptr<TimeStepper> adaptiveTimeStepping_;
std::optional<ConvergenceReportQueue> convergenceOutputQueue_{};
std::optional<ConvergenceOutputThread> convergenceOutputObject_{};
std::optional<std::thread> convergenceOutputThread_{};
};
} // namespace Opm

View File

@ -135,6 +135,9 @@ BOOST_AUTO_TEST_CASE(Misprint)
BOOST_CHECK_THROW(Opm::ConvergenceOutputConfiguration{"nonce"},
std::invalid_argument);
BOOST_CHECK_THROW(Opm::ConvergenceOutputConfiguration("nonce", "X"),
std::invalid_argument);
BOOST_CHECK_THROW(Opm::ConvergenceOutputConfiguration{"stepS"},
std::invalid_argument);