Prevent loss of log messages in parallel by merging multiple files.

This completes f94459d5ed
Each process with rank >0 will use .<deckname>.<rank>.DEBUG, and
<deckname>.<rank>.PRT for logging (instead of <file>.<rank> as before).
After the simulator has finished running we will append the content
of those files to the usual log files. If these files have a non-zero
size we will emit a warning as this should not happen if logging is
done right.
This commit is contained in:
Markus Blatt
2016-09-27 14:30:21 +02:00
parent 07318edfa1
commit c0ca9afe5a

View File

@@ -84,6 +84,8 @@
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <boost/algorithm/string.hpp> #include <boost/algorithm/string.hpp>
#include <boost/filesystem/fstream.hpp>
#include <boost/regex.hpp>
#ifdef _OPENMP #ifdef _OPENMP
#include <omp.h> #include <omp.h>
@@ -109,6 +111,101 @@ namespace Opm
{ {
boost::filesystem::path simulationCaseName( const std::string& casename ); boost::filesystem::path simulationCaseName( const std::string& casename );
int64_t convertMessageType(const Message::type& mtype); int64_t convertMessageType(const Message::type& mtype);
namespace fs = boost::filesystem;
/// \brief A functor that merges multiple files of a parallel run to one file.
///
/// Without care multiple processes might log messages in a parallel run.
/// Non-root processes will do that to seperate files
/// <basename>.<rank>.<extension. This functor will append those file
/// to usual ones and delete the other files.
class ParallelFileMerger
{
public:
/// \brief Constructor
/// \param output_dir The output directory to use for reading/Writing.
/// \param deckanme The name of the deck.
ParallelFileMerger(fs::path output_dir,
const std::string& deckname)
: debugFileRegex_("\\."+deckname+"\\.\\d+\\.DEBUG"),
logFileRegex_(deckname+"\\.\\d+\\.PRT")
{
auto debugPath = output_dir;
debugPath /= (std::string(".") + deckname + ".DEBUG");
debugStream_.reset(new fs::ofstream(debugPath,
std::ofstream::app));
auto logPath = output_dir;
logPath /= ( deckname + ".PRT");
logStream_.reset(new fs::ofstream(logPath,
std::ofstream::app));
}
void operator()(const fs::path& file)
{
const static boost::regex regex(".+\\.(\\d+)\\..+");
boost::smatch matches;
std::string filename = file.filename().native();
if ( boost::regex_match(filename, matches, regex) )
{
std::string rank = boost::regex_replace(filename, regex, "\\1");
if( boost::regex_match(filename, logFileRegex_) )
{
appendFile(*logStream_, file, rank);
}
else
{
if (boost::regex_match(filename, debugFileRegex_) )
{
appendFile(*debugStream_, file, rank);
}
else
{
OPM_THROW(std::runtime_error,
"Unrecognized file with name "
<< filename
<< " from parallel run.");
}
}
}
}
private:
/// \brief Append contents of a file to a stream
/// \brief of The output stream to use.
/// \brief file The file whose content to append.
/// \brief rank The rank that wrote the file.
void appendFile(fs::ofstream& of, const fs::path& file, const std::string& rank)
{
if( fs::file_size(file) )
{
std::cerr<<"WARNING: There has been logging out by non-root process "
<<rank<<std::endl<<"Please report this in the issue tracker!"
<<std::endl;
fs::ifstream in(file);
of<<std::endl<< std::endl;
of<<"=======================================================";
of<<std::endl<<std::endl;
of<<" Output written by rank "<<rank<<" to file "<<file.string()<<":"<<std::endl<<std::endl;
of<<in.rdbuf()<<std::endl<<std::endl;
of<<"======================== end output =====================";
of<<std::endl;
in.close();
}
fs::remove(file);
}
/// \brief Regex to capture .*.DEBUG
boost::regex debugFileRegex_;
/// \brief Regex to capture *.PRT
boost::regex logFileRegex_;
/// \brief Stream to *.DEBUG file
std::unique_ptr<fs::ofstream> debugStream_;
/// \brief Stream to *.PRT file
std::unique_ptr<fs::ofstream> logStream_;
};
} }
@@ -152,7 +249,11 @@ namespace Opm
asImpl().createSimulator(); asImpl().createSimulator();
// Run. // Run.
return asImpl().runSimulator(); auto ret = asImpl().runSimulator();
asImpl().mergeParallelLogFiles();
return ret;
} }
catch (const std::exception &e) { catch (const std::exception &e) {
std::ostringstream message; std::ostringstream message;
@@ -390,12 +491,14 @@ namespace Opm
baseName = path(fpath.filename()).string(); baseName = path(fpath.filename()).string();
} }
if (param_.has("output_dir")) { if (param_.has("output_dir")) {
logFileStream << output_dir_ << "/" << baseName + ".PRT"; logFileStream << output_dir_ << "/";
debugFileStream << output_dir_ + "/." + baseName + ".DEBUG"; debugFileStream << output_dir_ + "/";
} else {
logFileStream << baseName << ".PRT";
debugFileStream << "." << baseName << ".DEBUG";
} }
logFileStream << baseName;
debugFileStream << "." << baseName;
if ( must_distribute_ && mpi_rank_ != 0 )
{ {
// Added rank to log file for non-zero ranks. // Added rank to log file for non-zero ranks.
// This prevents message loss. // This prevents message loss.
@@ -403,6 +506,9 @@ namespace Opm
// If the following file appears then there is a bug. // If the following file appears then there is a bug.
logFileStream << "." << mpi_rank_; logFileStream << "." << mpi_rank_;
} }
logFileStream << ".PRT";
debugFileStream << ".DEBUG";
std::string debugFile = debugFileStream.str(); std::string debugFile = debugFileStream.str();
logFile_ = logFileStream.str(); logFile_ = logFileStream.str();
@@ -422,10 +528,29 @@ namespace Opm
} }
} }
void mergeParallelLogFiles()
{
// force closing of all log files.
OpmLog::removeAllBackends();
if( mpi_rank_ != 0 || !must_distribute_ )
{
return;
}
namespace fs = boost::filesystem;
fs::path output_path(".");
if ( param_.has("output_dir") )
{
output_path = fs::path(output_dir_);
}
fs::path deck_filename(param_.get<std::string>("deck_filename"));
std::for_each(fs::directory_iterator(output_path),
fs::directory_iterator(),
detail::ParallelFileMerger(output_path, deck_filename.stem().string()));
}
// Parser the input and creates the Deck and EclipseState objects. // Parser the input and creates the Deck and EclipseState objects.
// Writes to: // Writes to: