Use block transfer for data transfer of cell properties

This commit is contained in:
Magne Sjaastad
2014-04-15 13:40:53 +02:00
parent eec2ffbecd
commit 19b655542a
9 changed files with 155 additions and 141 deletions

View File

@@ -42,6 +42,7 @@
#include <QTcpSocket>
#include "RiaApplication.h"
#include "RiaPreferences.h"
#include "RiaSocketDataTransfer.h"
//--------------------------------------------------------------------------------------------------
@@ -145,6 +146,9 @@ public:
socketStream << timestepByteCount ;
// Then write the data.
size_t valueCount = RiaSocketDataTransfer::doubleValueCountInBlock();
std::vector<double> values(valueCount);
size_t valueIndex = 0;
size_t globalCellCount = activeInfo->globalCellCount();
for (size_t tIdx = 0; tIdx < requestedTimesteps.size(); ++tIdx)
@@ -156,37 +160,34 @@ public:
{
if (resultIdx < scalarResultFrames->at(requestedTimesteps[tIdx]).size())
{
socketStream << scalarResultFrames->at(requestedTimesteps[tIdx])[resultIdx];
values[valueIndex] = scalarResultFrames->at(requestedTimesteps[tIdx])[resultIdx];
}
else
{
socketStream << HUGE_VAL;
values[valueIndex] = HUGE_VAL;
}
valueIndex++;
if (valueIndex >= valueCount)
{
if (!RiaSocketTools::writeBlockData(server, server->currentClient(), (const char *)values.data(), valueIndex * sizeof(double)))
{
return false;
}
valueIndex = 0;
}
}
}
}
#if 0
// This aproach is faster but does not handle coarsening
size_t timestepResultCount = scalarResultFrames->front().size();
quint64 timestepByteCount = (quint64)(timestepResultCount*sizeof(double));
socketStream << timestepByteCount ;
// Then write the data.
for (size_t tIdx = 0; tIdx < requestedTimesteps.size(); ++tIdx)
// Write remaining data
if (!RiaSocketTools::writeBlockData(server, server->currentClient(), (const char *)values.data(), valueIndex * sizeof(double)))
{
#if 1 // Write data as raw bytes, fast but does not handle byteswapping
server->currentClient()->write((const char *)scalarResultFrames->at(requestedTimesteps[tIdx]).data(), timestepByteCount); // Raw print of data. Fast but no platform conversion
#else // Write data using QDataStream, does byteswapping for us. Must use QDataStream on client as well
for (size_t cIdx = 0; cIdx < scalarResultFrames->at(requestedTimesteps[tIdx]).size(); ++cIdx)
{
socketStream << scalarResultFrames->at(tIdx)[cIdx];
}
#endif
return false;
}
#endif
}
}
return true;
}
};
@@ -314,7 +315,9 @@ public:
continue;
}
std::vector<double> values(rigGrid->cellCount());
size_t valueCount = RiaSocketDataTransfer::doubleValueCountInBlock();
std::vector<double> values(valueCount);
size_t valueIndex = 0;
for (size_t cellIdx = 0; cellIdx < rigGrid->cellCount(); cellIdx++)
{
double cellValue = cellCenterDataAccessObject->cellScalar(cellIdx);
@@ -322,10 +325,24 @@ public:
{
cellValue = 0.0;
}
values[cellIdx] = cellValue;
values[valueIndex++] = cellValue;
if (valueIndex >= valueCount)
{
if (!RiaSocketTools::writeBlockData(server, server->currentClient(), (const char *)values.data(), valueIndex * sizeof(double)))
{
return false;
}
valueIndex = 0;
}
}
RiaSocketTools::writeBlockData(server, server->currentClient(), (const char *)values.data(), values.size() * sizeof(double));
// Write remaining data
if (!RiaSocketTools::writeBlockData(server, server->currentClient(), (const char *)values.data(), valueIndex * sizeof(double)))
{
return false;
}
}
return true;
@@ -552,16 +569,18 @@ public:
internalMatrixData = m_scalarResultsToAdd->at(m_requestedTimesteps[m_currentTimeStepNumberToRead]).data();
}
#if 1 // Use raw data transfer. Faster.
bytesRead = currentClient->read((char*)(internalMatrixData), m_bytesPerTimeStepToRead);
#else
for (size_t cIdx = 0; cIdx < cellCountFromOctave; ++cIdx)
QStringList errorMessages;
if (!RiaSocketDataTransfer::readBlockDataFromSocket(currentClient, (char*)(internalMatrixData), m_bytesPerTimeStepToRead, errorMessages))
{
socketStream >> internalMatrixData[cIdx];
for (int i = 0; i < errorMessages.size(); i++)
{
server->errorMessageDialog()->showMessage(errorMessages[i]);
}
if (socketStream.status() == QDataStream::Ok) bytesRead += sizeof(double);
currentClient->abort();
return true;
}
#endif
// Map data from active to result index based container ( Coarsening is active)
if (isCoarseningActive)
{
@@ -576,12 +595,6 @@ public:
}
}
if ((int)m_bytesPerTimeStepToRead != bytesRead)
{
server->errorMessageDialog()->showMessage(RiaSocketServer::tr("ResInsight SocketServer: \n") +
RiaSocketServer::tr("Could not read binary double data properly from socket"));
}
++m_currentTimeStepNumberToRead;
}
@@ -896,8 +909,17 @@ public:
std::vector<double> doubleValues(cellCountFromOctave);
qint64 bytesRead = currentClient->read((char*)(doubleValues.data()), m_bytesPerTimeStepToRead);
size_t doubleValueIndex = 0;
QStringList errorMessages;
if (!RiaSocketDataTransfer::readBlockDataFromSocket(currentClient, (char*)(doubleValues.data()), m_bytesPerTimeStepToRead, errorMessages))
{
for (int i = 0; i < errorMessages.size(); i++)
{
server->errorMessageDialog()->showMessage(errorMessages[i]);
}
currentClient->abort();
return true;
}
cvf::ref<cvf::StructGridScalarDataAccess> cellCenterDataAccessObject =
m_currentReservoir->reservoirData()->dataAccessObject(grid, m_porosityModelEnum, m_requestedTimesteps[m_currentTimeStepNumberToRead], m_currentScalarIndex);

