#4430 Implement client -> server streaming framework and SetActiveCellProperty

This commit is contained in:
Gaute Lindkvist 2019-05-22 15:30:09 +02:00
parent 90af7a2e6b
commit c30721593e
12 changed files with 391 additions and 81 deletions

View File

@ -9,7 +9,7 @@ service GridInfo
// This function returns a two dimensional matrix: One row for each grid, starting with the main grid.
rpc GetGridCount(Case) returns(GridCount) {}
rpc GetGridDimensions(Case) returns (GridDimensions) {}
rpc StreamActiveCellInfo(ActiveCellInfoRequest) returns (stream ActiveCellInfoArray) {}
rpc GetCellInfoForActiveCells(CellInfoRequest) returns (stream CellInfoArray) {}
rpc GetAllCoarseningInfoArray(Case) returns (CoarseningInfoArray) {}
rpc GetTimeSteps(Case) returns (TimeStepDates) {}
rpc GetTimeStepDaysSinceStart(Case) returns (DoubleDates) {}
@ -37,18 +37,18 @@ enum PorosityModelType
FRACTURE_MODEL = 1;
}
message ActiveCellInfoRequest
message CellInfoRequest
{
int32 case_id = 1;
PorosityModelType porosity_model = 2;
}
message ActiveCellInfoArray
message CellInfoArray
{
repeated ActiveCellInfo data = 1;
repeated CellInfo data = 1;
}
message ActiveCellInfo
message CellInfo
{
int32 grid_index = 1;
int32 parent_grid_index = 2;

View File

@ -9,10 +9,10 @@ package rips;
service Properties
{
rpc GetAvailableProperties(PropertiesRequest) returns (AvailableProperties) {}
rpc GetActiveCellResults(ResultRequest) returns (stream ResultReplyArray) {}
rpc GetGridResults(ResultRequest) returns (stream ResultReplyArray) {}
rpc SetActiveCellResults(stream ResultRequestArray) returns (Empty) {}
rpc SetGridResults(stream ResultRequestArray) returns (Empty) {}
rpc GetActiveCellResults(ResultRequest) returns (stream ResultArray) {}
rpc GetGridResults(ResultRequest) returns (stream ResultArray) {}
rpc SetActiveCellResults(stream ResultRequestChunk) returns (Empty) {}
rpc SetGridResults(stream ResultRequestChunk) returns (Empty) {}
}
enum PropertyType
@ -56,14 +56,14 @@ message TimeStep
int32 index = 1;
}
message ResultRequestArray
message ResultRequestChunk
{
repeated double values = 1;
Case request_case = 2;
TimeStep time_step = 3;
// Params needs to be sent in the first message
ResultRequest params = 1;
ResultArray values = 2;
}
message ResultReplyArray
message ResultArray
{
repeated double values = 1;
}

View File

