Use block transfer for data transfer of cell properties

This commit is contained in:
Magne Sjaastad 2014-04-15 13:40:53 +02:00
parent 8ea418272d
commit b4821ed7a3
9 changed files with 155 additions and 141 deletions

View File

@ -42,6 +42,7 @@
#include <QTcpSocket> #include <QTcpSocket>
#include "RiaApplication.h" #include "RiaApplication.h"
#include "RiaPreferences.h" #include "RiaPreferences.h"
#include "RiaSocketDataTransfer.h"
//-------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------
@ -145,6 +146,9 @@ public:
socketStream << timestepByteCount ; socketStream << timestepByteCount ;
// Then write the data. // Then write the data.
size_t valueCount = RiaSocketDataTransfer::doubleValueCountInBlock();
std::vector<double> values(valueCount);
size_t valueIndex = 0;
size_t globalCellCount = activeInfo->globalCellCount(); size_t globalCellCount = activeInfo->globalCellCount();
for (size_t tIdx = 0; tIdx < requestedTimesteps.size(); ++tIdx) for (size_t tIdx = 0; tIdx < requestedTimesteps.size(); ++tIdx)
@ -156,37 +160,34 @@ public:
{ {
if (resultIdx < scalarResultFrames->at(requestedTimesteps[tIdx]).size()) if (resultIdx < scalarResultFrames->at(requestedTimesteps[tIdx]).size())
{ {
socketStream << scalarResultFrames->at(requestedTimesteps[tIdx])[resultIdx]; values[valueIndex] = scalarResultFrames->at(requestedTimesteps[tIdx])[resultIdx];
} }
else else
{ {
socketStream << HUGE_VAL; values[valueIndex] = HUGE_VAL;
} }
}
}
}
#if 0
// This aproach is faster but does not handle coarsening
size_t timestepResultCount = scalarResultFrames->front().size();
quint64 timestepByteCount = (quint64)(timestepResultCount*sizeof(double));
socketStream << timestepByteCount ;
// Then write the data. valueIndex++;
if (valueIndex >= valueCount)
for (size_t tIdx = 0; tIdx < requestedTimesteps.size(); ++tIdx)
{ {
#if 1 // Write data as raw bytes, fast but does not handle byteswapping if (!RiaSocketTools::writeBlockData(server, server->currentClient(), (const char *)values.data(), valueIndex * sizeof(double)))
server->currentClient()->write((const char *)scalarResultFrames->at(requestedTimesteps[tIdx]).data(), timestepByteCount); // Raw print of data. Fast but no platform conversion
#else // Write data using QDataStream, does byteswapping for us. Must use QDataStream on client as well
for (size_t cIdx = 0; cIdx < scalarResultFrames->at(requestedTimesteps[tIdx]).size(); ++cIdx)
{ {
socketStream << scalarResultFrames->at(tIdx)[cIdx]; return false;
}
#endif
}
#endif
} }
valueIndex = 0;
}
}
}
}
// Write remaining data
if (!RiaSocketTools::writeBlockData(server, server->currentClient(), (const char *)values.data(), valueIndex * sizeof(double)))
{
return false;
}
}
return true; return true;
} }
}; };
@ -314,7 +315,9 @@ public:
continue; continue;
} }
std::vector<double> values(rigGrid->cellCount()); size_t valueCount = RiaSocketDataTransfer::doubleValueCountInBlock();
std::vector<double> values(valueCount);
size_t valueIndex = 0;
for (size_t cellIdx = 0; cellIdx < rigGrid->cellCount(); cellIdx++) for (size_t cellIdx = 0; cellIdx < rigGrid->cellCount(); cellIdx++)
{ {
double cellValue = cellCenterDataAccessObject->cellScalar(cellIdx); double cellValue = cellCenterDataAccessObject->cellScalar(cellIdx);
@ -322,10 +325,24 @@ public:
{ {
cellValue = 0.0; cellValue = 0.0;
} }
values[cellIdx] = cellValue; values[valueIndex++] = cellValue;
if (valueIndex >= valueCount)
{
if (!RiaSocketTools::writeBlockData(server, server->currentClient(), (const char *)values.data(), valueIndex * sizeof(double)))
{
return false;
} }
RiaSocketTools::writeBlockData(server, server->currentClient(), (const char *)values.data(), values.size() * sizeof(double)); valueIndex = 0;
}
}
// Write remaining data
if (!RiaSocketTools::writeBlockData(server, server->currentClient(), (const char *)values.data(), valueIndex * sizeof(double)))
{
return false;
}
} }
return true; return true;
@ -552,16 +569,18 @@ public:
internalMatrixData = m_scalarResultsToAdd->at(m_requestedTimesteps[m_currentTimeStepNumberToRead]).data(); internalMatrixData = m_scalarResultsToAdd->at(m_requestedTimesteps[m_currentTimeStepNumberToRead]).data();
} }
#if 1 // Use raw data transfer. Faster. QStringList errorMessages;
bytesRead = currentClient->read((char*)(internalMatrixData), m_bytesPerTimeStepToRead); if (!RiaSocketDataTransfer::readBlockDataFromSocket(currentClient, (char*)(internalMatrixData), m_bytesPerTimeStepToRead, errorMessages))
#else
for (size_t cIdx = 0; cIdx < cellCountFromOctave; ++cIdx)
{ {
socketStream >> internalMatrixData[cIdx]; for (int i = 0; i < errorMessages.size(); i++)
{
if (socketStream.status() == QDataStream::Ok) bytesRead += sizeof(double); server->errorMessageDialog()->showMessage(errorMessages[i]);
} }
#endif
currentClient->abort();
return true;
}
// Map data from active to result index based container ( Coarsening is active) // Map data from active to result index based container ( Coarsening is active)
if (isCoarseningActive) if (isCoarseningActive)
{ {
@ -576,12 +595,6 @@ public:
} }
} }
if ((int)m_bytesPerTimeStepToRead != bytesRead)
{
server->errorMessageDialog()->showMessage(RiaSocketServer::tr("ResInsight SocketServer: \n") +
RiaSocketServer::tr("Could not read binary double data properly from socket"));
}
++m_currentTimeStepNumberToRead; ++m_currentTimeStepNumberToRead;
} }
@ -896,8 +909,17 @@ public:
std::vector<double> doubleValues(cellCountFromOctave); std::vector<double> doubleValues(cellCountFromOctave);
qint64 bytesRead = currentClient->read((char*)(doubleValues.data()), m_bytesPerTimeStepToRead); QStringList errorMessages;
size_t doubleValueIndex = 0; if (!RiaSocketDataTransfer::readBlockDataFromSocket(currentClient, (char*)(doubleValues.data()), m_bytesPerTimeStepToRead, errorMessages))
{
for (int i = 0; i < errorMessages.size(); i++)
{
server->errorMessageDialog()->showMessage(errorMessages[i]);
}
currentClient->abort();
return true;
}
cvf::ref<cvf::StructGridScalarDataAccess> cellCenterDataAccessObject = cvf::ref<cvf::StructGridScalarDataAccess> cellCenterDataAccessObject =
m_currentReservoir->reservoirData()->dataAccessObject(grid, m_porosityModelEnum, m_requestedTimesteps[m_currentTimeStepNumberToRead], m_currentScalarIndex); m_currentReservoir->reservoirData()->dataAccessObject(grid, m_porosityModelEnum, m_requestedTimesteps[m_currentTimeStepNumberToRead], m_currentScalarIndex);

