#4566 #4572 Python: improve behaviour when sending more data than expected in property streams

This commit is contained in:
Gaute Lindkvist 2019-08-13 13:15:58 +02:00
parent 499798abc5
commit f3d8e3dbdc
16 changed files with 127 additions and 82 deletions

View File

@ -78,7 +78,7 @@ endif(RESINSIGHT_GRPC_PYTHON_EXECUTABLE AND EXISTS ${RESINSIGHT_GRPC_PYTHON_EXEC
# Proto files
set(PROTO_FILES
"Empty"
"Definitions"
"PdmObject"
"Case"
"Project"

View File

@ -2,7 +2,7 @@ syntax = "proto3";
package rips;
import "Empty.proto";
import "Definitions.proto";
service App {
rpc GetVersion(Empty) returns (Version) {}

View File

@ -1,7 +1,7 @@
syntax = "proto3";
import "Case.proto";
import "Empty.proto";
import "Definitions.proto";
package rips;

View File

@ -0,0 +1,12 @@
syntax = "proto3";
package rips;
message Empty
{
}
message ClientToServerStreamReply
{
int64 values_accepted = 1;
}

View File

@ -1,7 +0,0 @@
syntax = "proto3";
package rips;
message Empty
{
}

View File

@ -1,6 +1,6 @@
syntax = "proto3";
import "Empty.proto";
import "Definitions.proto";
package rips;

View File

@ -1,6 +1,6 @@
syntax = "proto3";
import "Empty.proto";
import "Definitions.proto";
import "Case.proto";
import "PdmObject.proto";

View File

@ -1,6 +1,6 @@
syntax = "proto3";
import "Empty.proto";
import "Definitions.proto";
import "Case.proto";
package rips;
@ -10,8 +10,8 @@ service Properties
rpc GetAvailableProperties(AvailablePropertiesRequest) returns (AvailableProperties) {}
rpc GetActiveCellProperty(PropertyRequest) returns (stream PropertyChunk) {}
rpc GetGridProperty(PropertyRequest) returns (stream PropertyChunk) {}
rpc SetActiveCellProperty(stream PropertyInputChunk) returns (Empty) {}
rpc SetGridProperty(stream PropertyInputChunk) returns (Empty) {}
rpc SetActiveCellProperty(stream PropertyInputChunk) returns (ClientToServerStreamReply) {}
rpc SetGridProperty(stream PropertyInputChunk) returns (ClientToServerStreamReply) {}
}
enum PropertyType

View File

@ -4,7 +4,7 @@ import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'generated'))
from Empty_pb2 import Empty
from Definitions_pb2 import Empty
import App_pb2
import App_pb2_grpc

View File

@ -4,7 +4,7 @@ import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'generated'))
from Empty_pb2 import Empty
from Definitions_pb2 import Empty
import Commands_pb2 as Cmd
import Commands_pb2_grpc as CmdRpc
from .Case import Case

View File

@ -4,7 +4,7 @@ import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'generated'))
from Empty_pb2 import Empty
from Definitions_pb2 import Empty
import PdmObject_pb2
import PdmObject_pb2_grpc

View File

@ -10,7 +10,7 @@ from .View import View
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'generated'))
from Empty_pb2 import Empty
from Definitions_pb2 import Empty
import Project_pb2
import Project_pb2_grpc

View File

