gather deferredLogger implemented

This commit is contained in:
Franz G. Fuchs 2019-01-11 13:53:18 +01:00
parent 4cd8eba331
commit 831a374d02
7 changed files with 331 additions and 6 deletions

View File

@ -116,6 +116,18 @@ opm_add_test(test_gatherconvergencereport
5 ${CMAKE_BINARY_DIR}
)
opm_add_test(test_gatherdeferredlogger
DEPENDS "opmsimulators"
LIBRARIES opmsimulators ${Boost_UNIT_TEST_FRAMEWORK_LIBRARY}
SOURCES
tests/test_gatherdeferredlogger.cpp
CONDITION
MPI_FOUND
DRIVER_ARGS
5 ${CMAKE_BINARY_DIR}
)
opm_add_test(flow
ONLY_COMPILE
ALWAYS_ENABLE

View File

@ -45,6 +45,7 @@ list (APPEND MAIN_SOURCE_FILES
opm/simulators/timestepping/AdaptiveSimulatorTimer.cpp
opm/simulators/timestepping/SimulatorTimer.cpp
opm/simulators/timestepping/gatherConvergenceReport.cpp
opm/simulators/gatherDeferredLogger.cpp
)
# originally generated with the command:
@ -174,4 +175,5 @@ list (APPEND PUBLIC_HEADER_FILES
opm/simulators/timestepping/SimulatorTimer.hpp
opm/simulators/timestepping/SimulatorTimerInterface.hpp
opm/simulators/timestepping/gatherConvergenceReport.hpp
opm/simulators/gatherDeferredLogger.hpp
)

View File

@ -17,6 +17,7 @@
along with OPM. If not, see <http://www.gnu.org/licenses/>.
*/
//#include <opm/simulators/gatherDeferredLogger.hpp>
#include <opm/simulators/DeferredLogger.hpp>

View File

@ -26,6 +26,10 @@
#include <string>
#include <vector>
namespace Opm {
class DeferredLogger;
}
namespace Opm
{
/** This class implements a deferred logger:
@ -36,6 +40,14 @@ namespace Opm
class DeferredLogger
{
public:
struct Message
{
int64_t flag;
std::string tag;
std::string text;
};
void info(const std::string& tag, const std::string& message);
void warning(const std::string& tag, const std::string& message);
void error(const std::string& tag, const std::string& message);
@ -55,13 +67,8 @@ namespace Opm
void logMessages();
private:
struct Message
{
int64_t flag;
std::string tag;
std::string text;
};
std::vector<Message> messages_;
friend Opm::DeferredLogger gatherDeferredLogger(const Opm::DeferredLogger& local_deferredlogger);
};
} // namespace Opm

View File

@ -0,0 +1,146 @@
/*
Copyright 2019 SINTEF Digital, Mathematics and Cybernetics.
Copyright 2019 Equinor.
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
*/
#include "config.h"
#include <opm/simulators/gatherDeferredLogger.hpp>
#if HAVE_MPI
#include <cassert>
#include <numeric>
#include <mpi.h>
namespace
{
void packMessages(const std::vector<Opm::DeferredLogger::Message>& local_messages, std::vector<char>& buf, int& offset)
{
for (const auto lm : local_messages) {
MPI_Pack(&lm.flag, 1, MPI_INT64_T, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
int tagsize = lm.tag.size();
MPI_Pack(&tagsize, 1, MPI_UNSIGNED, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(lm.tag.c_str(), lm.tag.size(), MPI_CHAR, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
int textsize = lm.text.size();
MPI_Pack(&textsize, 1, MPI_UNSIGNED, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
MPI_Pack(lm.text.c_str(), lm.text.size(), MPI_CHAR, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
}
}
Opm::DeferredLogger::Message unpackSingleMessage(const std::vector<char>& recv_buffer, int& offset)
{
int64_t flag;
auto* data = const_cast<char*>(recv_buffer.data());
MPI_Unpack(data, recv_buffer.size(), &offset, &flag, 1, MPI_INT64_T, MPI_COMM_WORLD);
// unpack tag
unsigned int tag_length;
MPI_Unpack(data, recv_buffer.size(), &offset, &tag_length, 1, MPI_UNSIGNED, MPI_COMM_WORLD);
std::string tag;
if (tag_length>0) {
std::vector<char> tagchars(tag_length);
MPI_Unpack(data, recv_buffer.size(), &offset, tagchars.data(), tag_length, MPI_CHAR, MPI_COMM_WORLD);
tag = std::string(tagchars.data());
}
// unpack text
unsigned int text_length;
MPI_Unpack(data, recv_buffer.size(), &offset, &text_length, 1, MPI_UNSIGNED, MPI_COMM_WORLD);
std::string text;
if (text_length>0) {
std::vector<char> textchars(text_length);
MPI_Unpack(data, recv_buffer.size(), &offset, textchars.data(), text_length, MPI_CHAR, MPI_COMM_WORLD);
text = std::string (textchars.data());
}
return Opm::DeferredLogger::Message({flag, tag, text});
}
std::vector<Opm::DeferredLogger::Message> unpackMessages(const std::vector<char>& recv_buffer, const std::vector<int>& displ)
{
std::vector<Opm::DeferredLogger::Message> messages;
const int num_processes = displ.size() - 1;
for (int process = 0; process < num_processes; ++process) {
int offset = displ[process];
messages.push_back(unpackSingleMessage(recv_buffer, offset));
assert(offset == displ[process + 1]);
}
return messages;
}
} // anonymous namespace
namespace Opm
{
/// combine (per-process) messages
Opm::DeferredLogger gatherDeferredLogger(const Opm::DeferredLogger& local_deferredlogger)
{
// Pack local messages.
int message_size = 0;
for (const auto lm : local_deferredlogger.messages_) {
message_size += sizeof(lm.flag);
message_size += sizeof(unsigned int);// to store the length of tag
message_size += lm.tag.size();
message_size += sizeof(unsigned int);// to store the length of text
message_size += lm.text.size();
}
//////int message_size = local_messages.size()
std::vector<char> buffer(message_size);
int offset = 0;
packMessages(local_deferredlogger.messages_, buffer, offset);
assert(offset == message_size);
// Get message sizes and create offset/displacement array for gathering.
int num_processes = -1;
MPI_Comm_size(MPI_COMM_WORLD, &num_processes);
std::vector<int> message_sizes(num_processes);
MPI_Allgather(&message_size, 1, MPI_INT, message_sizes.data(), 1, MPI_INT, MPI_COMM_WORLD);
std::vector<int> displ(num_processes + 1, 0);
std::partial_sum(message_sizes.begin(), message_sizes.end(), displ.begin() + 1);
// Gather.
std::vector<char> recv_buffer(displ.back());
MPI_Allgatherv(buffer.data(), buffer.size(), MPI_PACKED,
const_cast<char*>(recv_buffer.data()), message_sizes.data(),
displ.data(), MPI_PACKED,
MPI_COMM_WORLD);
// Unpack.
std::vector<Opm::DeferredLogger::Message> m = unpackMessages(recv_buffer, displ);
Opm::DeferredLogger global_deferredlogger;
global_deferredlogger.messages_ = m;
return global_deferredlogger;
}
} // namespace Opm
#else // HAVE_MPI
namespace Opm
{
Opm::DeferredLogger gatherDeferredLogger(const Opm::DeferredLogger& local_deferredlogger)
{
return local_deferredlogger;
}
} // namespace Opm
#endif // HAVE_MPI

View File

@ -0,0 +1,37 @@
/*
Copyright 2019 SINTEF Digital, Mathematics and Cybernetics.
Copyright 2019 Equinor.
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef OPM_GATHERDEFERREDLOGGER_HEADER_INCLUDED
#define OPM_GATHERDEFERREDLOGGER_HEADER_INCLUDED
#include <opm/simulators/DeferredLogger.hpp>
#include <vector>
namespace Opm
{
/// Create a global log combining local logs
Opm::DeferredLogger gatherDeferredLogger(const Opm::DeferredLogger& local_deferredlogger);
} // namespace Opm
#endif // OPM_GATHERDEFERREDLOGGER_HEADER_INCLUDED

View File

@ -0,0 +1,120 @@
/*
Copyright 2018 SINTEF Digital, Mathematics and Cybernetics.
Copyright 2018 Equinor.
This file is part of the Open Porous Media project (OPM).
OPM is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
OPM is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with OPM. If not, see <http://www.gnu.org/licenses/>.
*/
#include <config.h>
#define BOOST_TEST_MODULE TestGatherDeferredLogger
#define BOOST_TEST_NO_MAIN
#include <boost/test/unit_test.hpp>
#include <opm/simulators/gatherDeferredLogger.hpp>
#include <dune/common/parallel/mpihelper.hh>
#include <opm/common/OpmLog/OpmLog.hpp>
#include <opm/common/OpmLog/LogBackend.hpp>
#include <opm/common/OpmLog/CounterLog.hpp>
#include <opm/common/OpmLog/TimerLog.hpp>
#include <opm/common/OpmLog/StreamLog.hpp>
#include <opm/common/OpmLog/LogUtil.hpp>
using namespace Opm;
#if HAVE_MPI
struct MPIError
{
MPIError(std::string s, int e) : errorstring(std::move(s)), errorcode(e){}
std::string errorstring;
int errorcode;
};
void MPI_err_handler(MPI_Comm*, int* err_code, ...)
{
std::vector<char> err_string(MPI_MAX_ERROR_STRING);
int err_length;
MPI_Error_string(*err_code, err_string.data(), &err_length);
std::string s(err_string.data(), err_length);
std::cerr << "An MPI Error ocurred:" << std::endl << s << std::endl;
throw MPIError(s, *err_code);
}
#endif
bool
init_unit_test_func()
{
return true;
}
void initLogger(std::ostringstream& log_stream) {
OpmLog::removeAllBackends();
std::shared_ptr<CounterLog> counter = std::make_shared<CounterLog>();
std::shared_ptr<StreamLog> streamLog = std::make_shared<StreamLog>( log_stream , Log::DefaultMessageTypes);
OpmLog::addBackend("COUNTER" , counter);
OpmLog::addBackend("STREAM" , streamLog);
BOOST_CHECK_EQUAL( true , OpmLog::hasBackend("COUNTER"));
BOOST_CHECK_EQUAL( true , OpmLog::hasBackend("STREAM"));
streamLog->setMessageFormatter(std::make_shared<SimpleMessageFormatter>(true, false));
streamLog->setMessageLimiter(std::make_shared<MessageLimiter>(2));
}
BOOST_AUTO_TEST_CASE(AllHaveFailure)
{
auto cc = Dune::MPIHelper::getCollectiveCommunication();
std::ostringstream log_stream;
initLogger(log_stream);
Opm::DeferredLogger local_deferredlogger;
local_deferredlogger.info("info from rank" + std::to_string(cc.rank()));
Opm::DeferredLogger global_deferredlogger = gatherDeferredLogger(local_deferredlogger);
if (cc.rank()==0) {
global_deferredlogger.logMessages();
auto counter = OpmLog::getBackend<CounterLog>("COUNTER");
BOOST_CHECK_EQUAL( cc.size() , counter->numMessages(Log::MessageType::Info) );
std::string expected;
for (int i=0; i<cc.size(); i++) {
expected += Log::prefixMessage(Log::MessageType::Info, "info from rank"+std::to_string(i)) + "\n";
}
BOOST_CHECK_EQUAL(log_stream.str(), expected);
std::cerr<<""<<log_stream.str();
}
}
int main(int argc, char** argv)
{
Dune::MPIHelper::instance(argc, argv);
#if HAVE_MPI
// register a throwing error handler to allow for
// debugging with "catch throw" in gdb
MPI_Errhandler handler;
MPI_Comm_create_errhandler(MPI_err_handler, &handler);
MPI_Comm_set_errhandler(MPI_COMM_WORLD, handler);
#endif
return boost::unit_test::unit_test_main(&init_unit_test_func, argc, argv);
}