View File

@ -18,6 +18,7 @@
#include "RiaSocketDataTransfer.h" #include "RiaSocketDataTransfer.h"
//-------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------
/// ///
//-------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------
@ -55,15 +56,12 @@ bool RiaSocketDataTransfer::writeBlockDataToSocket(QTcpSocket* socket, const cha
bool RiaSocketDataTransfer::readBlockDataFromSocket(QTcpSocket* socket, char* data, quint64 bytesToRead, QStringList& errorMessages) bool RiaSocketDataTransfer::readBlockDataFromSocket(QTcpSocket* socket, char* data, quint64 bytesToRead, QStringList& errorMessages)
{ {
quint64 bytesRead = 0; quint64 bytesRead = 0;
int blockCount = 0;
quint64 maxBlockSize = doubleValueCountInBlock() * sizeof(double);
while (bytesRead < bytesToRead) while (bytesRead < bytesToRead)
{ {
if (socket->bytesAvailable()) if (socket->bytesAvailable())
{ {
quint64 byteCountToRead = qMin(bytesToRead - bytesRead, maxBlockSize); quint64 byteCountToRead = bytesToRead - bytesRead;
qint64 actuallyBytesRead = socket->read(data + bytesRead, byteCountToRead); qint64 actuallyBytesRead = socket->read(data + bytesRead, byteCountToRead);
if (actuallyBytesRead < 0) if (actuallyBytesRead < 0)
@ -75,7 +73,6 @@ bool RiaSocketDataTransfer::readBlockDataFromSocket(QTcpSocket* socket, char* da
} }
bytesRead += actuallyBytesRead; bytesRead += actuallyBytesRead;
blockCount++;
} }
else else
{ {
@ -87,6 +84,12 @@ bool RiaSocketDataTransfer::readBlockDataFromSocket(QTcpSocket* socket, char* da
return false; return false;
} }
} }
// Allow Octave process to end a long running Octave function
#ifdef octave_oct_h
OCTAVE_QUIT;
#endif
} }
return true; return true;

View File

@ -122,10 +122,10 @@ bool RiaSocketTools::writeBlockData(RiaSocketServer* server, QTcpSocket* socket,
server->errorMessageDialog()->showMessage(errorMessages[i]); server->errorMessageDialog()->showMessage(errorMessages[i]);
} }
double totalTimeMS = timer.time() * 1000.0; // double totalTimeMS = timer.time() * 1000.0;
QString resultInfo = QString("Total time '%1 ms'").arg(totalTimeMS); // QString resultInfo = QString("Total time '%1 ms'").arg(totalTimeMS);
//
server->errorMessageDialog()->showMessage(resultInfo); // server->errorMessageDialog()->showMessage(resultInfo);
} }
return writeSucceded; return writeSucceded;

