diff --git a/CMakeLists_files.cmake b/CMakeLists_files.cmake index f6df0f2e9..b05f3cee8 100644 --- a/CMakeLists_files.cmake +++ b/CMakeLists_files.cmake @@ -42,6 +42,7 @@ list (APPEND MAIN_SOURCE_FILES opm/simulators/timestepping/SimulatorReport.cpp opm/simulators/flow/countGlobalCells.cpp opm/simulators/flow/ConvergenceOutputConfiguration.cpp + opm/simulators/flow/ExtraConvergenceOutputThread.cpp opm/simulators/flow/KeywordValidation.cpp opm/simulators/flow/SimulatorFullyImplicitBlackoilEbos.cpp opm/simulators/flow/ValidationFunctions.cpp @@ -276,6 +277,7 @@ list (APPEND PUBLIC_HEADER_FILES opm/simulators/flow/BlackoilModelEbos.hpp opm/simulators/flow/BlackoilModelParametersEbos.hpp opm/simulators/flow/ConvergenceOutputConfiguration.hpp + opm/simulators/flow/ExtraConvergenceOutputThread.hpp opm/simulators/flow/FlowMainEbos.hpp opm/simulators/flow/Main.hpp opm/simulators/flow/NonlinearSolverEbos.hpp diff --git a/opm/simulators/flow/BlackoilModelEbos.hpp b/opm/simulators/flow/BlackoilModelEbos.hpp index d5505c14b..78f4b3411 100644 --- a/opm/simulators/flow/BlackoilModelEbos.hpp +++ b/opm/simulators/flow/BlackoilModelEbos.hpp @@ -1062,6 +1062,11 @@ namespace Opm { return convergence_reports_; } + std::vector getStepReportsDestructively() const + { + return std::move(this->convergence_reports_); + } + protected: // --------- Data members --------- diff --git a/opm/simulators/flow/ExtraConvergenceOutputThread.cpp b/opm/simulators/flow/ExtraConvergenceOutputThread.cpp new file mode 100644 index 000000000..b550c4a5f --- /dev/null +++ b/opm/simulators/flow/ExtraConvergenceOutputThread.cpp @@ -0,0 +1,324 @@ +/* + Copyright 2022 Equinor ASA. + + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . +*/ + +#include + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { + std::string to_string(const Opm::ConvergenceReport::ReservoirFailure::Type t) + { + using Type = Opm::ConvergenceReport::ReservoirFailure::Type; + + const auto type_strings = std::unordered_map { + { Type::Invalid , "Invalid" }, + { Type::MassBalance, "MB" }, + { Type::Cnv , "CNV" }, + }; + + auto strPos = type_strings.find(t); + assert ((strPos != type_strings.end()) && + "Unsupported convergence metric type"); + + return strPos->second; + } + + std::string + formatMetricColumn(const Opm::ConvergenceOutputThread::ComponentToPhaseName& getPhaseName, + const Opm::ConvergenceReport::ReservoirConvergenceMetric& metric) + { + std::ostringstream os; + os << to_string(metric.type()) << '_' << getPhaseName(metric.phase()); + + return os.str(); + } + + std::string::size_type + maxColHeaderSize(const std::string::size_type minColSize, + const Opm::ConvergenceOutputThread::ComponentToPhaseName& getPhaseName, + const std::vector& cols) + { + return std::accumulate(cols.begin(), cols.end(), minColSize, + [&getPhaseName](const std::string::size_type maxChar, + const Opm::ConvergenceReport::ReservoirConvergenceMetric& metric) + { + return std::max(maxChar, formatMetricColumn(getPhaseName, metric).size()); + }); + } + + std::string::size_type + writeConvergenceHeader(std::ostream& os, + const Opm::ConvergenceOutputThread::ComponentToPhaseName& getPhaseName, + const Opm::ConvergenceReportQueue::OutputRequest& firstRequest) + { + const auto minColSize = std::string::size_type{11}; + + os << std::right << std::setw(minColSize) << "ReportStep" << ' ' + << std::right << std::setw(minColSize) << "TimeStep" << ' ' + << std::right << std::setw(minColSize) << "Time" << ' ' + << std::right << std::setw(minColSize) << "Iteration"; + + const auto& metrics = firstRequest.reports.front().reservoirConvergence(); + const auto maxChar = maxColHeaderSize(minColSize, getPhaseName, metrics); + + for (const auto& metric : metrics) { + os << std::right << std::setw(maxChar + 1) + << formatMetricColumn(getPhaseName, metric); + } + + // Note: Newline character intentionally placed in separate output + // request to not influence right-justification of column header. + os << std::right << std::setw(maxChar + 1) << "WellStatus" << '\n'; + + return maxChar; + } + + void writeConvergenceRequest(std::ostream& os, + const Opm::ConvergenceOutputThread::ConvertToTimeUnits& convertTime, + std::string::size_type colSize, + const Opm::ConvergenceReportQueue::OutputRequest& request) + { + const auto firstColSize = std::string::size_type{11}; + + os.setf(std::ios_base::scientific); + + auto iter = 0; + for (const auto& report : request.reports) { + os << std::setw(firstColSize) << request.reportStep << ' ' + << std::setw(firstColSize) << request.currentStep << ' ' + << std::setprecision(4) << std::setw(firstColSize) + << convertTime(report.reportTime()) << ' ' + << std::setw(firstColSize) << iter; + + for (const auto& metric : report.reservoirConvergence()) { + os << std::setprecision(4) << std::setw(colSize + 1) << metric.value(); + } + + os << std::right << std::setw(colSize + 1) + << (report.wellFailed() ? "FAIL" : "CONV") << '\n'; + + ++iter; + } + } +} // Anonymous namespace + +// --------------------------------------------------------------------------- + +class Opm::ConvergenceOutputThread::Impl +{ +public: + explicit Impl(std::string_view outputDir, + std::string_view baseName, + ComponentToPhaseName getPhaseName, + ConvertToTimeUnits convertTime, + ConvergenceOutputConfiguration config, + ConvergenceReportQueue& queue); + + ConvergenceReportQueue& queue() + { + return this->queue_; + } + + void write(const std::vector& requests); + bool finalRequestWritten() const + { + return this->finalRequestWritten_; + } + +private: + std::reference_wrapper queue_; + ComponentToPhaseName getPhaseName_{}; + ConvertToTimeUnits convertTime_{}; + std::optional infoIter_{}; + std::string::size_type colSize_{0}; + bool haveOutputIterHeader_{false}; + bool finalRequestWritten_{false}; + + void writeIterInfo(const std::vector& requests); +}; + +Opm::ConvergenceOutputThread::Impl::Impl(std::string_view outputDir, + std::string_view baseName, + ComponentToPhaseName getPhaseName, + ConvertToTimeUnits convertTime, + ConvergenceOutputConfiguration config, + ConvergenceReportQueue& queue) + : queue_ { std::ref(queue) } + , getPhaseName_ { std::move(getPhaseName) } + , convertTime_ { std::move(convertTime) } +{ + if (config.want(ConvergenceOutputConfiguration::Option::Iterations)) { + this->infoIter_.emplace + (std::filesystem::path { outputDir } / + std::filesystem::path { baseName }.concat(".INFOITER")); + } +} + +void +Opm::ConvergenceOutputThread::Impl:: +write(const std::vector& requests) +{ + assert (! requests.empty() && + "Internal logic error in forming convergence output request"); + + this->writeIterInfo(requests); +} + +void +Opm::ConvergenceOutputThread::Impl:: +writeIterInfo(const std::vector& requests) +{ + if (! this->infoIter_.has_value()) { + return; + } + + if (! this->haveOutputIterHeader_) { + this->colSize_ = + writeConvergenceHeader(this->infoIter_.value(), + this->getPhaseName_, + requests.front()); + this->haveOutputIterHeader_ = true; + } + + for (const auto& request : requests) { + writeConvergenceRequest(this->infoIter_.value(), + this->convertTime_, + this->colSize_, + request); + + if (request.reports.empty()) { + this->finalRequestWritten_ = true; + break; + } + } + + this->infoIter_.value().flush(); +} + +// =========================================================================== +// Public Interface Below Separator +// =========================================================================== + +void Opm::ConvergenceReportQueue::enqueue(std::vector&& requests) +{ + // Signal output thread if we're going from "no work" to "some work". + // We don't need to signal if we're going from "some work" to "more + // work". + auto must_notify = false; + + { + std::lock_guard guard{ this->mtx_ }; + must_notify = this->requests_.empty(); + this->requests_.insert(this->requests_.end(), + std::make_move_iterator(requests.begin()), + std::make_move_iterator(requests.end())); + } + + if (must_notify) { + this->cv_.notify_one(); + } +} + +void Opm::ConvergenceReportQueue::signalLastOutputRequest() +{ + // Empty request signals end of production. + this->enqueue(std::vector(1)); +} + +// --------------------------------------------------------------------------- + +Opm::ConvergenceOutputThread:: +ConvergenceOutputThread(std::string_view outputDir, + std::string_view baseName, + ComponentToPhaseName getPhaseName, + ConvertToTimeUnits convertTime, + ConvergenceOutputConfiguration config, + ConvergenceReportQueue& queue) + : pImpl_ { std::make_unique(outputDir, + baseName, + getPhaseName, + convertTime, + config, + queue) } +{} + +Opm::ConvergenceOutputThread::~ConvergenceOutputThread() = default; + +Opm::ConvergenceOutputThread::ConvergenceOutputThread(ConvergenceOutputThread&& src) + : pImpl_ { std::move(src.pImpl_) } +{} + +void +Opm::ConvergenceOutputThread:: +writeSynchronous(std::vector&& requests) +{ + this->pImpl_->write(requests); +} + +void Opm::ConvergenceOutputThread::writeASynchronous() +{ + // This is the main function of the convergence output thread. It runs + // for the duration of the process, although mostly in an idle/waiting + // state. Implementation from Microsoft's "GoingNative" show, episode + // 53, on threading and parallelism in the STL. + + auto& queue = this->pImpl_->queue(); + + // Note: Loop terminates only when final request written. + for (auto localReq = std::vector{} ; ; localReq.clear()) { + std::unique_lock lock { queue.mtx_ }; + queue.cv_.wait(lock, [&queue]() { return !queue.requests_.empty(); }); + + // Capture all pending output requests, relinquish lock and write + // file output outside of the critical section. + queue.requests_.swap(localReq); + + lock.unlock(); + + this->pImpl_->write(localReq); + + if (this->pImpl_->finalRequestWritten()) { + // No more output coming. Shut down thread. + return; + } + } +} diff --git a/opm/simulators/flow/ExtraConvergenceOutputThread.hpp b/opm/simulators/flow/ExtraConvergenceOutputThread.hpp new file mode 100644 index 000000000..49a6d7e81 --- /dev/null +++ b/opm/simulators/flow/ExtraConvergenceOutputThread.hpp @@ -0,0 +1,191 @@ +/* + Copyright 2022 Equinor ASA. + + This file is part of the Open Porous Media project (OPM). + + OPM is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + OPM is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with OPM. If not, see . +*/ + +#ifndef EXTRA_CONVERGENCE_OUTPUT_THREAD_HPP +#define EXTRA_CONVERGENCE_OUTPUT_THREAD_HPP + +#include + +#include +#include +#include +#include +#include +#include + +namespace Opm { + class ConvergenceOutputConfiguration; +} // namespace Opm + +/// \file System for outputting additional convergence information, such as +/// material balance and CNV values, at each non-linear iteration. +/// +/// Supports an asynchronous protocol that assumes there is a single thread +/// dedicated to per-iteration file output. Synchronous file output is +/// available for debugging and development purposes. + +namespace Opm +{ + +/// Forward declaration so that Queue may declare this type its 'friend'. +class ConvergenceOutputThread; + +/// Communication channel between thread creating output requests and +/// consumer thread writing those requests to a file. +/// +/// Output thread has access to internal state. Producer thread uses public +/// interface. Producer thread creates an object of this type and launches +/// the output thread with a reference to that queue object. +class ConvergenceReportQueue +{ +public: + /// Single output request. + /// + /// Associates non-linear iteration convergence information to single + /// report and timestep. + struct OutputRequest + { + /// Current report step + int reportStep{-1}; + + /// Current timestep within \c reportStep. Expected to be a small + /// integer. + int currentStep{-1}; + + /// Convergence metrics for each non-linear ieration in the \c + /// currentStep. + std::vector reports{}; + }; + + /// Push sequence of output requests, typically all substeps whether + /// converged or not, of a single report step. + /// + /// \param[in] requests Output request sequence. Queue takes ownership. + void enqueue(std::vector&& requests); + + /// Signal end of output request stream. + /// + /// No additional requests should be added to queue following a call to + /// this member function. Output thread detects this signal, completes + /// any pending output requests, and shuts down afterwards. + void signalLastOutputRequest(); + + friend class ConvergenceOutputThread; + +private: + /// Mutex for critical sections protecting 'requests_'. + std::mutex mtx_{}; + + /// Condition variable for threads waiting on changes to 'requests_'. + std::condition_variable cv_{}; + + /// Pending convergence output requests. + std::vector requests_{}; +}; + +/// Encapsulating object for thread processing producer's convergence output +/// requests. +class ConvergenceOutputThread +{ +public: + /// Protocol for converting a phase/component ID into a human readable + /// phase/component name. + using ComponentToPhaseName = std::function; + + /// Protocol for converting an SI elapsed time value into an equivalent + /// time value in the run's output conventions. + /// + /// Will typically use \code UnitSystem::from_si() \endcode. + using ConvertToTimeUnits = std::function; + + /// Constructor. + /// + /// \param[in] outputDir -- Name of run's output directory. Any file + /// output will be written to this directory. + /// + /// \param[in] baseName -- Run's base name. Output files will have this + /// name and a type-specific file extension. + /// + /// \param[in] getPhaseName -- Callable object for converting component + /// indices into human readable component names. + /// + /// \param[in] convertTime -- Callable object for converting SI elapsed + /// time values into equivalent elapsed time + /// values using run's time conventions. + /// + /// \param[in] config -- Convergence output configuration options. + /// Determines whether to output additional + /// convergence information and, if so, what + /// information to output. + /// + /// \param[in] queue -- Communication channel between producer thread + /// and this output thread. User must form a + /// valid queue prior to creating the output + /// thread object. + explicit ConvergenceOutputThread(std::string_view outputDir, + std::string_view baseName, + ComponentToPhaseName getPhaseName, + ConvertToTimeUnits convertTime, + ConvergenceOutputConfiguration config, + ConvergenceReportQueue& queue); + + /// Deleted copy constructor. + ConvergenceOutputThread(const ConvergenceOutputThread& src) = delete; + + /// Move constructor. + ConvergenceOutputThread(ConvergenceOutputThread&& src); + + /// Deleted assignment operator. + ConvergenceOutputThread& operator=(const ConvergenceOutputThread& src) = delete; + + /// Deleted move-assignment operator. + ConvergenceOutputThread& operator=(ConvergenceOutputThread&& src) = delete; + + /// Destructor. + /// + /// Needed for pimpl idiom. + ~ConvergenceOutputThread(); + + /// Perform synchronous file output of a sequence of requests. + /// + /// Mostly for development and debugging purposes. + /// + /// \param[in] requests Output request sequence. Thread takes ownership. + void writeSynchronous(std::vector&& requests); + + /// Output thread worker function + /// + /// This is the endpoint that users should associate to a \code + /// std::thread \endcode object. + /// + /// Returns once last pending output request is written (cf. \code + /// ConvergenceReportQueue::signalLastOutputRequest() \endcode.) + void writeASynchronous(); + +private: + /// Private implementation class. + class Impl; + + /// Pointer to implementation. + std::unique_ptr pImpl_; +}; + +} // namespace Opm + +#endif // EXTRA_CONVERGENCE_OUTPUT_THREAD_HPP diff --git a/opm/simulators/flow/SimulatorFullyImplicitBlackoilEbos.hpp b/opm/simulators/flow/SimulatorFullyImplicitBlackoilEbos.hpp index 83cdb890c..0f89eee1a 100644 --- a/opm/simulators/flow/SimulatorFullyImplicitBlackoilEbos.hpp +++ b/opm/simulators/flow/SimulatorFullyImplicitBlackoilEbos.hpp @@ -22,17 +22,30 @@ #ifndef OPM_SIMULATORFULLYIMPLICITBLACKOILEBOS_HEADER_INCLUDED #define OPM_SIMULATORFULLYIMPLICITBLACKOILEBOS_HEADER_INCLUDED -#include #include #include -#include +#include +#include +#include #include -#include #include +#include +#include + #include +#include + #include +#include +#include +#include +#include +#include +#include +#include + namespace Opm::Properties { template @@ -131,10 +144,21 @@ public: { phaseUsage_ = phaseUsageFromDeck(eclState()); - // Only rank 0 does print to std::cout - const auto& comm = grid().comm(); - terminalOutput_ = EWOMS_GET_PARAM(TypeTag, bool, EnableTerminalOutput); - terminalOutput_ = terminalOutput_ && (comm.rank() == 0); + // Only rank 0 does print to std::cout, and only if specifically requested. + this->terminalOutput_ = false; + if (this->grid().comm().rank() == 0) { + this->terminalOutput_ = EWOMS_GET_PARAM(TypeTag, bool, EnableTerminalOutput); + + this->startConvergenceOutputThread(EWOMS_GET_PARAM(TypeTag, std::string, + ExtraConvergenceOutput), + R"(ExtraConvergenceOutput (--extra-convergence-output))"); + } + } + + ~SimulatorFullyImplicitBlackoilEbos() + { + // Safe to call on all ranks, not just the I/O rank. + this->endConvergenceOutputThread(); } static void registerParameters() @@ -310,6 +334,13 @@ public: // update timing. report_.success.solver_time += solverTimer_->secsSinceStart(); + if (this->grid().comm().rank() == 0) { + // Destructively grab the step convergence reports. The solver + // object and the model object contained therein are about to go + // out of scope. + this->writeConvergenceOutput(solver->model().getStepReportsDestructively()); + } + // Increment timer, remember well state. ++timer; @@ -318,14 +349,13 @@ public: const std::string version = moduleVersionName(); outputTimestampFIP(timer, eclState().getTitle(), version); } - } - if (terminalOutput_) { std::string msg = "Time step took " + std::to_string(solverTimer_->secsSinceStart()) + " seconds; " "total solver time " + std::to_string(report_.success.solver_time) + " seconds."; OpmLog::debug(msg); } + return true; } @@ -382,6 +412,65 @@ protected: const WellModel& wellModel_() const { return ebosSimulator_.problem().wellModel(); } + void startConvergenceOutputThread(std::string_view convOutputOptions, + std::string_view optionName) + { + const auto config = ConvergenceOutputConfiguration { + convOutputOptions, optionName + }; + if (! config.want(ConvergenceOutputConfiguration::Option::Iterations)) { + return; + } + + auto getPhaseName = ConvergenceOutputThread::ComponentToPhaseName { + [compNames = typename Model::ComponentName{}](const int compIdx) + { return std::string_view { compNames.name(compIdx) }; } + }; + + auto convertTime = ConvergenceOutputThread::ConvertToTimeUnits { + [usys = this->eclState().getUnits()](const double time) + { return usys.from_si(UnitSystem::measure::time, time); } + }; + + this->convergenceOutputQueue_.emplace(); + this->convergenceOutputObject_.emplace + (this->eclState().getIOConfig().getOutputDir(), + this->eclState().getIOConfig().getBaseName(), + std::move(getPhaseName), + std::move(convertTime), + config, *this->convergenceOutputQueue_); + + this->convergenceOutputThread_ + .emplace(&ConvergenceOutputThread::writeASynchronous, + &this->convergenceOutputObject_.value()); + } + + void writeConvergenceOutput(std::vector&& reports) + { + if (! this->convergenceOutputThread_.has_value()) { + return; + } + + auto requests = std::vector{}; + requests.reserve(reports.size()); + + for (auto&& report : reports) { + requests.push_back({ report.report_step, report.current_step, std::move(report.report) }); + } + + this->convergenceOutputQueue_->enqueue(std::move(requests)); + } + + void endConvergenceOutputThread() + { + if (! this->convergenceOutputThread_.has_value()) { + return; + } + + this->convergenceOutputQueue_->signalLastOutputRequest(); + this->convergenceOutputThread_->join(); + } + // Data. Simulator& ebosSimulator_; std::unique_ptr> wellAuxMod_; @@ -398,6 +487,10 @@ protected: std::unique_ptr solverTimer_; std::unique_ptr totalTimer_; std::unique_ptr adaptiveTimeStepping_; + + std::optional convergenceOutputQueue_{}; + std::optional convergenceOutputObject_{}; + std::optional convergenceOutputThread_{}; }; } // namespace Opm diff --git a/tests/test_convergenceoutputconfiguration.cpp b/tests/test_convergenceoutputconfiguration.cpp index 5e83a4de8..5547ac449 100644 --- a/tests/test_convergenceoutputconfiguration.cpp +++ b/tests/test_convergenceoutputconfiguration.cpp @@ -135,6 +135,9 @@ BOOST_AUTO_TEST_CASE(Misprint) BOOST_CHECK_THROW(Opm::ConvergenceOutputConfiguration{"nonce"}, std::invalid_argument); + BOOST_CHECK_THROW(Opm::ConvergenceOutputConfiguration("nonce", "X"), + std::invalid_argument); + BOOST_CHECK_THROW(Opm::ConvergenceOutputConfiguration{"stepS"}, std::invalid_argument);