@ -25,6 +25,7 @@
using grpc::CompletionQueue;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerAsyncReader;
using grpc::ServerAsyncWriter;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
@ -44,7 +45,8 @@ public:
enum CallState
{
CREATE_HANDLER,
INIT_REQUEST,
INIT_REQUEST_STARTED,
INIT_REQUEST_COMPLETED,
PROCESS_REQUEST,
FINISH_REQUEST
};
@ -56,9 +58,10 @@ public:
virtual QString name() const = 0;
virtual RiaAbstractGrpcCallback* createNewFromThis() const = 0;
virtual void createRequestHandler(ServerCompletionQueue* completionQueue) = 0;
virtual void initRequest() = 0;
virtual void processRequest() = 0;
virtual void finishRequest() {}
virtual void onInitRequestStarted() {}
virtual void onInitRequestCompleted() = 0;
virtual void onProcessRequest() = 0;
virtual void onFinishRequest() {}
inline CallState callState() const;
inline const Status& status() const;
@ -114,8 +117,8 @@ public:
RiaAbstractGrpcCallback* createNewFromThis() const override;
void createRequestHandler(ServerCompletionQueue* completionQueue) override;
void initRequest() override;
void processRequest() override;
void onInitRequestCompleted() override;
void onProcessRequest() override;
protected:
virtual QString methodType() const;
@ -133,7 +136,7 @@ private:
//
// The streaming callback needs a state handler for setting up and maintaining order.
//
// A fully functional stream handler needs to implement the following methods:
// A fully functional state handler needs to implement the following methods:
// 1. Default Constructor
// 2. grpc::Status init(const grpc::Message* request)
//
@ -152,8 +155,8 @@ public:
RiaAbstractGrpcCallback* createNewFromThis() const override;
void createRequestHandler(ServerCompletionQueue* completionQueue) override;
void initRequest() override;
void processRequest() override;
void onInitRequestCompleted() override;
void onProcessRequest() override;
protected:
virtual QString methodType() const;
@ -166,4 +169,46 @@ private:
std::unique_ptr<StateHandlerT> m_stateHandler;
};
//==================================================================================================
//
// Templated client *streaming* gRPC-callback calling service implementation callbacks
//
// The streaming callback needs a state handler for setting up and maintaining order.
//
// A fully functional state handler needs to implement the following methods:
// 1. Default Constructor
// 2. grpc::Status init(const grpc::Message* request)
// 3. void finish() any updates required after completion
//
//==================================================================================================
template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHandlerT>
class RiaGrpcClientStreamCallback : public RiaGrpcRequestCallback<ServiceT, RequestT, ReplyT>
{
public:
typedef ServerAsyncReader<ReplyT, RequestT> RequestReaderT;
typedef std::function<Status(ServiceT&, ServerContext*, const RequestT*, ReplyT*, StateHandlerT*)> MethodImplT;
typedef std::function<
void(ServiceT&, ServerContext*, RequestReaderT*, CompletionQueue*, ServerCompletionQueue*, void*)>
MethodRequestT;
RiaGrpcClientStreamCallback(ServiceT* service, MethodImplT methodImpl, MethodRequestT methodRequest, StateHandlerT* stateHandler);
RiaAbstractGrpcCallback* createNewFromThis() const override;
void createRequestHandler(ServerCompletionQueue* completionQueue) override;
void onInitRequestStarted() override;
void onInitRequestCompleted() override;
void onProcessRequest() override;
void onFinishRequest() override;
protected:
virtual QString methodType() const;
private:
ServerContext m_context;
RequestReaderT m_reader;
MethodImplT m_methodImpl;
MethodRequestT m_methodRequest;
std::unique_ptr<StateHandlerT> m_stateHandler;
};
#include "RiaGrpcCallbacks.inl"

View File