View File

@ -6,26 +6,26 @@
set(CPP_SOURCES set(CPP_SOURCES
riGetActiveCellProperty.cpp riGetActiveCellProperty.cpp
riSetActiveCellProperty.cpp riSetActiveCellProperty.cpp
riGetActiveCellInfo.cpp # riGetActiveCellInfo.cpp
riGetMainGridDimensions.cpp # riGetMainGridDimensions.cpp
riGetCurrentCase.cpp # riGetCurrentCase.cpp
riGetCaseGroups.cpp # riGetCaseGroups.cpp
riGetSelectedCases.cpp # riGetSelectedCases.cpp
riGetCases.cpp # riGetCases.cpp
riGetTimeStepDates.cpp # riGetTimeStepDates.cpp
riGetTimeStepDays.cpp # riGetTimeStepDays.cpp
riGetGridDimensions.cpp # riGetGridDimensions.cpp
riGetCoarseningInfo.cpp # riGetCoarseningInfo.cpp
riGetCellCenters.cpp # riGetCellCenters.cpp
riGetActiveCellCenters.cpp # riGetActiveCellCenters.cpp
riGetCellCorners.cpp # riGetCellCorners.cpp
riGetActiveCellCorners.cpp # riGetActiveCellCorners.cpp
riGetGridProperty.cpp riGetGridProperty.cpp
riSetGridProperty.cpp riSetGridProperty.cpp
riGetPropertyNames.cpp # riGetPropertyNames.cpp
riGetWellNames.cpp # riGetWellNames.cpp
riGetWellStatus.cpp # riGetWellStatus.cpp
riGetWellCells.cpp # riGetWellCells.cpp
) )
if (${CMAKE_SYSTEM_NAME} MATCHES "Linux") if (${CMAKE_SYSTEM_NAME} MATCHES "Linux")