@ -8,6 +8,7 @@ import Properties_pb2
import Properties_pb2_grpc
import Case_pb2
import Case_pb2_grpc
from Definitions_pb2 import ClientToServerStreamReply
class Properties:
""" Class for streaming properties to and from ResInsight
@ -35,13 +36,13 @@ class Properties:
# Meaning ideal number of doubles would be 8192.
# However we need overhead space, so if we choose 8160 in chunk size
# We have 256B left for overhead which should be plenty
chunkSize = 8000
chunkSize = 44431
index = -1
while index < len(array):
chunk = Properties_pb2.PropertyInputChunk()
if index is -1:
chunk.params.CopyFrom(parameters)
index += 1;
index += 1
else:
actualChunkSize = min(len(array) - index + 1, chunkSize)
chunk.values.CopyFrom(Properties_pb2.PropertyChunk(values = array[index:index+actualChunkSize]))
@ -184,8 +185,8 @@ class Properties:
time_step = timeStep,
porosity_model = porosityModelEnum)
try:
reply_iterator = self.__generatePropertyInputIterator(values_iterator, request)
self.propertiesStub.SetActiveCellProperty(reply_iterator)
request_iterator = self.__generatePropertyInputIterator(values_iterator, request)
self.propertiesStub.SetActiveCellProperty(request_iterator)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.NOT_FOUND:
print("Command not found")
@ -211,7 +212,10 @@ class Properties:
porosity_model = porosityModelEnum)
try:
request_iterator = self.__generatePropertyInputChunks(values, request)
self.propertiesStub.SetActiveCellProperty(request_iterator)
reply = self.propertiesStub.SetActiveCellProperty(request_iterator)
if reply.values_accepted != len(values):
print("ERROR: Attempted to write outside bounds of " + propertyName + " data storage");
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.NOT_FOUND:
print("Command not found")
@ -239,7 +243,10 @@ class Properties:
porosity_model = porosityModelEnum)
try:
request_iterator = self.__generatePropertyInputChunks(values, request)
self.propertiesStub.SetGridProperty(request_iterator)
reply = self.propertiesStub.SetGridProperty(request_iterator)
if reply.values_accepted != len(values):
print("ERROR: Attempted to write outside bounds of " + propertyName + " data storage");
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.NOT_FOUND:
print("Command not found")

View File

@ -337,19 +337,21 @@ void RiaGrpcClientToServerStreamCallback<ServiceT, RequestT, ReplyT, StateHandle
if (!this->m_status.ok())
{
this->setNextCallState(RiaGrpcCallbackInterface::FINISH_REQUEST);
if (this->m_status.error_code() == grpc::OUT_OF_RANGE)
m_reader.FinishWithError(this->m_status, this);
}
else
{
CAF_ASSERT(m_stateHandler->nrOfValuesReceived() <= m_stateHandler->cellCount());
if (m_stateHandler->nrOfValuesReceived() == m_stateHandler->cellCount())
{
this->setNextCallState(RiaGrpcCallbackInterface::FINISH_REQUEST);
m_reader.Finish(this->m_reply, grpc::Status::OK, this);
}
else
{
m_reader.FinishWithError(this->m_status, this);
m_reader.Read(&this->m_request, this);
}
}
else
{
m_reader.Read(&this->m_request, this);
}
}
}
//--------------------------------------------------------------------------------------------------

View File

@ -38,7 +38,7 @@
using namespace rips;
#define NUM_CONCURRENT_SERVER_STREAMS 10
#define NUM_CONCURRENT_CLIENT_TO_SERVER_STREAMS 10
//--------------------------------------------------------------------------------------------------
/// Abstract handler base class for streaming cell results to client
@ -56,12 +56,28 @@ public:
RiaCellResultsStateHandler(bool clientStreamer = false)
: m_request(nullptr)
, m_eclipseCase(nullptr)
, m_currentCellIdx(0u)
, m_nrOfValuesReceived(0u)
, m_cellCount(0u)
, m_clientStreamer(clientStreamer)
{
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
size_t cellCount() const
{
return m_cellCount;
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
size_t nrOfValuesReceived() const
{
return m_nrOfValuesReceived;
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
@ -79,31 +95,32 @@ public:
auto resultData = caseData->results(porosityModel);
auto resultType = static_cast<RiaDefines::ResultCatType>(request->property_type());
size_t timeStep = static_cast<size_t>(request->time_step());
RigEclipseResultAddress resAddr(resultType, QString::fromStdString(request->property_name()));
if (resultData->ensureKnownResultLoaded(resAddr))
m_resultAddress = RigEclipseResultAddress(resultType, QString::fromStdString(request->property_name()));
if (resultData->ensureKnownResultLoaded(m_resultAddress))
{
if (timeStep < resultData->timeStepCount(resAddr))
if (timeStep < resultData->timeStepCount(m_resultAddress))
{
initResultAccess(caseData, request->grid_index(), porosityModel, timeStep, resAddr);
initResultAccess(caseData, request->grid_index(), porosityModel, timeStep, m_resultAddress);
return grpc::Status::OK;
}
return grpc::Status(grpc::NOT_FOUND, "No such time step");
}
else if (m_clientStreamer)
{
resultData->createResultEntry(resAddr, true);
resultData->createResultEntry(m_resultAddress, true);
RigEclipseResultAddress addrToMaxTimeStepCountResult;
size_t timeStepCount = resultData->maxTimeStepCount(&addrToMaxTimeStepCountResult);
const std::vector<RigEclipseTimeStepInfo> timeStepInfos =
resultData->timeStepInfos(addrToMaxTimeStepCountResult);
resultData->setTimeStepInfos(resAddr, timeStepInfos);
auto scalarResultFrames = resultData->modifiableCellScalarResultTimesteps(resAddr);
resultData->setTimeStepInfos(m_resultAddress, timeStepInfos);
auto scalarResultFrames = resultData->modifiableCellScalarResultTimesteps(m_resultAddress);
scalarResultFrames.resize(timeStepCount);
if (timeStep < resultData->timeStepCount(resAddr))
if (timeStep < resultData->timeStepCount(m_resultAddress))
{
initResultAccess(caseData, request->grid_index(), porosityModel, timeStep, resAddr);
initResultAccess(caseData, request->grid_index(), porosityModel, timeStep, m_resultAddress);
return grpc::Status::OK;
}
return grpc::Status(grpc::NOT_FOUND, "No such time step");
@ -133,9 +150,9 @@ public:
const size_t packageSize = RiaGrpcServiceInterface::numberOfMessagesForByteCount(sizeof(rips::PropertyChunk));
size_t packageIndex = 0u;
reply->mutable_values()->Reserve((int)packageSize);
for (; packageIndex < packageSize && m_currentCellIdx < m_cellCount; ++packageIndex, ++m_currentCellIdx)
for (; packageIndex < packageSize && m_nrOfValuesReceived < m_cellCount; ++packageIndex, ++m_nrOfValuesReceived)
{
reply->add_values(cellResult(m_currentCellIdx));
reply->add_values(cellResult(m_nrOfValuesReceived));
}
if (packageIndex > 0u)
{
@ -148,23 +165,31 @@ public:
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
Status receiveStreamRequest(const PropertyInputChunk* request)
Status receiveStreamRequest(const PropertyInputChunk* request, ClientToServerStreamReply* reply)
{
CAF_ASSERT(!request->has_params());
if (request->has_values())
{
auto values = request->values().values();
for (int i = 0; i < values.size() && m_currentCellIdx < m_cellCount; ++i, ++m_currentCellIdx)
{
setCellResult(m_currentCellIdx, values[i]);
}
if (m_currentCellIdx >= m_cellCount - 1)
{
return grpc::Status(grpc::OUT_OF_RANGE, "All values have been written");
}
if (!values.empty())
{
size_t currentCellIdx = m_nrOfValuesReceived;
m_nrOfValuesReceived += values.size();
return Status::OK;
for (int i = 0; i < values.size() && currentCellIdx < m_cellCount; ++i, ++currentCellIdx)
{
setCellResult(currentCellIdx, values[i]);
}
if (m_nrOfValuesReceived > m_cellCount)
{
return grpc::Status(grpc::OUT_OF_RANGE, "Attempting to write out of bounds");
}
reply->set_values_accepted(static_cast<int64_t>(currentCellIdx));
return Status::OK;
}
}
return grpc::Status(grpc::OUT_OF_RANGE, "No messages to write");
return Status::OK;
}
//--------------------------------------------------------------------------------------------------
@ -174,6 +199,11 @@ public:
{
if (m_eclipseCase)
{
auto porosityModel = static_cast<RiaDefines::PorosityModelType>(m_request->porosity_model());
auto caseData = m_eclipseCase->eclipseCaseData();
auto resultData = caseData->results(porosityModel);
resultData->recalculateStatistics(m_resultAddress);
for (Rim3dView* view : m_eclipseCase->views())
{
view->setCurrentTimeStepAndUpdate(view->currentTimeStep());
@ -192,11 +222,12 @@ protected:
virtual void setCellResult(size_t currentCellIndex, double value) = 0;
protected:
const rips::PropertyRequest* m_request;
RimEclipseCase* m_eclipseCase;
size_t m_currentCellIdx;
size_t m_cellCount;
bool m_clientStreamer;
const rips::PropertyRequest* m_request;
RimEclipseCase* m_eclipseCase;
size_t m_nrOfValuesReceived;
size_t m_cellCount;
bool m_clientStreamer;
RigEclipseResultAddress m_resultAddress;
};
class RiaActiveCellResultsStateHandler : public RiaCellResultsStateHandler
@ -328,22 +359,22 @@ grpc::Status RiaGrpcPropertiesService::GetGridProperty(grpc::ServerContext*
//--------------------------------------------------------------------------------------------------
grpc::Status RiaGrpcPropertiesService::SetActiveCellProperty(grpc::ServerContext* context,
const rips::PropertyInputChunk* request,
rips::Empty* reply,
rips::ClientToServerStreamReply* reply,
RiaActiveCellResultsStateHandler* stateHandler)
{
return stateHandler->receiveStreamRequest(request);
return stateHandler->receiveStreamRequest(request, reply);
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
grpc::Status RiaGrpcPropertiesService::SetGridProperty(grpc::ServerContext* context,
const rips::PropertyInputChunk* request,
rips::Empty* reply,
RiaGridCellResultsStateHandler* stateHandler)
grpc::Status RiaGrpcPropertiesService::SetGridProperty(grpc::ServerContext* context,
const rips::PropertyInputChunk* request,
rips::ClientToServerStreamReply* reply,
RiaGridCellResultsStateHandler* stateHandler)
{
return stateHandler->receiveStreamRequest(request);
return stateHandler->receiveStreamRequest(request, reply);
}
//--------------------------------------------------------------------------------------------------
///
@ -356,12 +387,12 @@ std::vector<RiaGrpcCallbackInterface*> RiaGrpcPropertiesService::createCallbacks
callbacks = {
new RiaGrpcUnaryCallback<Self, AvailablePropertiesRequest, AvailableProperties>(
this, &Self::GetAvailableProperties, &Self::RequestGetAvailableProperties),
new RiaGrpcClientToServerStreamCallback<Self, PropertyInputChunk, Empty, RiaActiveCellResultsStateHandler>(
new RiaGrpcClientToServerStreamCallback<Self, PropertyInputChunk, ClientToServerStreamReply, RiaActiveCellResultsStateHandler>(
this, &Self::SetActiveCellProperty, &Self::RequestSetActiveCellProperty, new RiaActiveCellResultsStateHandler(true)),
new RiaGrpcClientToServerStreamCallback<Self, PropertyInputChunk, Empty, RiaGridCellResultsStateHandler>(
new RiaGrpcClientToServerStreamCallback<Self, PropertyInputChunk, ClientToServerStreamReply, RiaGridCellResultsStateHandler>(
this, &Self::SetGridProperty, &Self::RequestSetGridProperty, new RiaGridCellResultsStateHandler(true))};
for (int i = 0; i < NUM_CONCURRENT_SERVER_STREAMS; ++i)
for (int i = 0; i < NUM_CONCURRENT_CLIENT_TO_SERVER_STREAMS; ++i)
{
callbacks.push_back(new RiaGrpcServerToClientStreamCallback<Self, PropertyRequest, PropertyChunk, RiaActiveCellResultsStateHandler>(
this, &Self::GetActiveCellProperty, &Self::RequestGetActiveCellProperty, new RiaActiveCellResultsStateHandler));

View File

@ -38,21 +38,21 @@ public:
const rips::AvailablePropertiesRequest* request,
rips::AvailableProperties* reply) override;
grpc::Status GetActiveCellProperty(grpc::ServerContext* context,
const rips::PropertyRequest* request,
rips::PropertyChunk* reply,
const rips::PropertyRequest* request,
rips::PropertyChunk* reply,
RiaActiveCellResultsStateHandler* stateHandler);
grpc::Status GetGridProperty(grpc::ServerContext* context,
const rips::PropertyRequest* request,
rips::PropertyChunk* reply,
const rips::PropertyRequest* request,
rips::PropertyChunk* reply,
RiaGridCellResultsStateHandler* stateHandler);
grpc::Status SetActiveCellProperty(grpc::ServerContext* context,
grpc::Status SetActiveCellProperty(grpc::ServerContext* context,
const rips::PropertyInputChunk* chunk,
rips::Empty* reply,
rips::ClientToServerStreamReply* reply,
RiaActiveCellResultsStateHandler* stateHandler);
grpc::Status SetGridProperty(grpc::ServerContext* context,
const rips::PropertyInputChunk* chunk,
rips::Empty* reply,
RiaGridCellResultsStateHandler* stateHandler);
grpc::Status SetGridProperty(grpc::ServerContext* context,
const rips::PropertyInputChunk* chunk,
rips::ClientToServerStreamReply* reply,
RiaGridCellResultsStateHandler* stateHandler);
std::vector<RiaGrpcCallbackInterface*> createCallbacks() override;
};