From c30721593eb29681b11adacaf59b11a680054d24 Mon Sep 17 00:00:00 2001 From: Gaute Lindkvist Date: Wed, 22 May 2019 15:30:09 +0200 Subject: [PATCH] #4430 Implement client -> server streaming framework and SetActiveCellProperty --- .../GrpcInterface/GrpcProtos/GridInfo.proto | 10 +- .../GrpcInterface/GrpcProtos/Properties.proto | 18 +-- .../GrpcInterface/RiaGrpcCallbacks.h | 63 +++++++-- .../GrpcInterface/RiaGrpcCallbacks.inl | 126 +++++++++++++++++- .../GrpcInterface/RiaGrpcGridInfoService.cpp | 26 ++-- .../GrpcInterface/RiaGrpcGridInfoService.h | 18 +-- .../RiaGrpcPropertiesService.cpp | 107 ++++++++++++--- .../GrpcInterface/RiaGrpcPropertiesService.h | 9 +- .../GrpcInterface/RiaGrpcServer.cpp | 20 ++- Python/api/ResInsight.py | 57 +++++++- Python/examples/GridInfoStreamingExample.py | 2 +- Python/examples/SetResultValues.py | 16 +++ 12 files changed, 391 insertions(+), 81 deletions(-) create mode 100644 Python/examples/SetResultValues.py diff --git a/ApplicationCode/GrpcInterface/GrpcProtos/GridInfo.proto b/ApplicationCode/GrpcInterface/GrpcProtos/GridInfo.proto index 0baaf00d4b..5344e57617 100644 --- a/ApplicationCode/GrpcInterface/GrpcProtos/GridInfo.proto +++ b/ApplicationCode/GrpcInterface/GrpcProtos/GridInfo.proto @@ -9,7 +9,7 @@ service GridInfo // This function returns a two dimensional matrix: One row for each grid, starting with the main grid. rpc GetGridCount(Case) returns(GridCount) {} rpc GetGridDimensions(Case) returns (GridDimensions) {} - rpc StreamActiveCellInfo(ActiveCellInfoRequest) returns (stream ActiveCellInfoArray) {} + rpc GetCellInfoForActiveCells(CellInfoRequest) returns (stream CellInfoArray) {} rpc GetAllCoarseningInfoArray(Case) returns (CoarseningInfoArray) {} rpc GetTimeSteps(Case) returns (TimeStepDates) {} rpc GetTimeStepDaysSinceStart(Case) returns (DoubleDates) {} @@ -37,18 +37,18 @@ enum PorosityModelType FRACTURE_MODEL = 1; } -message ActiveCellInfoRequest +message CellInfoRequest { int32 case_id = 1; PorosityModelType porosity_model = 2; } -message ActiveCellInfoArray +message CellInfoArray { - repeated ActiveCellInfo data = 1; + repeated CellInfo data = 1; } -message ActiveCellInfo +message CellInfo { int32 grid_index = 1; int32 parent_grid_index = 2; diff --git a/ApplicationCode/GrpcInterface/GrpcProtos/Properties.proto b/ApplicationCode/GrpcInterface/GrpcProtos/Properties.proto index c44909615d..ac91299841 100644 --- a/ApplicationCode/GrpcInterface/GrpcProtos/Properties.proto +++ b/ApplicationCode/GrpcInterface/GrpcProtos/Properties.proto @@ -9,10 +9,10 @@ package rips; service Properties { rpc GetAvailableProperties(PropertiesRequest) returns (AvailableProperties) {} - rpc GetActiveCellResults(ResultRequest) returns (stream ResultReplyArray) {} - rpc GetGridResults(ResultRequest) returns (stream ResultReplyArray) {} - rpc SetActiveCellResults(stream ResultRequestArray) returns (Empty) {} - rpc SetGridResults(stream ResultRequestArray) returns (Empty) {} + rpc GetActiveCellResults(ResultRequest) returns (stream ResultArray) {} + rpc GetGridResults(ResultRequest) returns (stream ResultArray) {} + rpc SetActiveCellResults(stream ResultRequestChunk) returns (Empty) {} + rpc SetGridResults(stream ResultRequestChunk) returns (Empty) {} } enum PropertyType @@ -56,14 +56,14 @@ message TimeStep int32 index = 1; } -message ResultRequestArray +message ResultRequestChunk { - repeated double values = 1; - Case request_case = 2; - TimeStep time_step = 3; + // Params needs to be sent in the first message + ResultRequest params = 1; + ResultArray values = 2; } -message ResultReplyArray +message ResultArray { repeated double values = 1; } diff --git a/ApplicationCode/GrpcInterface/RiaGrpcCallbacks.h b/ApplicationCode/GrpcInterface/RiaGrpcCallbacks.h index cfdfb2f34b..14b9449694 100644 --- a/ApplicationCode/GrpcInterface/RiaGrpcCallbacks.h +++ b/ApplicationCode/GrpcInterface/RiaGrpcCallbacks.h @@ -25,6 +25,7 @@ using grpc::CompletionQueue; using grpc::ServerAsyncResponseWriter; +using grpc::ServerAsyncReader; using grpc::ServerAsyncWriter; using grpc::ServerCompletionQueue; using grpc::ServerContext; @@ -44,7 +45,8 @@ public: enum CallState { CREATE_HANDLER, - INIT_REQUEST, + INIT_REQUEST_STARTED, + INIT_REQUEST_COMPLETED, PROCESS_REQUEST, FINISH_REQUEST }; @@ -56,9 +58,10 @@ public: virtual QString name() const = 0; virtual RiaAbstractGrpcCallback* createNewFromThis() const = 0; virtual void createRequestHandler(ServerCompletionQueue* completionQueue) = 0; - virtual void initRequest() = 0; - virtual void processRequest() = 0; - virtual void finishRequest() {} + virtual void onInitRequestStarted() {} + virtual void onInitRequestCompleted() = 0; + virtual void onProcessRequest() = 0; + virtual void onFinishRequest() {} inline CallState callState() const; inline const Status& status() const; @@ -114,8 +117,8 @@ public: RiaAbstractGrpcCallback* createNewFromThis() const override; void createRequestHandler(ServerCompletionQueue* completionQueue) override; - void initRequest() override; - void processRequest() override; + void onInitRequestCompleted() override; + void onProcessRequest() override; protected: virtual QString methodType() const; @@ -133,7 +136,7 @@ private: // // The streaming callback needs a state handler for setting up and maintaining order. // -// A fully functional stream handler needs to implement the following methods: +// A fully functional state handler needs to implement the following methods: // 1. Default Constructor // 2. grpc::Status init(const grpc::Message* request) // @@ -152,8 +155,8 @@ public: RiaAbstractGrpcCallback* createNewFromThis() const override; void createRequestHandler(ServerCompletionQueue* completionQueue) override; - void initRequest() override; - void processRequest() override; + void onInitRequestCompleted() override; + void onProcessRequest() override; protected: virtual QString methodType() const; @@ -166,4 +169,46 @@ private: std::unique_ptr m_stateHandler; }; +//================================================================================================== +// +// Templated client *streaming* gRPC-callback calling service implementation callbacks +// +// The streaming callback needs a state handler for setting up and maintaining order. +// +// A fully functional state handler needs to implement the following methods: +// 1. Default Constructor +// 2. grpc::Status init(const grpc::Message* request) +// 3. void finish() any updates required after completion +// +//================================================================================================== +template +class RiaGrpcClientStreamCallback : public RiaGrpcRequestCallback +{ +public: + typedef ServerAsyncReader RequestReaderT; + typedef std::function MethodImplT; + typedef std::function< + void(ServiceT&, ServerContext*, RequestReaderT*, CompletionQueue*, ServerCompletionQueue*, void*)> + MethodRequestT; + + RiaGrpcClientStreamCallback(ServiceT* service, MethodImplT methodImpl, MethodRequestT methodRequest, StateHandlerT* stateHandler); + + RiaAbstractGrpcCallback* createNewFromThis() const override; + void createRequestHandler(ServerCompletionQueue* completionQueue) override; + void onInitRequestStarted() override; + void onInitRequestCompleted() override; + void onProcessRequest() override; + void onFinishRequest() override; + +protected: + virtual QString methodType() const; + +private: + ServerContext m_context; + RequestReaderT m_reader; + MethodImplT m_methodImpl; + MethodRequestT m_methodRequest; + std::unique_ptr m_stateHandler; +}; + #include "RiaGrpcCallbacks.inl" diff --git a/ApplicationCode/GrpcInterface/RiaGrpcCallbacks.inl b/ApplicationCode/GrpcInterface/RiaGrpcCallbacks.inl index 9a41879e62..e5c47b625e 100644 --- a/ApplicationCode/GrpcInterface/RiaGrpcCallbacks.inl +++ b/ApplicationCode/GrpcInterface/RiaGrpcCallbacks.inl @@ -116,24 +116,26 @@ RiaAbstractGrpcCallback* RiaGrpcCallback::createNewF template void RiaGrpcCallback::createRequestHandler(ServerCompletionQueue* completionQueue) { + // The Request-method is where the request gets filled in with data from the gRPC stack: m_methodRequest(*this->m_service, &m_context, &this->m_request, &m_responder, completionQueue, completionQueue, this); - this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST); + this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST_COMPLETED); } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template -void RiaGrpcCallback::initRequest() +void RiaGrpcCallback::onInitRequestCompleted() { this->setCallState(RiaAbstractGrpcCallback::PROCESS_REQUEST); + this->onProcessRequest(); } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template -void RiaGrpcCallback::processRequest() +void RiaGrpcCallback::onProcessRequest() { this->m_status = m_methodImpl(*this->m_service, &m_context, &this->m_request, &this->m_reply); m_responder.Finish(this->m_reply, this->m_status, this); @@ -183,24 +185,25 @@ void RiaGrpcStreamCallback::createReq ServerCompletionQueue* completionQueue) { m_methodRequest(*this->m_service, &m_context, &this->m_request, &m_responder, completionQueue, completionQueue, this); - this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST); + this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST_COMPLETED); } //-------------------------------------------------------------------------------------------------- /// Perform initialisation tasks at the time of receiving a request //-------------------------------------------------------------------------------------------------- template -void RiaGrpcStreamCallback::initRequest() +void RiaGrpcStreamCallback::onInitRequestCompleted() { this->m_status = m_stateHandler->init(&this->m_request); this->setCallState(RiaAbstractGrpcCallback::PROCESS_REQUEST); + this->onProcessRequest(); } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template -void RiaGrpcStreamCallback::processRequest() +void RiaGrpcStreamCallback::onProcessRequest() { this->m_reply = ReplyT(); // Make sure it is reset @@ -236,3 +239,114 @@ QString RiaGrpcStreamCallback::method { return "StreamingMethod"; } + + +//-------------------------------------------------------------------------------------------------- +/// +//-------------------------------------------------------------------------------------------------- +template +RiaGrpcClientStreamCallback::RiaGrpcClientStreamCallback(ServiceT* service, + MethodImplT methodImpl, + MethodRequestT methodRequest, + StateHandlerT* stateHandler) + : RiaGrpcRequestCallback(service) + , m_reader(&m_context) + , m_methodImpl(methodImpl) + , m_methodRequest(methodRequest) + , m_stateHandler(stateHandler) +{ +} + +//-------------------------------------------------------------------------------------------------- +/// +//-------------------------------------------------------------------------------------------------- +template +RiaAbstractGrpcCallback* RiaGrpcClientStreamCallback::createNewFromThis() const +{ + return new RiaGrpcClientStreamCallback( + this->m_service, m_methodImpl, m_methodRequest, new StateHandlerT); +} + +//-------------------------------------------------------------------------------------------------- +/// +//-------------------------------------------------------------------------------------------------- +template +void RiaGrpcClientStreamCallback::createRequestHandler( + ServerCompletionQueue* completionQueue) +{ + m_methodRequest(*this->m_service, &m_context, &this->m_reader, completionQueue, completionQueue, this); + this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST_STARTED); +} + +//-------------------------------------------------------------------------------------------------- +/// +//-------------------------------------------------------------------------------------------------- +template +void RiaGrpcClientStreamCallback::onInitRequestStarted() +{ + this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST_COMPLETED); + m_reader.Read(&m_request, this); +} + +//-------------------------------------------------------------------------------------------------- +/// +//-------------------------------------------------------------------------------------------------- +template +void RiaGrpcClientStreamCallback::onInitRequestCompleted() +{ + this->setCallState(RiaAbstractGrpcCallback::PROCESS_REQUEST); + this->m_status = m_stateHandler->init(&this->m_request); // Fully received the stream package so can now init + m_reader.Read(&m_request, this); +} + +//-------------------------------------------------------------------------------------------------- +/// +//-------------------------------------------------------------------------------------------------- +template +void RiaGrpcClientStreamCallback::onProcessRequest() +{ + this->m_reply = ReplyT(); // Make sure it is reset + + if (!this->m_status.ok()) + { + m_reader.Finish(this->m_reply, this->m_status, this); + this->setCallState(RiaAbstractGrpcCallback::FINISH_REQUEST); + return; + } + + this->m_status = m_methodImpl(*this->m_service, &m_context, &this->m_request, &this->m_reply, m_stateHandler.get()); + if (!this->m_status.ok()) + { + this->setCallState(RiaAbstractGrpcCallback::FINISH_REQUEST); + if (this->m_status.error_code() == grpc::OUT_OF_RANGE) + { + m_reader.Finish(this->m_reply, grpc::Status::OK, this); + } + else + { + m_reader.FinishWithError(this->m_status, this); + } + } + else + { + m_reader.Read(&this->m_request, this); + } +} + +//-------------------------------------------------------------------------------------------------- +/// +//-------------------------------------------------------------------------------------------------- +template +void RiaGrpcClientStreamCallback::onFinishRequest() +{ + m_stateHandler->finish(); +} + +//-------------------------------------------------------------------------------------------------- +/// +//-------------------------------------------------------------------------------------------------- +template +QString RiaGrpcClientStreamCallback::methodType() const +{ + return "ClientStreamingMethod"; +} diff --git a/ApplicationCode/GrpcInterface/RiaGrpcGridInfoService.cpp b/ApplicationCode/GrpcInterface/RiaGrpcGridInfoService.cpp index 9fe8d03472..e6c17815a6 100644 --- a/ApplicationCode/GrpcInterface/RiaGrpcGridInfoService.cpp +++ b/ApplicationCode/GrpcInterface/RiaGrpcGridInfoService.cpp @@ -43,7 +43,7 @@ RiaActiveCellInfoStateHandler::RiaActiveCellInfoStateHandler() //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- -grpc::Status RiaActiveCellInfoStateHandler::init(const rips::ActiveCellInfoRequest* request) +grpc::Status RiaActiveCellInfoStateHandler::init(const rips::CellInfoRequest* request) { CAF_ASSERT(request); m_request = request; @@ -87,7 +87,7 @@ grpc::Status RiaActiveCellInfoStateHandler::init(const rips::ActiveCellInfoReque //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- -grpc::Status RiaActiveCellInfoStateHandler::assignNextActiveCellInfoData(rips::ActiveCellInfo* cellInfo) +grpc::Status RiaActiveCellInfoStateHandler::assignNextActiveCellInfoData(rips::CellInfo* cellInfo) { const std::vector& reservoirCells = m_eclipseCase->eclipseCaseData()->mainGrid()->globalCellArray(); @@ -96,7 +96,7 @@ grpc::Status RiaActiveCellInfoStateHandler::assignNextActiveCellInfoData(rips::A size_t cellIdxToTry = m_currentCellIdx++; if (m_activeCellInfo->isActive(cellIdxToTry)) { - assignActiveCellInfoData(cellInfo, reservoirCells, cellIdxToTry); + assignCellInfoData(cellInfo, reservoirCells, cellIdxToTry); return grpc::Status::OK; } } @@ -106,7 +106,7 @@ grpc::Status RiaActiveCellInfoStateHandler::assignNextActiveCellInfoData(rips::A //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- -void RiaActiveCellInfoStateHandler::assignActiveCellInfoData(rips::ActiveCellInfo* cellInfo, +void RiaActiveCellInfoStateHandler::assignCellInfoData(rips::CellInfo* cellInfo, const std::vector& reservoirCells, size_t cellIdx) { @@ -184,18 +184,18 @@ const std::vector& RiaActiveCellInfoStateHandler::reservoirCells() cons //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- -grpc::Status RiaActiveCellInfoStateHandler::assignReply(rips::ActiveCellInfoArray* reply) +grpc::Status RiaActiveCellInfoStateHandler::assignReply(rips::CellInfoArray* reply) { - const size_t packageSize = RiaGrpcServiceInterface::numberOfMessagesForByteCount(sizeof(rips::ActiveCellInfoArray)); + const size_t packageSize = RiaGrpcServiceInterface::numberOfMessagesForByteCount(sizeof(rips::CellInfoArray)); size_t packageIndex = 0u; reply->mutable_data()->Reserve((int)packageSize); for (; packageIndex < packageSize && m_currentCellIdx < m_activeCellInfo->reservoirCellCount(); ++packageIndex) { - rips::ActiveCellInfo singleCellInfo; + rips::CellInfo singleCellInfo; grpc::Status singleCellInfoStatus = assignNextActiveCellInfoData(&singleCellInfo); if (singleCellInfoStatus.ok()) { - rips::ActiveCellInfo* allocCellInfo = reply->add_data(); + rips::CellInfo* allocCellInfo = reply->add_data(); *allocCellInfo = singleCellInfo; } else @@ -258,9 +258,9 @@ grpc::Status RiaGrpcGridInfoService::GetGridDimensions(grpc::ServerContext* //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- -grpc::Status RiaGrpcGridInfoService::StreamActiveCellInfo(grpc::ServerContext* context, - const rips::ActiveCellInfoRequest* request, - rips::ActiveCellInfoArray* reply, +grpc::Status RiaGrpcGridInfoService::GetCellInfoForActiveCells(grpc::ServerContext* context, + const rips::CellInfoRequest* request, + rips::CellInfoArray* reply, RiaActiveCellInfoStateHandler* stateHandler) { return stateHandler->assignReply(reply); @@ -275,8 +275,8 @@ std::vector RiaGrpcGridInfoService::createCallbacks() return {new RiaGrpcCallback(this, &Self::GetGridCount, &Self::RequestGetGridCount), new RiaGrpcCallback(this, &Self::GetGridDimensions, &Self::RequestGetGridDimensions), - new RiaGrpcStreamCallback( - this, &Self::StreamActiveCellInfo, &Self::RequestStreamActiveCellInfo, new RiaActiveCellInfoStateHandler)}; + new RiaGrpcStreamCallback( + this, &Self::GetCellInfoForActiveCells, &Self::RequestGetCellInfoForActiveCells, new RiaActiveCellInfoStateHandler)}; } static bool RiaGrpcGridInfoService_init = diff --git a/ApplicationCode/GrpcInterface/RiaGrpcGridInfoService.h b/ApplicationCode/GrpcInterface/RiaGrpcGridInfoService.h index b73c724977..b08bdd8d93 100644 --- a/ApplicationCode/GrpcInterface/RiaGrpcGridInfoService.h +++ b/ApplicationCode/GrpcInterface/RiaGrpcGridInfoService.h @@ -45,15 +45,15 @@ class RiaActiveCellInfoStateHandler public: RiaActiveCellInfoStateHandler(); - Status init(const rips::ActiveCellInfoRequest* request); - Status assignNextActiveCellInfoData(rips::ActiveCellInfo* cellInfo); - void assignActiveCellInfoData(rips::ActiveCellInfo* cellInfo, const std::vector& reservoirCells, size_t cellIdx); - Status assignReply(rips::ActiveCellInfoArray* reply); + Status init(const rips::CellInfoRequest* request); + Status assignNextActiveCellInfoData(rips::CellInfo* cellInfo); + void assignCellInfoData(rips::CellInfo* cellInfo, const std::vector& reservoirCells, size_t cellIdx); + Status assignReply(rips::CellInfoArray* reply); RigActiveCellInfo* activeCellInfo() const; const std::vector& reservoirCells() const; protected: - const rips::ActiveCellInfoRequest* m_request; + const rips::CellInfoRequest* m_request; RimEclipseCase* m_eclipseCase; RiaDefines::PorosityModelType m_porosityModel; RigActiveCellInfo* m_activeCellInfo; @@ -71,9 +71,9 @@ class RiaGrpcGridInfoService final : public rips::GridInfo::AsyncService, public public: grpc::Status GetGridCount(grpc::ServerContext* context, const rips::Case* request, rips::GridCount* reply) override; grpc::Status GetGridDimensions(grpc::ServerContext* context, const rips::Case* request, rips::GridDimensions* reply) override; - grpc::Status StreamActiveCellInfo(grpc::ServerContext* context, - const rips::ActiveCellInfoRequest* request, - rips::ActiveCellInfoArray* reply, - RiaActiveCellInfoStateHandler* stateHandler); + grpc::Status GetCellInfoForActiveCells(grpc::ServerContext* context, + const rips::CellInfoRequest* request, + rips::CellInfoArray* reply, + RiaActiveCellInfoStateHandler* stateHandler); std::vector createCallbacks() override; }; diff --git a/ApplicationCode/GrpcInterface/RiaGrpcPropertiesService.cpp b/ApplicationCode/GrpcInterface/RiaGrpcPropertiesService.cpp index 9d482965a8..8d3bcf06b4 100644 --- a/ApplicationCode/GrpcInterface/RiaGrpcPropertiesService.cpp +++ b/ApplicationCode/GrpcInterface/RiaGrpcPropertiesService.cpp @@ -29,7 +29,10 @@ #include "RigMainGrid.h" #include "RigResultAccessor.h" #include "RigResultAccessorFactory.h" +#include "RigResultModifier.h" +#include "RigResultModifierFactory.h" +#include "Rim3dView.h" #include "RimEclipseCase.h" using namespace rips; @@ -58,12 +61,13 @@ public: //-------------------------------------------------------------------------------------------------- Status init(const ResultRequest* request) { - int caseId = request->request_case().id(); - RimEclipseCase* eclipseCase = dynamic_cast(RiaGrpcServiceInterface::findCase(caseId)); - if (eclipseCase) + int caseId = request->request_case().id(); + m_eclipseCase = dynamic_cast(RiaGrpcServiceInterface::findCase(caseId)); + + if (m_eclipseCase) { auto porosityModel = static_cast(request->porosity_model()); - auto caseData = eclipseCase->eclipseCaseData(); + auto caseData = m_eclipseCase->eclipseCaseData(); auto resultData = caseData->results(porosityModel); auto resultType = static_cast(request->property_type()); size_t timeStep = static_cast(request->time_step()); @@ -83,12 +87,24 @@ public: return grpc::Status(grpc::NOT_FOUND, "Couldn't find an Eclipse case matching the case Id"); } + //-------------------------------------------------------------------------------------------------- + /// Client streamers need to be initialised with the encapsulated parameters + //-------------------------------------------------------------------------------------------------- + Status init(const ResultRequestChunk* request) + { + if (request->has_params()) + { + return init(&(request->params())); + } + return grpc::Status(grpc::INVALID_ARGUMENT, "Need to have ResultRequest parameters in first message"); + } + //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- - Status assignReply(ResultReplyArray* reply) + Status assignStreamReply(ResultArray* reply) { - const size_t packageSize = RiaGrpcServiceInterface::numberOfMessagesForByteCount(sizeof(rips::ResultReplyArray)); + const size_t packageSize = RiaGrpcServiceInterface::numberOfMessagesForByteCount(sizeof(rips::ResultArray)); size_t packageIndex = 0u; reply->mutable_values()->Reserve((int)packageSize); for (; packageIndex < packageSize && m_currentCellIdx < m_cellCount; ++packageIndex, ++m_currentCellIdx) @@ -103,6 +119,40 @@ public: "We've reached the end. This is not an error but means transmission is finished"); } + //-------------------------------------------------------------------------------------------------- + /// + //-------------------------------------------------------------------------------------------------- + Status receiveStreamRequest(const ResultRequestChunk* request) + { + if (request->has_values()) + { + auto values = request->values().values(); + for (int i = 0; i < values.size() && m_currentCellIdx < m_cellCount; ++i, ++m_currentCellIdx) + { + setCellResult(m_currentCellIdx, values[i]); + } + if (m_currentCellIdx == m_cellCount - 1) + { + return grpc::Status(grpc::OUT_OF_RANGE, "All values have been written"); + } + + return Status::OK; + } + return grpc::Status(grpc::OUT_OF_RANGE, "No messages to write"); + } + + //-------------------------------------------------------------------------------------------------- + /// + //-------------------------------------------------------------------------------------------------- + void finish() + { + for (Rim3dView* view : m_eclipseCase->views()) + { + view->setCurrentTimeStepAndUpdate(view->currentTimeStep()); + view->createDisplayModelAndRedraw(); + } + } + protected: virtual void initResultAccess(RigEclipseCaseData* caseData, size_t gridIndex, @@ -110,9 +160,11 @@ protected: size_t timeStepIndex, RigEclipseResultAddress resVarAddr) = 0; virtual double cellResult(size_t currentCellIndex) const = 0; + virtual void setCellResult(size_t currentCellIndex, double value) = 0; protected: const rips::ResultRequest* m_request; + RimEclipseCase* m_eclipseCase; size_t m_currentCellIdx; size_t m_cellCount; }; @@ -127,7 +179,7 @@ protected: RigEclipseResultAddress resVarAddr) override { auto activeCellInfo = caseData->activeCellInfo(porosityModel); - m_resultValues = &(caseData->results(porosityModel)->cellScalarResults(resVarAddr, timeStepIndex)); + m_resultValues = &(caseData->results(porosityModel)->modifiableCellScalarResult(resVarAddr, timeStepIndex)); m_cellCount = activeCellInfo->reservoirActiveCellCount(); } @@ -136,8 +188,13 @@ protected: return (*m_resultValues)[currentCellIndex]; } + void setCellResult(size_t currentCellIndex, double value) override + { + (*m_resultValues)[currentCellIndex] = value; + } + private: - const std::vector* m_resultValues; + std::vector* m_resultValues; }; class RiaGridCellResultsStateHandler : public RiaCellResultsStateHandler @@ -150,6 +207,7 @@ protected: RigEclipseResultAddress resVarAddr) override { m_resultAccessor = RigResultAccessorFactory::createFromResultAddress(caseData, gridIndex, porosityModel, timeStepIndex, resVarAddr); + m_resultModifier = RigResultModifierFactory::createResultModifier(caseData, gridIndex, porosityModel, timeStepIndex, resVarAddr); m_cellCount = caseData->grid(gridIndex)->cellCount(); } @@ -158,8 +216,14 @@ protected: return m_resultAccessor->cellScalar(currentCellIndex); } + void setCellResult(size_t currentCellIndex, double value) override + { + return m_resultModifier->setCellScalar(currentCellIndex, value); + } + private: cvf::ref m_resultAccessor; + cvf::ref m_resultModifier; }; //-------------------------------------------------------------------------------------------------- @@ -195,10 +259,10 @@ grpc::Status RiaGrpcPropertiesService::GetAvailableProperties(grpc::ServerContex //-------------------------------------------------------------------------------------------------- grpc::Status RiaGrpcPropertiesService::GetActiveCellResults(grpc::ServerContext* context, const ResultRequest* request, - ResultReplyArray* reply, + ResultArray* reply, RiaActiveCellResultsStateHandler* stateHandler) { - return stateHandler->assignReply(reply); + return stateHandler->assignStreamReply(reply); } @@ -207,10 +271,21 @@ grpc::Status RiaGrpcPropertiesService::GetActiveCellResults(grpc::ServerContext* //-------------------------------------------------------------------------------------------------- grpc::Status RiaGrpcPropertiesService::GetGridResults(grpc::ServerContext* context, const rips::ResultRequest* request, - rips::ResultReplyArray* reply, + rips::ResultArray* reply, RiaGridCellResultsStateHandler* stateHandler) { - return stateHandler->assignReply(reply); + return stateHandler->assignStreamReply(reply); +} + +//-------------------------------------------------------------------------------------------------- +/// +//-------------------------------------------------------------------------------------------------- +grpc::Status RiaGrpcPropertiesService::SetActiveCellResults(grpc::ServerContext* context, + const rips::ResultRequestChunk* request, + rips::Empty* reply, + RiaActiveCellResultsStateHandler* stateHandler) +{ + return stateHandler->receiveStreamRequest(request); } //-------------------------------------------------------------------------------------------------- @@ -222,10 +297,12 @@ std::vector RiaGrpcPropertiesService::createCallbacks( return {new RiaGrpcCallback( this, &Self::GetAvailableProperties, &Self::RequestGetAvailableProperties), - new RiaGrpcStreamCallback( + new RiaGrpcStreamCallback( this, &Self::GetActiveCellResults, &Self::RequestGetActiveCellResults, new RiaActiveCellResultsStateHandler), - new RiaGrpcStreamCallback( - this, &Self::GetGridResults, &Self::RequestGetGridResults, new RiaGridCellResultsStateHandler) + new RiaGrpcStreamCallback( + this, &Self::GetGridResults, &Self::RequestGetGridResults, new RiaGridCellResultsStateHandler), + new RiaGrpcClientStreamCallback( + this, &Self::SetActiveCellResults, &Self::RequestSetActiveCellResults, new RiaActiveCellResultsStateHandler) }; } diff --git a/ApplicationCode/GrpcInterface/RiaGrpcPropertiesService.h b/ApplicationCode/GrpcInterface/RiaGrpcPropertiesService.h index 64d4ed6306..c62d83f2d3 100644 --- a/ApplicationCode/GrpcInterface/RiaGrpcPropertiesService.h +++ b/ApplicationCode/GrpcInterface/RiaGrpcPropertiesService.h @@ -39,13 +39,16 @@ public: rips::AvailableProperties* reply) override; grpc::Status GetActiveCellResults(grpc::ServerContext* context, const rips::ResultRequest* request, - rips::ResultReplyArray* reply, + rips::ResultArray* reply, RiaActiveCellResultsStateHandler* stateHandler); grpc::Status GetGridResults(grpc::ServerContext* context, const rips::ResultRequest* request, - rips::ResultReplyArray* reply, + rips::ResultArray* reply, RiaGridCellResultsStateHandler* stateHandler); - + grpc::Status SetActiveCellResults(grpc::ServerContext* context, + const rips::ResultRequestChunk* request, + rips::Empty* reply, + RiaActiveCellResultsStateHandler* stateHandler); std::vector createCallbacks() override; }; diff --git a/ApplicationCode/GrpcInterface/RiaGrpcServer.cpp b/ApplicationCode/GrpcInterface/RiaGrpcServer.cpp index 128c6c81c5..8acbe4a04b 100644 --- a/ApplicationCode/GrpcInterface/RiaGrpcServer.cpp +++ b/ApplicationCode/GrpcInterface/RiaGrpcServer.cpp @@ -63,7 +63,7 @@ public: void initialize(); void processRequests(); void quit(); - int currentPortNumber; + int currentPortNumber; private: void waitForNextRequest(); @@ -242,22 +242,28 @@ void RiaGrpcServerImpl::process(RiaAbstractGrpcCallback* method) RiaLogging::debug(QString("Initialising request handler for: %1").arg(method->name())); method->createRequestHandler(m_completionQueue.get()); } - else if (method->callState() == RiaAbstractGrpcCallback::INIT_REQUEST) + else if (method->callState() == RiaAbstractGrpcCallback::INIT_REQUEST_STARTED) { // Perform initialization and immediately process the first request // The initialization is necessary for streaming services. - RiaLogging::info(QString("Starting handling: %1").arg(method->name())); - method->initRequest(); - method->processRequest(); + method->onInitRequestStarted(); + + } + else if (method->callState() == RiaAbstractGrpcCallback::INIT_REQUEST_COMPLETED) + { + // Perform initialization and immediately process the first request + // The initialization is necessary for streaming services. + RiaLogging::info(QString("Initialising handling: %1").arg(method->name())); + method->onInitRequestCompleted(); } else if (method->callState() == RiaAbstractGrpcCallback::PROCESS_REQUEST) { - method->processRequest(); + method->onProcessRequest(); } else { RiaLogging::info(QString("Finished handling: %1").arg(method->name())); - method->finishRequest(); + method->onFinishRequest(); process(method->createNewFromThis()); delete method; } diff --git a/Python/api/ResInsight.py b/Python/api/ResInsight.py index 783457839f..e3b1d99784 100644 --- a/Python/api/ResInsight.py +++ b/Python/api/ResInsight.py @@ -4,6 +4,8 @@ import grpc import os import sys import socket +import logging + sys.path.insert(1, os.path.join(sys.path[0], '../generated')) @@ -68,14 +70,14 @@ class GridInfo: def __init__(self, channel): self.gridInfo = GridInfo_pb2_grpc.GridInfoStub(channel) - def getGridCount(self, caseId=0): + def gridCount(self, caseId=0): return self.gridInfo.GetGridCount(CaseInfo_pb2.Case(id=caseId)).count - def getGridDimensions(self, caseId=0): + def gridDimensions(self, caseId=0): return self.gridInfo.GetGridDimensions(CaseInfo_pb2.Case(id=caseId)).dimensions - def streamActiveCellInfo(self, caseId=0): - return self.gridInfo.StreamActiveCellInfo(CaseInfo_pb2.Case(id=caseId)) + def cellInfoForActiveCells(self, caseId=0): + return self.gridInfo.GetCellInfoForActiveCells(CaseInfo_pb2.Case(id=caseId)) class ProjectInfo: def __init__(self, channel): @@ -96,6 +98,33 @@ class ProjectInfo: class Properties: def __init__(self, channel): self.properties = Properties_pb2_grpc.PropertiesStub(channel) + + def generateResultRequestArrays(self, array, parameters): + # Each double is 8 bytes. A good chunk size is 64KiB = 65536B + # Meaning ideal number of doubles would be 8192. + # However we need overhead space, so if we choose 8160 in chunk size + # We have 256B left for overhead which should be plenty + chunkSize = 8000 + index = -1 + while index < len(array): + chunk = Properties_pb2.ResultRequestChunk() + if index is -1: + chunk.params.CopyFrom(parameters) + print("Added parameters") + index += 1; + else: + actualChunkSize = min(len(array) - index + 1, chunkSize) + chunk.values.CopyFrom(Properties_pb2.ResultArray(values = array[index:index+actualChunkSize])) + print("Added values") + index += actualChunkSize + + print(index) + yield chunk + # Final empty message to signal completion + chunk = Properties_pb2.ResultRequestChunk() + yield chunk + print("finished") + def availableProperties(self, caseId, propertyType, porosityModel = 'MATRIX_MODEL'): propertyTypeEnum = Properties_pb2.PropertyType.Value(propertyType) porosityModelEnum = GridInfo_pb2.PorosityModelType.Value(porosityModel) @@ -122,6 +151,25 @@ class Properties: grid_index = gridIndex, porosity_model = porosityModelEnum) return self.properties.GetGridResults(request) + def setActiveCellResults(self, values, caseId, propertyType, propertyName, timeStep, gridIndex = 0, porosityModel = 'MATRIX_MODEL'): + propertyTypeEnum = Properties_pb2.PropertyType.Value(propertyType) + porosityModelEnum = GridInfo_pb2.PorosityModelType.Value(porosityModel) + print (propertyName) + request = Properties_pb2.ResultRequest(request_case = CaseInfo_pb2.Case(id=caseId), + property_type = propertyTypeEnum, + property_name = propertyName, + time_step = timeStep, + grid_index = gridIndex, + porosity_model = porosityModelEnum) + try: + request_iterator = self.generateResultRequestArrays(values, request) + print("Starting to send data") + self.properties.SetActiveCellResults(request_iterator) + except grpc.RpcError as e: + if e.code() == grpc.StatusCode.NOT_FOUND: + print("Command not found") + else: + print("Other error", e) class Instance: @staticmethod @@ -166,6 +214,7 @@ class Instance: return None def __init__(self, port = 50051): + logging.basicConfig() location = "localhost:" + str(port) self.channel = grpc.insecure_channel(location) diff --git a/Python/examples/GridInfoStreamingExample.py b/Python/examples/GridInfoStreamingExample.py index 999dc7f25c..4a9c753d0a 100644 --- a/Python/examples/GridInfoStreamingExample.py +++ b/Python/examples/GridInfoStreamingExample.py @@ -7,7 +7,7 @@ resInsight = ResInsight.Instance.find() #gridCount = resInsight.gridInfo.getGridCount(caseId=0) #gridDimensions = resInsight.gridInfo.getAllGridDimensions(caseId=0) -activeCellInfoChunks = resInsight.gridInfo.streamActiveCellInfo(caseId=0) +activeCellInfoChunks = resInsight.gridInfo.cellInfoForActiveCells(caseId=0) #print("Number of grids: " + str(gridCount)) #print(gridDimensions) diff --git a/Python/examples/SetResultValues.py b/Python/examples/SetResultValues.py new file mode 100644 index 0000000000..a9ec77698f --- /dev/null +++ b/Python/examples/SetResultValues.py @@ -0,0 +1,16 @@ +import sys +import os +sys.path.insert(1, os.path.join(sys.path[0], '../api')) +import ResInsight + +resInsight = ResInsight.Instance.find() +#gridCount = resInsight.gridInfo.getGridCount(caseId=0) +#gridDimensions = resInsight.gridInfo.getAllGridDimensions(caseId=0) + +values = [] +for i in range(0, 11124): + values.append(i % 2 * 0.5); + +print("Applying all values to time step 0") +resInsight.properties.setActiveCellResults(values, 0, 'DYNAMIC_NATIVE', 'SOIL', 0) +