fix for variable number of messages

This commit is contained in:
Franz G. Fuchs 2019-01-11 19:05:11 +01:00
parent 37d691e678
commit 828c3c7949
2 changed files with 77 additions and 5 deletions

View File

@ -34,6 +34,9 @@ namespace
void packMessages(const std::vector<Opm::DeferredLogger::Message>& local_messages, std::vector<char>& buf, int& offset) void packMessages(const std::vector<Opm::DeferredLogger::Message>& local_messages, std::vector<char>& buf, int& offset)
{ {
int messagesize = local_messages.size();
MPI_Pack(&messagesize, 1, MPI_UNSIGNED, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
for (const auto lm : local_messages) { for (const auto lm : local_messages) {
MPI_Pack(&lm.flag, 1, MPI_INT64_T, buf.data(), buf.size(), &offset, MPI_COMM_WORLD); MPI_Pack(&lm.flag, 1, MPI_INT64_T, buf.data(), buf.size(), &offset, MPI_COMM_WORLD);
int tagsize = lm.tag.size(); int tagsize = lm.tag.size();
@ -80,9 +83,15 @@ namespace
{ {
std::vector<Opm::DeferredLogger::Message> messages; std::vector<Opm::DeferredLogger::Message> messages;
const int num_processes = displ.size() - 1; const int num_processes = displ.size() - 1;
auto* data = const_cast<char*>(recv_buffer.data());
for (int process = 0; process < num_processes; ++process) { for (int process = 0; process < num_processes; ++process) {
int offset = displ[process]; int offset = displ[process];
messages.push_back(unpackSingleMessage(recv_buffer, offset)); // unpack number of messages
unsigned int messagesize;
MPI_Unpack(data, recv_buffer.size(), &offset, &messagesize, 1, MPI_UNSIGNED, MPI_COMM_WORLD);
for (unsigned int i=0; i<messagesize; i++) {
messages.push_back(unpackSingleMessage(recv_buffer, offset));
}
assert(offset == displ[process + 1]); assert(offset == displ[process + 1]);
} }
return messages; return messages;
@ -98,7 +107,7 @@ namespace Opm
Opm::DeferredLogger gatherDeferredLogger(const Opm::DeferredLogger& local_deferredlogger) Opm::DeferredLogger gatherDeferredLogger(const Opm::DeferredLogger& local_deferredlogger)
{ {
// Pack local messages. // Pack local messages.
int message_size = 0; int message_size = sizeof(unsigned int);// to store the number of messages
for (const auto lm : local_deferredlogger.messages_) { for (const auto lm : local_deferredlogger.messages_) {
message_size += sizeof(lm.flag); message_size += sizeof(lm.flag);
message_size += sizeof(unsigned int);// to store the length of tag message_size += sizeof(unsigned int);// to store the length of tag

View File

@ -78,7 +78,7 @@ void initLogger(std::ostringstream& log_stream) {
BOOST_AUTO_TEST_CASE(AllHaveFailure) BOOST_AUTO_TEST_CASE(NoMessages)
{ {
auto cc = Dune::MPIHelper::getCollectiveCommunication(); auto cc = Dune::MPIHelper::getCollectiveCommunication();
@ -86,7 +86,70 @@ BOOST_AUTO_TEST_CASE(AllHaveFailure)
initLogger(log_stream); initLogger(log_stream);
Opm::DeferredLogger local_deferredlogger; Opm::DeferredLogger local_deferredlogger;
local_deferredlogger.info("info from rank" + std::to_string(cc.rank()));
Opm::DeferredLogger global_deferredlogger = gatherDeferredLogger(local_deferredlogger);
if (cc.rank()==0) {
global_deferredlogger.logMessages();
auto counter = OpmLog::getBackend<CounterLog>("COUNTER");
BOOST_CHECK_EQUAL( 0 , counter->numMessages(Log::MessageType::Info) );
std::string expected;
BOOST_CHECK_EQUAL(log_stream.str(), expected);
std::cerr<<""<<log_stream.str();
}
}
BOOST_AUTO_TEST_CASE(VariableNumberOfMessages)
{
auto cc = Dune::MPIHelper::getCollectiveCommunication();
std::ostringstream log_stream;
initLogger(log_stream);
Opm::DeferredLogger local_deferredlogger;
if (cc.rank()==1) {
local_deferredlogger.info("info from rank " + std::to_string(cc.rank()));
local_deferredlogger.warning("warning from rank " + std::to_string(cc.rank()));
} else if (cc.rank()==2) {
local_deferredlogger.bug("tagme", "bug from rank " + std::to_string(cc.rank()));
local_deferredlogger.bug("tagme", "bug from rank " + std::to_string(cc.rank()));
local_deferredlogger.bug("tagme", "bug from rank " + std::to_string(cc.rank()));
local_deferredlogger.bug("tagme", "bug from rank " + std::to_string(cc.rank()));
}
Opm::DeferredLogger global_deferredlogger = gatherDeferredLogger(local_deferredlogger);
if (cc.rank()==0) {
global_deferredlogger.logMessages();
auto counter = OpmLog::getBackend<CounterLog>("COUNTER");
BOOST_CHECK_EQUAL( 1 , counter->numMessages(Log::MessageType::Info) );
BOOST_CHECK_EQUAL( 1 , counter->numMessages(Log::MessageType::Warning) );
BOOST_CHECK_EQUAL( 4 , counter->numMessages(Log::MessageType::Bug) );
const std::string expected = Log::prefixMessage(Log::MessageType::Info, "info from rank 1") + "\n"
+ Log::prefixMessage(Log::MessageType::Warning, "warning from rank 1") + "\n"
+ Log::prefixMessage(Log::MessageType::Bug, "bug from rank 2") + "\n"
+ Log::prefixMessage(Log::MessageType::Bug, "bug from rank 2") + "\n"
+ Log::prefixMessage(Log::MessageType::Bug, "Message limit reached for message tag: tagme") + "\n";
BOOST_CHECK_EQUAL(log_stream.str(), expected);
std::cerr<<""<<log_stream.str();
}
}
BOOST_AUTO_TEST_CASE(AllHaveOneMessage)
{
auto cc = Dune::MPIHelper::getCollectiveCommunication();
std::ostringstream log_stream;
initLogger(log_stream);
Opm::DeferredLogger local_deferredlogger;
local_deferredlogger.info("info from rank " + std::to_string(cc.rank()));
Opm::DeferredLogger global_deferredlogger = gatherDeferredLogger(local_deferredlogger); Opm::DeferredLogger global_deferredlogger = gatherDeferredLogger(local_deferredlogger);
@ -99,7 +162,7 @@ BOOST_AUTO_TEST_CASE(AllHaveFailure)
std::string expected; std::string expected;
for (int i=0; i<cc.size(); i++) { for (int i=0; i<cc.size(); i++) {
expected += Log::prefixMessage(Log::MessageType::Info, "info from rank"+std::to_string(i)) + "\n"; expected += Log::prefixMessage(Log::MessageType::Info, "info from rank "+std::to_string(i)) + "\n";
} }
BOOST_CHECK_EQUAL(log_stream.str(), expected); BOOST_CHECK_EQUAL(log_stream.str(), expected);
std::cerr<<""<<log_stream.str(); std::cerr<<""<<log_stream.str();