mirror of
https://github.com/OPM/opm-simulators.git
synced 2025-02-25 18:55:30 -06:00
Merge pull request #1705 from fgfuchs/feature_gatherdeferredlogger
gather deferredLogger implemented
This commit is contained in:
commit
8927df3095
@ -116,6 +116,18 @@ opm_add_test(test_gatherconvergencereport
|
||||
5 ${CMAKE_BINARY_DIR}
|
||||
)
|
||||
|
||||
opm_add_test(test_gatherdeferredlogger
|
||||
DEPENDS "opmsimulators"
|
||||
LIBRARIES opmsimulators ${Boost_UNIT_TEST_FRAMEWORK_LIBRARY}
|
||||
SOURCES
|
||||
tests/test_gatherdeferredlogger.cpp
|
||||
CONDITION
|
||||
MPI_FOUND
|
||||
DRIVER_ARGS
|
||||
5 ${CMAKE_BINARY_DIR}
|
||||
)
|
||||
|
||||
|
||||
opm_add_test(flow
|
||||
ONLY_COMPILE
|
||||
ALWAYS_ENABLE
|
||||
|
@ -45,6 +45,7 @@ list (APPEND MAIN_SOURCE_FILES
|
||||
opm/simulators/timestepping/AdaptiveSimulatorTimer.cpp
|
||||
opm/simulators/timestepping/SimulatorTimer.cpp
|
||||
opm/simulators/timestepping/gatherConvergenceReport.cpp
|
||||
opm/simulators/gatherDeferredLogger.cpp
|
||||
)
|
||||
|
||||
# originally generated with the command:
|
||||
@ -174,4 +175,5 @@ list (APPEND PUBLIC_HEADER_FILES
|
||||
opm/simulators/timestepping/SimulatorTimer.hpp
|
||||
opm/simulators/timestepping/SimulatorTimerInterface.hpp
|
||||
opm/simulators/timestepping/gatherConvergenceReport.hpp
|
||||
opm/simulators/gatherDeferredLogger.hpp
|
||||
)
|
||||
|
@ -36,6 +36,14 @@ namespace Opm
|
||||
class DeferredLogger
|
||||
{
|
||||
public:
|
||||
|
||||
struct Message
|
||||
{
|
||||
int64_t flag;
|
||||
std::string tag;
|
||||
std::string text;
|
||||
};
|
||||
|
||||
void info(const std::string& tag, const std::string& message);
|
||||
void warning(const std::string& tag, const std::string& message);
|
||||
void error(const std::string& tag, const std::string& message);
|
||||
@ -55,13 +63,8 @@ namespace Opm
|
||||
void logMessages();
|
||||
|
||||
private:
|
||||
struct Message
|
||||
{
|
||||
int64_t flag;
|
||||
std::string tag;
|
||||
std::string text;
|
||||
};
|
||||
std::vector<Message> messages_;
|
||||
friend Opm::DeferredLogger gatherDeferredLogger(const Opm::DeferredLogger& local_deferredlogger);
|
||||
};
|
||||
|
||||
} // namespace Opm
|
||||
|
172
opm/simulators/gatherDeferredLogger.cpp
Normal file
172
opm/simulators/gatherDeferredLogger.cpp
Normal file
@ -0,0 +1,172 @@
|
||||
/*
|
||||
Copyright 2019 SINTEF Digital, Mathematics and Cybernetics.
|
||||
Copyright 2019 Equinor.
|
||||
|
||||
This file is part of the Open Porous Media project (OPM).
|
||||
|
||||
OPM is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
OPM is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with OPM. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "config.h"
|
||||
|
||||
#include <opm/simulators/gatherDeferredLogger.hpp>
|
||||
|
||||
#if HAVE_MPI
|
||||
|
||||
#include <cassert>
|
||||
#include <numeric>
|
||||
#include <mpi.h>
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
void packMessages(const std::vector<Opm::DeferredLogger::Message>& local_messages, std::vector<char>& buf, int& offset)
|
||||
{
|
||||
|
||||
int messagesize = local_messages.size();
|
||||
MPI_Pack(&messagesize, 1, MPI_UNSIGNED, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
|
||||
|
||||
for (const auto lm : local_messages) {
|
||||
MPI_Pack(&lm.flag, 1, MPI_INT64_T, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
|
||||
int tagsize = lm.tag.size();
|
||||
MPI_Pack(&tagsize, 1, MPI_UNSIGNED, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
|
||||
if (tagsize>0) {
|
||||
MPI_Pack(lm.tag.c_str(), lm.tag.size(), MPI_CHAR, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
|
||||
}
|
||||
int textsize = lm.text.size();
|
||||
MPI_Pack(&textsize, 1, MPI_UNSIGNED, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
|
||||
if (textsize>0) {
|
||||
MPI_Pack(lm.text.c_str(), lm.text.size(), MPI_CHAR, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Opm::DeferredLogger::Message unpackSingleMessage(const std::vector<char>& recv_buffer, int& offset)
|
||||
{
|
||||
int64_t flag;
|
||||
auto* data = const_cast<char*>(recv_buffer.data());
|
||||
MPI_Unpack(data, recv_buffer.size(), &offset, &flag, 1, MPI_INT64_T, MPI_COMM_WORLD);
|
||||
|
||||
// unpack tag
|
||||
unsigned int tagsize;
|
||||
MPI_Unpack(data, recv_buffer.size(), &offset, &tagsize, 1, MPI_UNSIGNED, MPI_COMM_WORLD);
|
||||
std::string tag;
|
||||
if (tagsize>0) {
|
||||
std::vector<char> tagchars(tagsize);
|
||||
MPI_Unpack(data, recv_buffer.size(), &offset, tagchars.data(), tagsize, MPI_CHAR, MPI_COMM_WORLD);
|
||||
tag = std::string(tagchars.data(), tagsize);
|
||||
}
|
||||
// unpack text
|
||||
unsigned int textsize;
|
||||
MPI_Unpack(data, recv_buffer.size(), &offset, &textsize, 1, MPI_UNSIGNED, MPI_COMM_WORLD);
|
||||
std::string text;
|
||||
if (textsize>0) {
|
||||
std::vector<char> textchars(textsize);
|
||||
MPI_Unpack(data, recv_buffer.size(), &offset, textchars.data(), textsize, MPI_CHAR, MPI_COMM_WORLD);
|
||||
text = std::string (textchars.data(), textsize);
|
||||
}
|
||||
return Opm::DeferredLogger::Message({flag, tag, text});
|
||||
}
|
||||
|
||||
std::vector<Opm::DeferredLogger::Message> unpackMessages(const std::vector<char>& recv_buffer, const std::vector<int>& displ)
|
||||
{
|
||||
std::vector<Opm::DeferredLogger::Message> messages;
|
||||
const int num_processes = displ.size() - 1;
|
||||
auto* data = const_cast<char*>(recv_buffer.data());
|
||||
for (int process = 0; process < num_processes; ++process) {
|
||||
int offset = displ[process];
|
||||
// unpack number of messages
|
||||
unsigned int messagesize;
|
||||
MPI_Unpack(data, recv_buffer.size(), &offset, &messagesize, 1, MPI_UNSIGNED, MPI_COMM_WORLD);
|
||||
for (unsigned int i=0; i<messagesize; i++) {
|
||||
messages.push_back(unpackSingleMessage(recv_buffer, offset));
|
||||
}
|
||||
assert(offset == displ[process + 1]);
|
||||
}
|
||||
return messages;
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
|
||||
namespace Opm
|
||||
{
|
||||
|
||||
/// combine (per-process) messages
|
||||
Opm::DeferredLogger gatherDeferredLogger(const Opm::DeferredLogger& local_deferredlogger)
|
||||
{
|
||||
|
||||
int num_messages = local_deferredlogger.messages_.size();
|
||||
|
||||
int int64_mpi_pack_size;
|
||||
MPI_Pack_size(1, MPI_INT64_T, MPI_COMM_WORLD, &int64_mpi_pack_size);
|
||||
int unsigned_int_mpi_pack_size;
|
||||
MPI_Pack_size(1, MPI_UNSIGNED, MPI_COMM_WORLD, &unsigned_int_mpi_pack_size);
|
||||
|
||||
// store number of messages;
|
||||
int message_size = unsigned_int_mpi_pack_size;
|
||||
// store 1 int64 per message for flag
|
||||
message_size += num_messages*int64_mpi_pack_size;
|
||||
// store 2 unsigned ints per message for length of tag and length of text
|
||||
message_size += num_messages*2*unsigned_int_mpi_pack_size;
|
||||
|
||||
for (const auto lm : local_deferredlogger.messages_) {
|
||||
int string_mpi_pack_size;
|
||||
MPI_Pack_size(lm.tag.size(), MPI_CHAR, MPI_COMM_WORLD, &string_mpi_pack_size);
|
||||
message_size += string_mpi_pack_size;
|
||||
MPI_Pack_size(lm.text.size(), MPI_CHAR, MPI_COMM_WORLD, &string_mpi_pack_size);
|
||||
message_size += string_mpi_pack_size;
|
||||
}
|
||||
|
||||
// Pack local messages.
|
||||
std::vector<char> buffer(message_size);
|
||||
|
||||
int offset = 0;
|
||||
packMessages(local_deferredlogger.messages_, buffer, offset);
|
||||
assert(offset == message_size);
|
||||
|
||||
// Get message sizes and create offset/displacement array for gathering.
|
||||
int num_processes = -1;
|
||||
MPI_Comm_size(MPI_COMM_WORLD, &num_processes);
|
||||
std::vector<int> message_sizes(num_processes);
|
||||
MPI_Allgather(&message_size, 1, MPI_INT, message_sizes.data(), 1, MPI_INT, MPI_COMM_WORLD);
|
||||
std::vector<int> displ(num_processes + 1, 0);
|
||||
std::partial_sum(message_sizes.begin(), message_sizes.end(), displ.begin() + 1);
|
||||
|
||||
// Gather.
|
||||
std::vector<char> recv_buffer(displ.back());
|
||||
MPI_Allgatherv(buffer.data(), buffer.size(), MPI_PACKED,
|
||||
const_cast<char*>(recv_buffer.data()), message_sizes.data(),
|
||||
displ.data(), MPI_PACKED,
|
||||
MPI_COMM_WORLD);
|
||||
|
||||
// Unpack.
|
||||
Opm::DeferredLogger global_deferredlogger;
|
||||
global_deferredlogger.messages_ = unpackMessages(recv_buffer, displ);
|
||||
return global_deferredlogger;
|
||||
}
|
||||
|
||||
} // namespace Opm
|
||||
|
||||
#else // HAVE_MPI
|
||||
|
||||
namespace Opm
|
||||
{
|
||||
Opm::DeferredLogger gatherDeferredLogger(const Opm::DeferredLogger& local_deferredlogger)
|
||||
{
|
||||
return local_deferredlogger;
|
||||
}
|
||||
} // namespace Opm
|
||||
|
||||
#endif // HAVE_MPI
|
35
opm/simulators/gatherDeferredLogger.hpp
Normal file
35
opm/simulators/gatherDeferredLogger.hpp
Normal file
@ -0,0 +1,35 @@
|
||||
/*
|
||||
Copyright 2019 SINTEF Digital, Mathematics and Cybernetics.
|
||||
Copyright 2019 Equinor.
|
||||
|
||||
This file is part of the Open Porous Media project (OPM).
|
||||
|
||||
OPM is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
OPM is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with OPM. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef OPM_GATHERDEFERREDLOGGER_HEADER_INCLUDED
|
||||
#define OPM_GATHERDEFERREDLOGGER_HEADER_INCLUDED
|
||||
|
||||
#include <opm/simulators/DeferredLogger.hpp>
|
||||
|
||||
namespace Opm
|
||||
{
|
||||
|
||||
/// Create a global log combining local logs
|
||||
Opm::DeferredLogger gatherDeferredLogger(const Opm::DeferredLogger& local_deferredlogger);
|
||||
|
||||
} // namespace Opm
|
||||
|
||||
|
||||
#endif // OPM_GATHERDEFERREDLOGGER_HEADER_INCLUDED
|
180
tests/test_gatherdeferredlogger.cpp
Normal file
180
tests/test_gatherdeferredlogger.cpp
Normal file
@ -0,0 +1,180 @@
|
||||
/*
|
||||
Copyright 2018 SINTEF Digital, Mathematics and Cybernetics.
|
||||
Copyright 2018 Equinor.
|
||||
|
||||
This file is part of the Open Porous Media project (OPM).
|
||||
|
||||
OPM is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
OPM is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with OPM. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <config.h>
|
||||
|
||||
#define BOOST_TEST_MODULE TestGatherDeferredLogger
|
||||
#define BOOST_TEST_NO_MAIN
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
|
||||
#include <opm/simulators/gatherDeferredLogger.hpp>
|
||||
#include <dune/common/parallel/mpihelper.hh>
|
||||
|
||||
#include <opm/common/OpmLog/OpmLog.hpp>
|
||||
#include <opm/common/OpmLog/LogBackend.hpp>
|
||||
#include <opm/common/OpmLog/CounterLog.hpp>
|
||||
#include <opm/common/OpmLog/TimerLog.hpp>
|
||||
#include <opm/common/OpmLog/StreamLog.hpp>
|
||||
#include <opm/common/OpmLog/LogUtil.hpp>
|
||||
|
||||
using namespace Opm;
|
||||
|
||||
#if HAVE_MPI
|
||||
struct MPIError
|
||||
{
|
||||
MPIError(std::string s, int e) : errorstring(std::move(s)), errorcode(e){}
|
||||
std::string errorstring;
|
||||
int errorcode;
|
||||
};
|
||||
|
||||
void MPI_err_handler(MPI_Comm*, int* err_code, ...)
|
||||
{
|
||||
std::vector<char> err_string(MPI_MAX_ERROR_STRING);
|
||||
int err_length;
|
||||
MPI_Error_string(*err_code, err_string.data(), &err_length);
|
||||
std::string s(err_string.data(), err_length);
|
||||
std::cerr << "An MPI Error ocurred:" << std::endl << s << std::endl;
|
||||
throw MPIError(s, *err_code);
|
||||
}
|
||||
#endif
|
||||
|
||||
bool
|
||||
init_unit_test_func()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
void initLogger(std::ostringstream& log_stream) {
|
||||
OpmLog::removeAllBackends();
|
||||
std::shared_ptr<CounterLog> counter = std::make_shared<CounterLog>();
|
||||
std::shared_ptr<StreamLog> streamLog = std::make_shared<StreamLog>( log_stream , Log::DefaultMessageTypes);
|
||||
|
||||
OpmLog::addBackend("COUNTER" , counter);
|
||||
OpmLog::addBackend("STREAM" , streamLog);
|
||||
BOOST_CHECK_EQUAL( true , OpmLog::hasBackend("COUNTER"));
|
||||
BOOST_CHECK_EQUAL( true , OpmLog::hasBackend("STREAM"));
|
||||
|
||||
streamLog->setMessageFormatter(std::make_shared<SimpleMessageFormatter>(true, false));
|
||||
streamLog->setMessageLimiter(std::make_shared<MessageLimiter>(2));
|
||||
}
|
||||
|
||||
|
||||
|
||||
BOOST_AUTO_TEST_CASE(NoMessages)
|
||||
{
|
||||
auto cc = Dune::MPIHelper::getCollectiveCommunication();
|
||||
|
||||
std::ostringstream log_stream;
|
||||
initLogger(log_stream);
|
||||
|
||||
Opm::DeferredLogger local_deferredlogger;
|
||||
|
||||
Opm::DeferredLogger global_deferredlogger = gatherDeferredLogger(local_deferredlogger);
|
||||
|
||||
if (cc.rank() == 0) {
|
||||
|
||||
global_deferredlogger.logMessages();
|
||||
|
||||
auto counter = OpmLog::getBackend<CounterLog>("COUNTER");
|
||||
BOOST_CHECK_EQUAL( 0 , counter->numMessages(Log::MessageType::Info) );
|
||||
|
||||
std::string expected;
|
||||
BOOST_CHECK_EQUAL(log_stream.str(), expected);
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(VariableNumberOfMessages)
|
||||
{
|
||||
auto cc = Dune::MPIHelper::getCollectiveCommunication();
|
||||
|
||||
std::ostringstream log_stream;
|
||||
initLogger(log_stream);
|
||||
|
||||
Opm::DeferredLogger local_deferredlogger;
|
||||
if (cc.rank() == 1) {
|
||||
local_deferredlogger.info("info from rank " + std::to_string(cc.rank()));
|
||||
local_deferredlogger.warning("warning from rank " + std::to_string(cc.rank()));
|
||||
} else if (cc.rank() == 2) {
|
||||
local_deferredlogger.bug("tagme", "bug from rank " + std::to_string(cc.rank()));
|
||||
local_deferredlogger.bug("tagme", "bug from rank " + std::to_string(cc.rank()));
|
||||
local_deferredlogger.bug("tagme", "bug from rank " + std::to_string(cc.rank()));
|
||||
local_deferredlogger.bug("tagme", "bug from rank " + std::to_string(cc.rank()));
|
||||
}
|
||||
|
||||
Opm::DeferredLogger global_deferredlogger = gatherDeferredLogger(local_deferredlogger);
|
||||
|
||||
if (cc.rank() == 0) {
|
||||
|
||||
global_deferredlogger.logMessages();
|
||||
|
||||
auto counter = OpmLog::getBackend<CounterLog>("COUNTER");
|
||||
BOOST_CHECK_EQUAL( 1 , counter->numMessages(Log::MessageType::Info) );
|
||||
BOOST_CHECK_EQUAL( 1 , counter->numMessages(Log::MessageType::Warning) );
|
||||
BOOST_CHECK_EQUAL( 4 , counter->numMessages(Log::MessageType::Bug) );
|
||||
|
||||
const std::string expected = Log::prefixMessage(Log::MessageType::Info, "info from rank 1") + "\n"
|
||||
+ Log::prefixMessage(Log::MessageType::Warning, "warning from rank 1") + "\n"
|
||||
+ Log::prefixMessage(Log::MessageType::Bug, "bug from rank 2") + "\n"
|
||||
+ Log::prefixMessage(Log::MessageType::Bug, "bug from rank 2") + "\n"
|
||||
+ Log::prefixMessage(Log::MessageType::Bug, "Message limit reached for message tag: tagme") + "\n";
|
||||
BOOST_CHECK_EQUAL(log_stream.str(), expected);
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(AllHaveOneMessage)
|
||||
{
|
||||
auto cc = Dune::MPIHelper::getCollectiveCommunication();
|
||||
|
||||
std::ostringstream log_stream;
|
||||
initLogger(log_stream);
|
||||
|
||||
Opm::DeferredLogger local_deferredlogger;
|
||||
local_deferredlogger.info("info from rank " + std::to_string(cc.rank()));
|
||||
|
||||
Opm::DeferredLogger global_deferredlogger = gatherDeferredLogger(local_deferredlogger);
|
||||
|
||||
if (cc.rank() == 0) {
|
||||
|
||||
global_deferredlogger.logMessages();
|
||||
|
||||
auto counter = OpmLog::getBackend<CounterLog>("COUNTER");
|
||||
BOOST_CHECK_EQUAL( cc.size() , counter->numMessages(Log::MessageType::Info) );
|
||||
|
||||
std::string expected;
|
||||
for (int i=0; i<cc.size(); i++) {
|
||||
expected += Log::prefixMessage(Log::MessageType::Info, "info from rank "+std::to_string(i)) + "\n";
|
||||
}
|
||||
BOOST_CHECK_EQUAL(log_stream.str(), expected);
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
Dune::MPIHelper::instance(argc, argv);
|
||||
#if HAVE_MPI
|
||||
// register a throwing error handler to allow for
|
||||
// debugging with "catch throw" in gdb
|
||||
MPI_Errhandler handler;
|
||||
MPI_Comm_create_errhandler(MPI_err_handler, &handler);
|
||||
MPI_Comm_set_errhandler(MPI_COMM_WORLD, handler);
|
||||
#endif
|
||||
return boost::unit_test::unit_test_main(&init_unit_test_func, argc, argv);
|
||||
}
|
Loading…
Reference in New Issue
Block a user