Fix broadcast and initialize int_pos properly..

This commit is contained in:
Vegard Kippe
2024-07-09 15:02:15 +02:00
parent c10695c5d5
commit 99d5a147b1
3 changed files with 43 additions and 9 deletions

View File

@@ -52,7 +52,15 @@ public:
try {
this->pack(data);
m_comm.broadcast(&m_packSize, 1, root);
m_comm.broadcast(m_buffer.data(), m_packSize, root);
const int maxChunkSize = std::numeric_limits<int>::max();
std::size_t remainingSize = m_packSize;
std::size_t pos = 0;
while (remainingSize > maxChunkSize) {
m_comm.broadcast(m_buffer.data()+pos, maxChunkSize, root);
pos += maxChunkSize;
remainingSize -= maxChunkSize;
}
m_comm.broadcast(m_buffer.data()+pos, static_cast<int>(remainingSize), root);
} catch (...) {
m_packSize = std::numeric_limits<size_t>::max();
m_comm.broadcast(&m_packSize, 1, root);
@@ -63,9 +71,16 @@ public:
if (m_packSize == std::numeric_limits<size_t>::max()) {
throw std::runtime_error("Error detected in parallel serialization");
}
m_buffer.resize(m_packSize);
m_comm.broadcast(m_buffer.data(), m_packSize, root);
const int maxChunkSize = std::numeric_limits<int>::max();
std::size_t remainingSize = m_packSize;
std::size_t pos = 0;
while (remainingSize > maxChunkSize) {
m_comm.broadcast(m_buffer.data()+pos, maxChunkSize, root);
pos += maxChunkSize;
remainingSize -= maxChunkSize;
}
m_comm.broadcast(m_buffer.data()+pos, static_cast<int>(remainingSize), root);
this->unpack(data);
}
}
@@ -80,7 +95,15 @@ public:
try {
this->pack(std::forward<Args>(args)...);
m_comm.broadcast(&m_packSize, 1, root);
m_comm.broadcast(m_buffer.data(), m_packSize, root);
const int maxChunkSize = std::numeric_limits<int>::max();
std::size_t remainingSize = m_packSize;
std::size_t pos = 0;
while (remainingSize > maxChunkSize) {
m_comm.broadcast(m_buffer.data()+pos, maxChunkSize, root);
pos += maxChunkSize;
remainingSize -= maxChunkSize;
}
m_comm.broadcast(m_buffer.data()+pos, static_cast<int>(remainingSize), root);
} catch (...) {
m_packSize = std::numeric_limits<size_t>::max();
m_comm.broadcast(&m_packSize, 1, root);
@@ -92,7 +115,15 @@ public:
throw std::runtime_error("Error detected in parallel serialization");
}
m_buffer.resize(m_packSize);
m_comm.broadcast(m_buffer.data(), m_packSize, root);
const int maxChunkSize = std::numeric_limits<int>::max();
std::size_t remainingSize = m_packSize;
std::size_t pos = 0;
while (remainingSize > maxChunkSize) {
m_comm.broadcast(m_buffer.data()+pos, maxChunkSize, root);
pos += maxChunkSize;
remainingSize -= maxChunkSize;
}
m_comm.broadcast(m_buffer.data()+pos, static_cast<int>(remainingSize), root);
this->unpack(std::forward<Args>(args)...);
}
}