/* Copyright 2022 Equinor ASA. This file is part of the Open Porous Media project (OPM). OPM is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. OPM is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with OPM. If not, see . */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace { auto fixedHeaders() noexcept { using namespace std::literals::string_literals; return std::array { "ReportStep"s, "TimeStep"s, "Time"s, "Iteration"s, "CnvPvFracConv"s, "CnvPvFracRelax"s, "CnvPvFracUnconv"s, "CnvCellCntConv"s, "CnvCellCntRelax"s, "CnvCellCntUnconv"s, }; } template auto maxHeaderSize(const HeaderSequence& headers) { using sz_t = std::remove_cv_t().front().size())>>; if (headers.empty()) { return sz_t{0}; } return std::accumulate(headers.begin(), headers.end(), sz_t{1}, [](const auto m, const auto& header) { return std::max(m, header.size() + 1); }); } std::string formatMetricColumn(const Opm::ConvergenceOutputThread::ComponentToPhaseName& getPhaseName, const Opm::ConvergenceReport::ReservoirConvergenceMetric& metric) { std::ostringstream os; os << to_string(metric.type()) << '_' << getPhaseName(metric.phase()); return os.str(); } std::string::size_type maxColHeaderSize(const std::string::size_type minColSize, const Opm::ConvergenceOutputThread::ComponentToPhaseName& getPhaseName, const std::vector& cols) { return std::accumulate(cols.begin(), cols.end(), minColSize, [&getPhaseName](const std::string::size_type maxChar, const Opm::ConvergenceReport::ReservoirConvergenceMetric& metric) { return std::max(maxChar, formatMetricColumn(getPhaseName, metric).size() + 1); }); } std::pair writeConvergenceHeader(std::ostream& os, const Opm::ConvergenceOutputThread::ComponentToPhaseName& getPhaseName, const Opm::ConvergenceReportQueue::OutputRequest& firstRequest) { const auto initial_headers = fixedHeaders(); const auto minColSize = maxHeaderSize(initial_headers); { auto leadingSpace = false; for (const auto& columnHeader : initial_headers) { if (leadingSpace) { os << ' '; } os << std::right << std::setw(minColSize) << columnHeader; leadingSpace = true; } } const auto& metrics = firstRequest.reports.front().reservoirConvergence(); const auto headerSize = maxColHeaderSize(minColSize, getPhaseName, metrics); for (const auto& metric : metrics) { os << std::right << std::setw(headerSize) << formatMetricColumn(getPhaseName, metric); } // Note: Newline character intentionally placed in separate output // request to not influence right-justification of column header. os << std::right << std::setw(headerSize) << "WellStatus" << '\n'; return { minColSize, headerSize }; } void writeTimeColumns(std::ostream& os, const Opm::ConvergenceOutputThread::ConvertToTimeUnits& convertTime, const std::string::size_type firstColSize, const int iter, const Opm::ConvergenceReport& report, const Opm::ConvergenceReportQueue::OutputRequest& request) { os << std::setw(firstColSize) << request.reportStep << ' ' << std::setw(firstColSize) << request.currentStep << ' ' << std::setprecision(4) << std::setw(firstColSize) << convertTime(report.reportTime()) << ' ' << std::setw(firstColSize) << iter; } void writeCnvPvSplit(std::ostream& os, const std::vector::size_type expectedNumValues, const std::string::size_type firstColSize, const Opm::ConvergenceReport& report) { const auto& [splitPv, cellCnt] = report.cnvPvSplit(); if (splitPv.size() == expectedNumValues) { for (const auto& pv : splitPv) { os << ' ' << std::setprecision(4) << std::setw(firstColSize) << pv / report.eligiblePoreVolume(); } } else { constexpr auto val = std::numeric_limits::has_quiet_NaN ? std::numeric_limits::quiet_NaN() : -1.0; for (auto i = 0*expectedNumValues; i < expectedNumValues; ++i) { os << ' ' << std::setprecision(4) << std::setw(firstColSize) << val; } } if (cellCnt.size() == expectedNumValues) { for (const auto& cnt : cellCnt) { os << ' ' << std::setw(firstColSize) << cnt; } } else { for (auto i = 0*expectedNumValues; i < expectedNumValues; ++i) { os << ' ' << std::setw(firstColSize) << -1; } } } void writeReservoirConvergence(std::ostream& os, const std::string::size_type colSize, const Opm::ConvergenceReport& report) { for (const auto& metric : report.reservoirConvergence()) { os << std::setprecision(4) << std::setw(colSize) << metric.value(); } } void writeWellConvergence(std::ostream& os, const std::string::size_type colSize, const Opm::ConvergenceReport& report) { os << std::right << std::setw(colSize) << (report.wellFailed() ? "FAIL" : "CONV"); if (report.wellFailed()) { for (const auto& wf : report.wellFailures()) { os << ' ' << to_string(wf); } } } void writeConvergenceRequest(std::ostream& os, const Opm::ConvergenceOutputThread::ConvertToTimeUnits& convertTime, const std::string::size_type firstColSize, const std::string::size_type colSize, const Opm::ConvergenceReportQueue::OutputRequest& request) { os.setf(std::ios_base::scientific); const auto expectNumCnvSplit = std::vector::size_type{3}; auto iter = 0; for (const auto& report : request.reports) { writeTimeColumns(os, convertTime, firstColSize, iter, report, request); writeCnvPvSplit(os, expectNumCnvSplit, firstColSize, report); writeReservoirConvergence(os, colSize, report); writeWellConvergence(os, colSize, report); os << '\n'; ++iter; } } } // Anonymous namespace // --------------------------------------------------------------------------- class Opm::ConvergenceOutputThread::Impl { public: explicit Impl(std::string_view outputDir, std::string_view baseName, ComponentToPhaseName getPhaseName, ConvertToTimeUnits convertTime, ConvergenceOutputConfiguration config, ConvergenceReportQueue& queue); ConvergenceReportQueue& queue() { return this->queue_; } void write(const std::vector& requests); bool finalRequestWritten() const { return this->finalRequestWritten_; } private: std::reference_wrapper queue_; ComponentToPhaseName getPhaseName_{}; ConvertToTimeUnits convertTime_{}; std::optional infoIter_{}; std::string::size_type firstColSize_{0}; std::string::size_type colSize_{0}; bool haveOutputIterHeader_{false}; bool finalRequestWritten_{false}; void writeIterInfo(const std::vector& requests); }; Opm::ConvergenceOutputThread::Impl::Impl(std::string_view outputDir, std::string_view baseName, ComponentToPhaseName getPhaseName, ConvertToTimeUnits convertTime, ConvergenceOutputConfiguration config, ConvergenceReportQueue& queue) : queue_ { std::ref(queue) } , getPhaseName_ { std::move(getPhaseName) } , convertTime_ { std::move(convertTime) } { if (config.want(ConvergenceOutputConfiguration::Option::Iterations)) { this->infoIter_.emplace (std::filesystem::path { outputDir } / std::filesystem::path { baseName }.concat(".INFOITER")); } } void Opm::ConvergenceOutputThread::Impl:: write(const std::vector& requests) { assert (! requests.empty() && "Internal logic error in forming convergence output request"); this->writeIterInfo(requests); } void Opm::ConvergenceOutputThread::Impl:: writeIterInfo(const std::vector& requests) { if (! this->infoIter_.has_value()) { return; } if (! this->haveOutputIterHeader_) { std::tie(this->firstColSize_, this->colSize_) = writeConvergenceHeader(this->infoIter_.value(), this->getPhaseName_, requests.front()); this->haveOutputIterHeader_ = true; } for (const auto& request : requests) { writeConvergenceRequest(this->infoIter_.value(), this->convertTime_, this->firstColSize_, this->colSize_, request); if (request.reports.empty()) { this->finalRequestWritten_ = true; break; } } this->infoIter_.value().flush(); } // =========================================================================== // Public Interface Below Separator // =========================================================================== void Opm::ConvergenceReportQueue::enqueue(std::vector&& requests) { // Signal output thread if we're going from "no work" to "some work". // We don't need to signal if we're going from "some work" to "more // work". auto must_notify = false; { std::lock_guard guard{ this->mtx_ }; must_notify = this->requests_.empty(); this->requests_.insert(this->requests_.end(), std::make_move_iterator(requests.begin()), std::make_move_iterator(requests.end())); } if (must_notify) { this->cv_.notify_one(); } } void Opm::ConvergenceReportQueue::signalLastOutputRequest() { // Empty request signals end of production. this->enqueue(std::vector(1)); } // --------------------------------------------------------------------------- Opm::ConvergenceOutputThread:: ConvergenceOutputThread(std::string_view outputDir, std::string_view baseName, ComponentToPhaseName getPhaseName, ConvertToTimeUnits convertTime, ConvergenceOutputConfiguration config, ConvergenceReportQueue& queue) : pImpl_ { std::make_unique(outputDir, baseName, getPhaseName, convertTime, config, queue) } {} Opm::ConvergenceOutputThread::~ConvergenceOutputThread() = default; Opm::ConvergenceOutputThread::ConvergenceOutputThread(ConvergenceOutputThread&& src) : pImpl_ { std::move(src.pImpl_) } {} void Opm::ConvergenceOutputThread:: writeSynchronous(std::vector&& requests) { this->pImpl_->write(requests); } void Opm::ConvergenceOutputThread::writeASynchronous() { // This is the main function of the convergence output thread. It runs // for the duration of the process, although mostly in an idle/waiting // state. Implementation from Microsoft's "GoingNative" show, episode // 53, on threading and parallelism in the STL. auto& queue = this->pImpl_->queue(); // Note: Loop terminates only when final request written. for (auto localReq = std::vector{} ; ; localReq.clear()) { std::unique_lock lock { queue.mtx_ }; queue.cv_.wait(lock, [&queue]() { return !queue.requests_.empty(); }); // Capture all pending output requests, relinquish lock and write // file output outside of the critical section. queue.requests_.swap(localReq); lock.unlock(); this->pImpl_->write(localReq); if (this->pImpl_->finalRequestWritten()) { // No more output coming. Shut down thread. return; } } }