View File

@@ -18,6 +18,7 @@
#include "RiaSocketDataTransfer.h"
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
@@ -55,15 +56,12 @@ bool RiaSocketDataTransfer::writeBlockDataToSocket(QTcpSocket* socket, const cha
bool RiaSocketDataTransfer::readBlockDataFromSocket(QTcpSocket* socket, char* data, quint64 bytesToRead, QStringList& errorMessages)
{
quint64 bytesRead = 0;
int blockCount = 0;
quint64 maxBlockSize = doubleValueCountInBlock() * sizeof(double);
while (bytesRead < bytesToRead)
{
if (socket->bytesAvailable())
{
quint64 byteCountToRead = qMin(bytesToRead - bytesRead, maxBlockSize);
quint64 byteCountToRead = bytesToRead - bytesRead;
qint64 actuallyBytesRead = socket->read(data + bytesRead, byteCountToRead);
if (actuallyBytesRead < 0)
@@ -75,7 +73,6 @@ bool RiaSocketDataTransfer::readBlockDataFromSocket(QTcpSocket* socket, char* da
}
bytesRead += actuallyBytesRead;
blockCount++;
}
else
{
@@ -87,6 +84,12 @@ bool RiaSocketDataTransfer::readBlockDataFromSocket(QTcpSocket* socket, char* da
return false;
}
}
// Allow Octave process to end a long running Octave function
#ifdef octave_oct_h
OCTAVE_QUIT;
#endif
}
return true;

View File

@@ -122,10 +122,10 @@ bool RiaSocketTools::writeBlockData(RiaSocketServer* server, QTcpSocket* socket,
server->errorMessageDialog()->showMessage(errorMessages[i]);
}
double totalTimeMS = timer.time() * 1000.0;
QString resultInfo = QString("Total time '%1 ms'").arg(totalTimeMS);
server->errorMessageDialog()->showMessage(resultInfo);
// double totalTimeMS = timer.time() * 1000.0;
// QString resultInfo = QString("Total time '%1 ms'").arg(totalTimeMS);
//
// server->errorMessageDialog()->showMessage(resultInfo);
}
return writeSucceded;