@ -116,24 +116,26 @@ RiaAbstractGrpcCallback* RiaGrpcCallback<ServiceT, RequestT, ReplyT>::createNewF
template<typename ServiceT, typename RequestT, typename ReplyT>
void RiaGrpcCallback<ServiceT, RequestT, ReplyT>::createRequestHandler(ServerCompletionQueue* completionQueue)
{
// The Request-method is where the request gets filled in with data from the gRPC stack:
m_methodRequest(*this->m_service, &m_context, &this->m_request, &m_responder, completionQueue, completionQueue, this);
this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST);
this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST_COMPLETED);
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
template<typename ServiceT, typename RequestT, typename ReplyT>
void RiaGrpcCallback<ServiceT, RequestT, ReplyT>::initRequest()
void RiaGrpcCallback<ServiceT, RequestT, ReplyT>::onInitRequestCompleted()
{
this->setCallState(RiaAbstractGrpcCallback::PROCESS_REQUEST);
this->onProcessRequest();
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
template<typename ServiceT, typename RequestT, typename ReplyT>
void RiaGrpcCallback<ServiceT, RequestT, ReplyT>::processRequest()
void RiaGrpcCallback<ServiceT, RequestT, ReplyT>::onProcessRequest()
{
this->m_status = m_methodImpl(*this->m_service, &m_context, &this->m_request, &this->m_reply);
m_responder.Finish(this->m_reply, this->m_status, this);
@ -183,24 +185,25 @@ void RiaGrpcStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::createReq
ServerCompletionQueue* completionQueue)
{
m_methodRequest(*this->m_service, &m_context, &this->m_request, &m_responder, completionQueue, completionQueue, this);
this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST);
this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST_COMPLETED);
}
//--------------------------------------------------------------------------------------------------
/// Perform initialisation tasks at the time of receiving a request
//--------------------------------------------------------------------------------------------------
template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHandlerT>
void RiaGrpcStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::initRequest()
void RiaGrpcStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::onInitRequestCompleted()
{
this->m_status = m_stateHandler->init(&this->m_request);
this->setCallState(RiaAbstractGrpcCallback::PROCESS_REQUEST);
this->onProcessRequest();
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHandlerT>
void RiaGrpcStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::processRequest()
void RiaGrpcStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::onProcessRequest()
{
this->m_reply = ReplyT(); // Make sure it is reset
@ -236,3 +239,114 @@ QString RiaGrpcStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::method
{
return "StreamingMethod";
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHandlerT>
RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::RiaGrpcClientStreamCallback(ServiceT* service,
MethodImplT methodImpl,
MethodRequestT methodRequest,
StateHandlerT* stateHandler)
: RiaGrpcRequestCallback<ServiceT, RequestT, ReplyT>(service)
, m_reader(&m_context)
, m_methodImpl(methodImpl)
, m_methodRequest(methodRequest)
, m_stateHandler(stateHandler)
{
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHandlerT>
RiaAbstractGrpcCallback* RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::createNewFromThis() const
{
return new RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>(
this->m_service, m_methodImpl, m_methodRequest, new StateHandlerT);
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHandlerT>
void RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::createRequestHandler(
ServerCompletionQueue* completionQueue)
{
m_methodRequest(*this->m_service, &m_context, &this->m_reader, completionQueue, completionQueue, this);
this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST_STARTED);
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHandlerT>
void RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::onInitRequestStarted()
{
this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST_COMPLETED);
m_reader.Read(&m_request, this);
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHandlerT>
void RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::onInitRequestCompleted()
{
this->setCallState(RiaAbstractGrpcCallback::PROCESS_REQUEST);
this->m_status = m_stateHandler->init(&this->m_request); // Fully received the stream package so can now init
m_reader.Read(&m_request, this);
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHandlerT>
void RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::onProcessRequest()
{
this->m_reply = ReplyT(); // Make sure it is reset
if (!this->m_status.ok())
{
m_reader.Finish(this->m_reply, this->m_status, this);
this->setCallState(RiaAbstractGrpcCallback::FINISH_REQUEST);
return;
}
this->m_status = m_methodImpl(*this->m_service, &m_context, &this->m_request, &this->m_reply, m_stateHandler.get());
if (!this->m_status.ok())
{
this->setCallState(RiaAbstractGrpcCallback::FINISH_REQUEST);
if (this->m_status.error_code() == grpc::OUT_OF_RANGE)
{
m_reader.Finish(this->m_reply, grpc::Status::OK, this);
}
else
{
m_reader.FinishWithError(this->m_status, this);
}
}
else
{
m_reader.Read(&this->m_request, this);
}
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHandlerT>
void RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::onFinishRequest()
{
m_stateHandler->finish();
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHandlerT>
QString RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::methodType() const
{
return "ClientStreamingMethod";
}

View File

@ -43,7 +43,7 @@ RiaActiveCellInfoStateHandler::RiaActiveCellInfoStateHandler()
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
grpc::Status RiaActiveCellInfoStateHandler::init(const rips::ActiveCellInfoRequest* request)
grpc::Status RiaActiveCellInfoStateHandler::init(const rips::CellInfoRequest* request)
{
CAF_ASSERT(request);
m_request = request;
@ -87,7 +87,7 @@ grpc::Status RiaActiveCellInfoStateHandler::init(const rips::ActiveCellInfoReque
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
grpc::Status RiaActiveCellInfoStateHandler::assignNextActiveCellInfoData(rips::ActiveCellInfo* cellInfo)
grpc::Status RiaActiveCellInfoStateHandler::assignNextActiveCellInfoData(rips::CellInfo* cellInfo)
{
const std::vector<RigCell>& reservoirCells = m_eclipseCase->eclipseCaseData()->mainGrid()->globalCellArray();
@ -96,7 +96,7 @@ grpc::Status RiaActiveCellInfoStateHandler::assignNextActiveCellInfoData(rips::A
size_t cellIdxToTry = m_currentCellIdx++;
if (m_activeCellInfo->isActive(cellIdxToTry))
{
assignActiveCellInfoData(cellInfo, reservoirCells, cellIdxToTry);
assignCellInfoData(cellInfo, reservoirCells, cellIdxToTry);
return grpc::Status::OK;
}
}
@ -106,7 +106,7 @@ grpc::Status RiaActiveCellInfoStateHandler::assignNextActiveCellInfoData(rips::A
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
void RiaActiveCellInfoStateHandler::assignActiveCellInfoData(rips::ActiveCellInfo* cellInfo,
void RiaActiveCellInfoStateHandler::assignCellInfoData(rips::CellInfo* cellInfo,
const std::vector<RigCell>& reservoirCells,
size_t cellIdx)
{
@ -184,18 +184,18 @@ const std::vector<RigCell>& RiaActiveCellInfoStateHandler::reservoirCells() cons
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
grpc::Status RiaActiveCellInfoStateHandler::assignReply(rips::ActiveCellInfoArray* reply)
grpc::Status RiaActiveCellInfoStateHandler::assignReply(rips::CellInfoArray* reply)
{
const size_t packageSize = RiaGrpcServiceInterface::numberOfMessagesForByteCount(sizeof(rips::ActiveCellInfoArray));
const size_t packageSize = RiaGrpcServiceInterface::numberOfMessagesForByteCount(sizeof(rips::CellInfoArray));
size_t packageIndex = 0u;
reply->mutable_data()->Reserve((int)packageSize);
for (; packageIndex < packageSize && m_currentCellIdx < m_activeCellInfo->reservoirCellCount(); ++packageIndex)
{
rips::ActiveCellInfo singleCellInfo;
rips::CellInfo singleCellInfo;
grpc::Status singleCellInfoStatus = assignNextActiveCellInfoData(&singleCellInfo);
if (singleCellInfoStatus.ok())
{
rips::ActiveCellInfo* allocCellInfo = reply->add_data();
rips::CellInfo* allocCellInfo = reply->add_data();
*allocCellInfo = singleCellInfo;
}
else
@ -258,9 +258,9 @@ grpc::Status RiaGrpcGridInfoService::GetGridDimensions(grpc::ServerContext*
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
grpc::Status RiaGrpcGridInfoService::StreamActiveCellInfo(grpc::ServerContext* context,
const rips::ActiveCellInfoRequest* request,
rips::ActiveCellInfoArray* reply,
grpc::Status RiaGrpcGridInfoService::GetCellInfoForActiveCells(grpc::ServerContext* context,
const rips::CellInfoRequest* request,
rips::CellInfoArray* reply,
RiaActiveCellInfoStateHandler* stateHandler)
{
return stateHandler->assignReply(reply);
@ -275,8 +275,8 @@ std::vector<RiaAbstractGrpcCallback*> RiaGrpcGridInfoService::createCallbacks()
return {new RiaGrpcCallback<Self, Case, GridCount>(this, &Self::GetGridCount, &Self::RequestGetGridCount),
new RiaGrpcCallback<Self, Case, GridDimensions>(this, &Self::GetGridDimensions, &Self::RequestGetGridDimensions),
new RiaGrpcStreamCallback<Self, ActiveCellInfoRequest, ActiveCellInfoArray, RiaActiveCellInfoStateHandler>(
this, &Self::StreamActiveCellInfo, &Self::RequestStreamActiveCellInfo, new RiaActiveCellInfoStateHandler)};
new RiaGrpcStreamCallback<Self, CellInfoRequest, CellInfoArray, RiaActiveCellInfoStateHandler>(
this, &Self::GetCellInfoForActiveCells, &Self::RequestGetCellInfoForActiveCells, new RiaActiveCellInfoStateHandler)};
}
static bool RiaGrpcGridInfoService_init =

View File

@ -45,15 +45,15 @@ class RiaActiveCellInfoStateHandler
public:
RiaActiveCellInfoStateHandler();
Status init(const rips::ActiveCellInfoRequest* request);
Status assignNextActiveCellInfoData(rips::ActiveCellInfo* cellInfo);
void assignActiveCellInfoData(rips::ActiveCellInfo* cellInfo, const std::vector<RigCell>& reservoirCells, size_t cellIdx);
Status assignReply(rips::ActiveCellInfoArray* reply);
Status init(const rips::CellInfoRequest* request);
Status assignNextActiveCellInfoData(rips::CellInfo* cellInfo);
void assignCellInfoData(rips::CellInfo* cellInfo, const std::vector<RigCell>& reservoirCells, size_t cellIdx);
Status assignReply(rips::CellInfoArray* reply);
RigActiveCellInfo* activeCellInfo() const;
const std::vector<RigCell>& reservoirCells() const;
protected:
const rips::ActiveCellInfoRequest* m_request;
const rips::CellInfoRequest* m_request;
RimEclipseCase* m_eclipseCase;
RiaDefines::PorosityModelType m_porosityModel;
RigActiveCellInfo* m_activeCellInfo;
@ -71,9 +71,9 @@ class RiaGrpcGridInfoService final : public rips::GridInfo::AsyncService, public
public:
grpc::Status GetGridCount(grpc::ServerContext* context, const rips::Case* request, rips::GridCount* reply) override;
grpc::Status GetGridDimensions(grpc::ServerContext* context, const rips::Case* request, rips::GridDimensions* reply) override;
grpc::Status StreamActiveCellInfo(grpc::ServerContext* context,
const rips::ActiveCellInfoRequest* request,
rips::ActiveCellInfoArray* reply,
RiaActiveCellInfoStateHandler* stateHandler);
grpc::Status GetCellInfoForActiveCells(grpc::ServerContext* context,
const rips::CellInfoRequest* request,
rips::CellInfoArray* reply,
RiaActiveCellInfoStateHandler* stateHandler);
std::vector<RiaAbstractGrpcCallback*> createCallbacks() override;
};

View File

@ -29,7 +29,10 @@
#include "RigMainGrid.h"
#include "RigResultAccessor.h"
#include "RigResultAccessorFactory.h"
#include "RigResultModifier.h"
#include "RigResultModifierFactory.h"
#include "Rim3dView.h"
#include "RimEclipseCase.h"
using namespace rips;
@ -58,12 +61,13 @@ public:
//--------------------------------------------------------------------------------------------------
Status init(const ResultRequest* request)
{
int caseId = request->request_case().id();
RimEclipseCase* eclipseCase = dynamic_cast<RimEclipseCase*>(RiaGrpcServiceInterface::findCase(caseId));
if (eclipseCase)
int caseId = request->request_case().id();
m_eclipseCase = dynamic_cast<RimEclipseCase*>(RiaGrpcServiceInterface::findCase(caseId));
if (m_eclipseCase)
{
auto porosityModel = static_cast<RiaDefines::PorosityModelType>(request->porosity_model());
auto caseData = eclipseCase->eclipseCaseData();
auto caseData = m_eclipseCase->eclipseCaseData();
auto resultData = caseData->results(porosityModel);
auto resultType = static_cast<RiaDefines::ResultCatType>(request->property_type());
size_t timeStep = static_cast<size_t>(request->time_step());
@ -83,12 +87,24 @@ public:
return grpc::Status(grpc::NOT_FOUND, "Couldn't find an Eclipse case matching the case Id");
}
//--------------------------------------------------------------------------------------------------
/// Client streamers need to be initialised with the encapsulated parameters
//--------------------------------------------------------------------------------------------------
Status init(const ResultRequestChunk* request)
{
if (request->has_params())
{
return init(&(request->params()));
}
return grpc::Status(grpc::INVALID_ARGUMENT, "Need to have ResultRequest parameters in first message");
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
Status assignReply(ResultReplyArray* reply)
Status assignStreamReply(ResultArray* reply)
{
const size_t packageSize = RiaGrpcServiceInterface::numberOfMessagesForByteCount(sizeof(rips::ResultReplyArray));
const size_t packageSize = RiaGrpcServiceInterface::numberOfMessagesForByteCount(sizeof(rips::ResultArray));
size_t packageIndex = 0u;
reply->mutable_values()->Reserve((int)packageSize);
for (; packageIndex < packageSize && m_currentCellIdx < m_cellCount; ++packageIndex, ++m_currentCellIdx)
@ -103,6 +119,40 @@ public:
"We've reached the end. This is not an error but means transmission is finished");
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
Status receiveStreamRequest(const ResultRequestChunk* request)
{
if (request->has_values())
{
auto values = request->values().values();
for (int i = 0; i < values.size() && m_currentCellIdx < m_cellCount; ++i, ++m_currentCellIdx)
{
setCellResult(m_currentCellIdx, values[i]);
}
if (m_currentCellIdx == m_cellCount - 1)
{
return grpc::Status(grpc::OUT_OF_RANGE, "All values have been written");
}
return Status::OK;
}
return grpc::Status(grpc::OUT_OF_RANGE, "No messages to write");
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
void finish()
{
for (Rim3dView* view : m_eclipseCase->views())
{
view->setCurrentTimeStepAndUpdate(view->currentTimeStep());
view->createDisplayModelAndRedraw();
}
}
protected:
virtual void initResultAccess(RigEclipseCaseData* caseData,
size_t gridIndex,
@ -110,9 +160,11 @@ protected:
size_t timeStepIndex,
RigEclipseResultAddress resVarAddr) = 0;
virtual double cellResult(size_t currentCellIndex) const = 0;
virtual void setCellResult(size_t currentCellIndex, double value) = 0;
protected:
const rips::ResultRequest* m_request;
RimEclipseCase* m_eclipseCase;
size_t m_currentCellIdx;
size_t m_cellCount;
};
@ -127,7 +179,7 @@ protected:
RigEclipseResultAddress resVarAddr) override
{
auto activeCellInfo = caseData->activeCellInfo(porosityModel);
m_resultValues = &(caseData->results(porosityModel)->cellScalarResults(resVarAddr, timeStepIndex));
m_resultValues = &(caseData->results(porosityModel)->modifiableCellScalarResult(resVarAddr, timeStepIndex));
m_cellCount = activeCellInfo->reservoirActiveCellCount();
}
@ -136,8 +188,13 @@ protected:
return (*m_resultValues)[currentCellIndex];
}
void setCellResult(size_t currentCellIndex, double value) override
{
(*m_resultValues)[currentCellIndex] = value;
}
private:
const std::vector<double>* m_resultValues;
std::vector<double>* m_resultValues;
};
class RiaGridCellResultsStateHandler : public RiaCellResultsStateHandler
@ -150,6 +207,7 @@ protected:
RigEclipseResultAddress resVarAddr) override
{
m_resultAccessor = RigResultAccessorFactory::createFromResultAddress(caseData, gridIndex, porosityModel, timeStepIndex, resVarAddr);
m_resultModifier = RigResultModifierFactory::createResultModifier(caseData, gridIndex, porosityModel, timeStepIndex, resVarAddr);
m_cellCount = caseData->grid(gridIndex)->cellCount();
}
@ -158,8 +216,14 @@ protected:
return m_resultAccessor->cellScalar(currentCellIndex);
}
void setCellResult(size_t currentCellIndex, double value) override
{
return m_resultModifier->setCellScalar(currentCellIndex, value);
}
private:
cvf::ref<RigResultAccessor> m_resultAccessor;
cvf::ref<RigResultModifier> m_resultModifier;
};
//--------------------------------------------------------------------------------------------------
@ -195,10 +259,10 @@ grpc::Status RiaGrpcPropertiesService::GetAvailableProperties(grpc::ServerContex
//--------------------------------------------------------------------------------------------------
grpc::Status RiaGrpcPropertiesService::GetActiveCellResults(grpc::ServerContext* context,
const ResultRequest* request,
ResultReplyArray* reply,
ResultArray* reply,
RiaActiveCellResultsStateHandler* stateHandler)
{
return stateHandler->assignReply(reply);
return stateHandler->assignStreamReply(reply);
}
@ -207,10 +271,21 @@ grpc::Status RiaGrpcPropertiesService::GetActiveCellResults(grpc::ServerContext*
//--------------------------------------------------------------------------------------------------
grpc::Status RiaGrpcPropertiesService::GetGridResults(grpc::ServerContext* context,
const rips::ResultRequest* request,
rips::ResultReplyArray* reply,
rips::ResultArray* reply,
RiaGridCellResultsStateHandler* stateHandler)
{
return stateHandler->assignReply(reply);
return stateHandler->assignStreamReply(reply);
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
grpc::Status RiaGrpcPropertiesService::SetActiveCellResults(grpc::ServerContext* context,
const rips::ResultRequestChunk* request,
rips::Empty* reply,
RiaActiveCellResultsStateHandler* stateHandler)
{
return stateHandler->receiveStreamRequest(request);
}
//--------------------------------------------------------------------------------------------------
@ -222,10 +297,12 @@ std::vector<RiaAbstractGrpcCallback*> RiaGrpcPropertiesService::createCallbacks(
return {new RiaGrpcCallback<Self, PropertiesRequest, AvailableProperties>(
this, &Self::GetAvailableProperties, &Self::RequestGetAvailableProperties),
new RiaGrpcStreamCallback<Self, ResultRequest, ResultReplyArray, RiaActiveCellResultsStateHandler>(
new RiaGrpcStreamCallback<Self, ResultRequest, ResultArray, RiaActiveCellResultsStateHandler>(
this, &Self::GetActiveCellResults, &Self::RequestGetActiveCellResults, new RiaActiveCellResultsStateHandler),
new RiaGrpcStreamCallback<Self, ResultRequest, ResultReplyArray, RiaGridCellResultsStateHandler>(
this, &Self::GetGridResults, &Self::RequestGetGridResults, new RiaGridCellResultsStateHandler)
new RiaGrpcStreamCallback<Self, ResultRequest, ResultArray, RiaGridCellResultsStateHandler>(
this, &Self::GetGridResults, &Self::RequestGetGridResults, new RiaGridCellResultsStateHandler),
new RiaGrpcClientStreamCallback<Self, ResultRequestChunk, Empty, RiaActiveCellResultsStateHandler>(
this, &Self::SetActiveCellResults, &Self::RequestSetActiveCellResults, new RiaActiveCellResultsStateHandler)
};
}

View File

@ -39,13 +39,16 @@ public:
rips::AvailableProperties* reply) override;
grpc::Status GetActiveCellResults(grpc::ServerContext* context,
const rips::ResultRequest* request,
rips::ResultReplyArray* reply,
rips::ResultArray* reply,
RiaActiveCellResultsStateHandler* stateHandler);
grpc::Status GetGridResults(grpc::ServerContext* context,
const rips::ResultRequest* request,
rips::ResultReplyArray* reply,
rips::ResultArray* reply,
RiaGridCellResultsStateHandler* stateHandler);
grpc::Status SetActiveCellResults(grpc::ServerContext* context,
const rips::ResultRequestChunk* request,
rips::Empty* reply,
RiaActiveCellResultsStateHandler* stateHandler);
std::vector<RiaAbstractGrpcCallback*> createCallbacks() override;
};

View File

@ -63,7 +63,7 @@ public:
void initialize();
void processRequests();
void quit();
int currentPortNumber;
int currentPortNumber;
private:
void waitForNextRequest();
@ -242,22 +242,28 @@ void RiaGrpcServerImpl::process(RiaAbstractGrpcCallback* method)
RiaLogging::debug(QString("Initialising request handler for: %1").arg(method->name()));
method->createRequestHandler(m_completionQueue.get());
}
else if (method->callState() == RiaAbstractGrpcCallback::INIT_REQUEST)
else if (method->callState() == RiaAbstractGrpcCallback::INIT_REQUEST_STARTED)
{
// Perform initialization and immediately process the first request
// The initialization is necessary for streaming services.
RiaLogging::info(QString("Starting handling: %1").arg(method->name()));
method->initRequest();
method->processRequest();
method->onInitRequestStarted();
}
else if (method->callState() == RiaAbstractGrpcCallback::INIT_REQUEST_COMPLETED)
{
// Perform initialization and immediately process the first request
// The initialization is necessary for streaming services.
RiaLogging::info(QString("Initialising handling: %1").arg(method->name()));
method->onInitRequestCompleted();
}
else if (method->callState() == RiaAbstractGrpcCallback::PROCESS_REQUEST)
{
method->processRequest();
method->onProcessRequest();
}
else
{
RiaLogging::info(QString("Finished handling: %1").arg(method->name()));
method->finishRequest();
method->onFinishRequest();
process(method->createNewFromThis());
delete method;
}

View File

@ -4,6 +4,8 @@ import grpc
import os
import sys
import socket
import logging
sys.path.insert(1, os.path.join(sys.path[0], '../generated'))
@ -68,14 +70,14 @@ class GridInfo:
def __init__(self, channel):
self.gridInfo = GridInfo_pb2_grpc.GridInfoStub(channel)
def getGridCount(self, caseId=0):
def gridCount(self, caseId=0):
return self.gridInfo.GetGridCount(CaseInfo_pb2.Case(id=caseId)).count
def getGridDimensions(self, caseId=0):
def gridDimensions(self, caseId=0):
return self.gridInfo.GetGridDimensions(CaseInfo_pb2.Case(id=caseId)).dimensions
def streamActiveCellInfo(self, caseId=0):
return self.gridInfo.StreamActiveCellInfo(CaseInfo_pb2.Case(id=caseId))
def cellInfoForActiveCells(self, caseId=0):
return self.gridInfo.GetCellInfoForActiveCells(CaseInfo_pb2.Case(id=caseId))
class ProjectInfo:
def __init__(self, channel):
@ -96,6 +98,33 @@ class ProjectInfo:
class Properties:
def __init__(self, channel):
self.properties = Properties_pb2_grpc.PropertiesStub(channel)
def generateResultRequestArrays(self, array, parameters):
# Each double is 8 bytes. A good chunk size is 64KiB = 65536B
# Meaning ideal number of doubles would be 8192.
# However we need overhead space, so if we choose 8160 in chunk size
# We have 256B left for overhead which should be plenty
chunkSize = 8000
index = -1
while index < len(array):
chunk = Properties_pb2.ResultRequestChunk()
if index is -1:
chunk.params.CopyFrom(parameters)
print("Added parameters")
index += 1;
else:
actualChunkSize = min(len(array) - index + 1, chunkSize)
chunk.values.CopyFrom(Properties_pb2.ResultArray(values = array[index:index+actualChunkSize]))
print("Added values")
index += actualChunkSize
print(index)
yield chunk
# Final empty message to signal completion
chunk = Properties_pb2.ResultRequestChunk()
yield chunk
print("finished")
def availableProperties(self, caseId, propertyType, porosityModel = 'MATRIX_MODEL'):
propertyTypeEnum = Properties_pb2.PropertyType.Value(propertyType)
porosityModelEnum = GridInfo_pb2.PorosityModelType.Value(porosityModel)
@ -122,6 +151,25 @@ class Properties:
grid_index = gridIndex,
porosity_model = porosityModelEnum)
return self.properties.GetGridResults(request)
def setActiveCellResults(self, values, caseId, propertyType, propertyName, timeStep, gridIndex = 0, porosityModel = 'MATRIX_MODEL'):
propertyTypeEnum = Properties_pb2.PropertyType.Value(propertyType)
porosityModelEnum = GridInfo_pb2.PorosityModelType.Value(porosityModel)
print (propertyName)
request = Properties_pb2.ResultRequest(request_case = CaseInfo_pb2.Case(id=caseId),
property_type = propertyTypeEnum,
property_name = propertyName,
time_step = timeStep,
grid_index = gridIndex,
porosity_model = porosityModelEnum)
try:
request_iterator = self.generateResultRequestArrays(values, request)
print("Starting to send data")
self.properties.SetActiveCellResults(request_iterator)
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.NOT_FOUND:
print("Command not found")
else:
print("Other error", e)
class Instance:
@staticmethod
@ -166,6 +214,7 @@ class Instance:
return None
def __init__(self, port = 50051):
logging.basicConfig()
location = "localhost:" + str(port)
self.channel = grpc.insecure_channel(location)

View File

@ -7,7 +7,7 @@ resInsight = ResInsight.Instance.find()
#gridCount = resInsight.gridInfo.getGridCount(caseId=0)
#gridDimensions = resInsight.gridInfo.getAllGridDimensions(caseId=0)
activeCellInfoChunks = resInsight.gridInfo.streamActiveCellInfo(caseId=0)
activeCellInfoChunks = resInsight.gridInfo.cellInfoForActiveCells(caseId=0)
#print("Number of grids: " + str(gridCount))
#print(gridDimensions)

View File

@ -0,0 +1,16 @@
import sys
import os
sys.path.insert(1, os.path.join(sys.path[0], '../api'))
import ResInsight
resInsight = ResInsight.Instance.find()
#gridCount = resInsight.gridInfo.getGridCount(caseId=0)
#gridDimensions = resInsight.gridInfo.getAllGridDimensions(caseId=0)
values = []
for i in range(0, 11124):
values.append(i % 2 * 0.5);
print("Applying all values to time step 0")
resInsight.properties.setActiveCellResults(values, 0, 'DYNAMIC_NATIVE', 'SOIL', 0)