View File

@ -1,7 +1,12 @@
#include <QtNetwork> #include <QtNetwork>
#include <QStringList>
#include <octave/oct.h> #include <octave/oct.h>
#include "riSettings.h" #include "riSettings.h"
#include "RiaSocketDataTransfer.cpp" // NB! Include cpp-file to avoid linking of additional file in oct-compile configuration
void getActiveCellProperty(Matrix& propertyFrames, const QString &serverName, quint16 serverPort, void getActiveCellProperty(Matrix& propertyFrames, const QString &serverName, quint16 serverPort,
const qint64& caseId, QString propertyName, const int32NDArray& requestedTimeSteps, QString porosityModel) const qint64& caseId, QString propertyName, const int32NDArray& requestedTimeSteps, QString porosityModel)
{ {
@ -63,44 +68,18 @@ void getActiveCellProperty(Matrix& propertyFrames, const QString &serverName, qu
return; return;
} }
// Wait for available data for each timestep, then read data for each timestep quint64 totalByteCount = byteCount * timestepCount;
for (size_t tIdx = 0; tIdx < timestepCount; ++tIdx) double* internalMatrixData = propertyFrames.fortran_vec();
QStringList errorMessages;
if (!RiaSocketDataTransfer::readBlockDataFromSocket(&socket, (char*)(internalMatrixData), totalByteCount, errorMessages))
{ {
while (socket.bytesAvailable() < (qint64)byteCount) for (int i = 0; i < errorMessages.size(); i++)
{ {
if (!socket.waitForReadyRead(riOctavePlugin::longTimeOutMilliSecs)) error(errorMessages[i].toLatin1().data());
{
error((("Waiting for timestep data number: ") + QString::number(tIdx)+ ": " + socket.errorString()).toLatin1().data());
octave_stdout << "Active cells: " << activeCellCount << ", Timesteps: " << timestepCount << std::endl;
return ;
}
OCTAVE_QUIT;
} }
qint64 bytesRead = 0; return;
double * internalMatrixData = propertyFrames.fortran_vec();
#if 0
// Raw data transfer. Faster. Not possible when dealing with coarsening
// bytesRead = socket.read((char*)(internalMatrixData + tIdx * activeCellCount), byteCount);
#else
// Compatible transfer. Now the only one working
for (size_t cIdx = 0; cIdx < activeCellCount; ++cIdx)
{
socketStream >> internalMatrixData[tIdx * activeCellCount + cIdx];
if (socketStream.status() == QDataStream::Ok) bytesRead += sizeof(double);
}
#endif
if ((qint64)byteCount != bytesRead)
{
error("Could not read binary double data properly from socket");
octave_stdout << "Active cells: " << activeCellCount << ", Timesteps: " << timestepCount << std::endl;
}
OCTAVE_QUIT;
} }
QString tmp = QString("riGetActiveCellProperty : Read %1").arg(propertyName); QString tmp = QString("riGetActiveCellProperty : Read %1").arg(propertyName);

View File

@ -4,7 +4,7 @@
#include <octave/oct.h> #include <octave/oct.h>
#include "riSettings.h" #include "riSettings.h"
#include "riSocketTools.h" #include "RiaSocketDataTransfer.cpp" // NB! Include cpp-file to avoid linking of additional file in oct-compile configuration
void getGridProperty(NDArray& propertyFrames, const QString &serverName, quint16 serverPort, void getGridProperty(NDArray& propertyFrames, const QString &serverName, quint16 serverPort,
@ -83,14 +83,14 @@ void getGridProperty(NDArray& propertyFrames, const QString &serverName, quint16
double* internalMatrixData = propertyFrames.fortran_vec(); double* internalMatrixData = propertyFrames.fortran_vec();
QStringList errorMessages; QStringList errorMessages;
if (!readBlockData(socket, (char*)(internalMatrixData), totalByteCount, errorMessages)) if (!RiaSocketDataTransfer::readBlockDataFromSocket(&socket, (char*)(internalMatrixData), totalByteCount, errorMessages))
{ {
for (int i = 0; i < errorMessages.size(); i++) for (int i = 0; i < errorMessages.size(); i++)
{ {
error(errorMessages[i].toLatin1().data()); error(errorMessages[i].toLatin1().data());
} }
OCTAVE_QUIT; return;
} }
QString tmp = QString("riGetGridProperty : Read %1").arg(propertyName); QString tmp = QString("riGetGridProperty : Read %1").arg(propertyName);

View File

@ -1,6 +1,10 @@
#include <QtNetwork> #include <QtNetwork>
#include <QStringList>
#include <octave/oct.h> #include <octave/oct.h>
#include "riSettings.h" #include "riSettings.h"
#include "RiaSocketDataTransfer.cpp" // NB! Include cpp-file to avoid linking of additional file in oct-compile configuration
void setEclipseProperty(const Matrix& propertyFrames, const QString &hostName, quint16 port, void setEclipseProperty(const Matrix& propertyFrames, const QString &hostName, quint16 port,
@ -47,10 +51,18 @@ void setEclipseProperty(const Matrix& propertyFrames, const QString &hostName, q
socketStream << (qint64)timeStepByteCount; socketStream << (qint64)timeStepByteCount;
const double* internalData = propertyFrames.fortran_vec(); const double* internalData = propertyFrames.fortran_vec();
qint64 dataWritten = socket.write((const char *)internalData, timeStepByteCount*timeStepCount);
if (dataWritten == timeStepByteCount*timeStepCount) QStringList errorMessages;
if (!RiaSocketDataTransfer::writeBlockDataToSocket(&socket, (const char *)internalData, timeStepByteCount*timeStepCount, errorMessages))
{ {
for (int i = 0; i < errorMessages.size(); i++)
{
octave_stdout << errorMessages[i].toStdString();
}
return;
}
QString tmp = QString("riSetActiveCellProperty : Wrote %1").arg(propertyName); QString tmp = QString("riSetActiveCellProperty : Wrote %1").arg(propertyName);
if (caseId == -1) if (caseId == -1)
@ -62,12 +74,6 @@ void setEclipseProperty(const Matrix& propertyFrames, const QString &hostName, q
tmp += QString(" to case with Id = %1.").arg(caseId); tmp += QString(" to case with Id = %1.").arg(caseId);
} }
octave_stdout << tmp.toStdString() << " Active Cells : " << cellCount << " Time steps : " << timeStepCount << std::endl; octave_stdout << tmp.toStdString() << " Active Cells : " << cellCount << " Time steps : " << timeStepCount << std::endl;
}
else
{
error("riSetActiveCellProperty : Was not able to write the proper amount of data to ResInsight:");
octave_stdout << " Active Cells : " << cellCount << "Time steps : " << timeStepCount << " Data Written: " << dataWritten << " Should have written: " << timeStepCount * cellCount * sizeof(double) << std::endl;
}
while(socket.bytesToWrite() && socket.state() == QAbstractSocket::ConnectedState) while(socket.bytesToWrite() && socket.state() == QAbstractSocket::ConnectedState)
{ {

View File

@ -1,6 +1,10 @@
#include <QtNetwork> #include <QtNetwork>
#include <QStringList>
#include <octave/oct.h> #include <octave/oct.h>
#include "riSettings.h" #include "riSettings.h"
#include "RiaSocketDataTransfer.cpp" // NB! Include cpp-file to avoid linking of additional file in oct-compile configuration
void setEclipseProperty(const NDArray& propertyFrames, const QString &hostName, quint16 port, void setEclipseProperty(const NDArray& propertyFrames, const QString &hostName, quint16 port,
@ -64,15 +68,22 @@ void setEclipseProperty(const NDArray& propertyFrames, const QString &hostName,
socketStream << (qint64)singleTimeStepByteCount; socketStream << (qint64)singleTimeStepByteCount;
const double* internalData = propertyFrames.fortran_vec(); const double* internalData = propertyFrames.fortran_vec();
qint64 dataWritten = 0;
for (size_t tsIdx = 0; tsIdx < timeStepCount; ++tsIdx) QStringList errorMessages;
if (!RiaSocketDataTransfer::writeBlockDataToSocket(&socket, (const char *)internalData, timeStepCount*singleTimeStepByteCount, errorMessages))
{ {
dataWritten += socket.write(((const char *)internalData) + tsIdx*singleTimeStepByteCount, singleTimeStepByteCount); for (int i = 0; i < errorMessages.size(); i++)
{
octave_stdout << errorMessages[i].toStdString();
}
size_t cellCount = cellCountI * cellCountJ * cellCountK;
error("riSetGridProperty : Was not able to write the proper amount of data to ResInsight:");
octave_stdout << " Cell count : " << cellCount << "Time steps : " << timeStepCount << std::endl;
return;
} }
if (dataWritten == singleTimeStepByteCount*timeStepCount)
{
QString tmp = QString("riSetGridProperty : Wrote %1").arg(propertyName); QString tmp = QString("riSetGridProperty : Wrote %1").arg(propertyName);
if (caseId == -1) if (caseId == -1)
@ -87,18 +98,11 @@ void setEclipseProperty(const NDArray& propertyFrames, const QString &hostName,
tmp += QString(" grid index: %1, ").arg(gridIndex); tmp += QString(" grid index: %1, ").arg(gridIndex);
octave_stdout << tmp.toStdString() << " Time steps : " << timeStepCount << std::endl; octave_stdout << tmp.toStdString() << " Time steps : " << timeStepCount << std::endl;
}
else
{
size_t cellCount = cellCountI * cellCountJ * cellCountK;
error("riSetGridProperty : Was not able to write the proper amount of data to ResInsight:");
octave_stdout << " Cell count : " << cellCount << "Time steps : " << timeStepCount << " Data Written: " << dataWritten << " Should have written: " << timeStepCount * cellCount * sizeof(double) << std::endl;
}
while(socket.bytesToWrite() && socket.state() == QAbstractSocket::ConnectedState) while(socket.bytesToWrite() && socket.state() == QAbstractSocket::ConnectedState)
{ {
octave_stdout << "Bytes to write: " << socket.bytesToWrite() << std::endl << std::flush; // octave_stdout << "Bytes to write: " << socket.bytesToWrite() << std::endl << std::flush;
socket.waitForBytesWritten(2000); socket.waitForBytesWritten(riOctavePlugin::longTimeOutMilliSecs);
OCTAVE_QUIT; OCTAVE_QUIT;
} }

View File

@ -2,7 +2,7 @@
//-------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------
/// ///
//-------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------
bool readBlockData(QTcpSocket& socket, char* data, quint64 bytesToRead, QStringList& errorMessages) bool readBlockData_to_be_deleted(QTcpSocket& socket, char* data, quint64 bytesToRead, QStringList& errorMessages)
{ {
quint64 bytesRead = 0; quint64 bytesRead = 0;
int blockCount = 0; int blockCount = 0;
@ -51,7 +51,7 @@ bool readBlockData(QTcpSocket& socket, char* data, quint64 bytesToRead, QStringL
//-------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------
/// ///
//-------------------------------------------------------------------------------------------------- //--------------------------------------------------------------------------------------------------
bool writeBlockData(QTcpSocket& socket, const char* data, quint64 bytesToWrite, QStringList& errorMessages) bool writeBlockData_to_be_deleted(QTcpSocket& socket, const char* data, quint64 bytesToWrite, QStringList& errorMessages)
{ {
quint64 bytesWritten = 0; quint64 bytesWritten = 0;
int blockCount = 0; int blockCount = 0;