Python: enable storage of new results and improve reliability

This commit is contained in:
Gaute Lindkvist 2019-05-30 18:52:16 +02:00
parent 3d109da5b2
commit a6cdec0816
5 changed files with 45 additions and 17 deletions

View File

@ -65,8 +65,6 @@ public:
inline CallState callState() const;
inline const Status& status() const;
protected:
inline void setNextCallState(CallState state);
protected:

View File

@ -272,7 +272,7 @@ template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHa
RiaGrpcCallbackInterface* RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::createNewFromThis() const
{
return new RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>(
this->m_service, m_methodImpl, m_methodRequest, new StateHandlerT);
this->m_service, m_methodImpl, m_methodRequest, new StateHandlerT(true));
}
//--------------------------------------------------------------------------------------------------

View File

@ -218,11 +218,10 @@ grpc::Status RiaGrpcGridInfoService::GetGridCount(grpc::ServerContext* context,
RimCase* rimCase = findCase(request->id());
RimEclipseCase* eclipseCase = dynamic_cast<RimEclipseCase*>(rimCase);
size_t gridCount = 0u;
if (eclipseCase)
{
gridCount = eclipseCase->mainGrid()->gridCount();
reply->set_count((int)gridCount);
int gridCount = (int) eclipseCase->mainGrid()->gridCount();
reply->set_count(gridCount);
return Status::OK;
}
return grpc::Status(grpc::NOT_FOUND, "Eclipse Case not found");

View File

@ -26,6 +26,7 @@
#include "RigCaseCellResultsData.h"
#include "RigEclipseCaseData.h"
#include "RigEclipseResultAddress.h"
#include "RigEclipseResultInfo.h"
#include "RigMainGrid.h"
#include "RigResultAccessor.h"
#include "RigResultAccessorFactory.h"
@ -50,9 +51,10 @@ public:
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
RiaCellResultsStateHandler()
RiaCellResultsStateHandler(bool clientStreamer = false)
: m_request(nullptr)
, m_currentCellIdx(0u)
, m_clientStreamer(clientStreamer)
{
}
@ -82,6 +84,24 @@ public:
}
return grpc::Status(grpc::NOT_FOUND, "No such time step");
}
else if (m_clientStreamer)
{
resultData->createResultEntry(resAddr, true);
RigEclipseResultAddress addrToMaxTimeStepCountResult;
size_t timeStepCount = resultData->maxTimeStepCount(&addrToMaxTimeStepCountResult);
const std::vector<RigEclipseTimeStepInfo> timeStepInfos =
resultData->timeStepInfos(addrToMaxTimeStepCountResult);
resultData->setTimeStepInfos(resAddr, timeStepInfos);
auto scalarResultFrames = resultData->modifiableCellScalarResultTimesteps(resAddr);
scalarResultFrames.resize(timeStepCount);
if (timeStep < resultData->timeStepCount(resAddr))
{
initResultAccess(caseData, request->grid_index(), porosityModel, timeStep, resAddr);
return grpc::Status::OK;
}
return grpc::Status(grpc::NOT_FOUND, "No such time step");
}
return grpc::Status(grpc::NOT_FOUND, "No such result");
}
return grpc::Status(grpc::NOT_FOUND, "Couldn't find an Eclipse case matching the case Id");
@ -131,7 +151,7 @@ public:
{
setCellResult(m_currentCellIdx, values[i]);
}
if (m_currentCellIdx == m_cellCount - 1)
if (m_currentCellIdx >= m_cellCount - 1)
{
return grpc::Status(grpc::OUT_OF_RANGE, "All values have been written");
}
@ -167,10 +187,15 @@ protected:
RimEclipseCase* m_eclipseCase;
size_t m_currentCellIdx;
size_t m_cellCount;
bool m_clientStreamer;
};
class RiaActiveCellResultsStateHandler : public RiaCellResultsStateHandler
{
public:
RiaActiveCellResultsStateHandler(bool clientStreamer = false)
: RiaCellResultsStateHandler(clientStreamer)
{}
protected:
void initResultAccess(RigEclipseCaseData* caseData,
size_t gridIndex,
@ -180,6 +205,10 @@ protected:
{
auto activeCellInfo = caseData->activeCellInfo(porosityModel);
m_resultValues = &(caseData->results(porosityModel)->modifiableCellScalarResult(resVarAddr, timeStepIndex));
if (m_resultValues->empty())
{
m_resultValues->resize(activeCellInfo->reservoirCellResultCount());
}
m_cellCount = activeCellInfo->reservoirActiveCellCount();
}
@ -199,6 +228,11 @@ private:
class RiaGridCellResultsStateHandler : public RiaCellResultsStateHandler
{
public:
RiaGridCellResultsStateHandler(bool clientStreamer = false)
: RiaCellResultsStateHandler(clientStreamer)
{}
protected:
void initResultAccess(RigEclipseCaseData* caseData,
size_t gridIndex,
@ -313,9 +347,9 @@ std::vector<RiaGrpcCallbackInterface*> RiaGrpcPropertiesService::createCallbacks
new RiaGrpcServerStreamCallback<Self, ResultRequest, ResultArray, RiaGridCellResultsStateHandler>(
this, &Self::GetGridResults, &Self::RequestGetGridResults, new RiaGridCellResultsStateHandler),
new RiaGrpcClientStreamCallback<Self, ResultRequestChunk, Empty, RiaActiveCellResultsStateHandler>(
this, &Self::SetActiveCellResults, &Self::RequestSetActiveCellResults, new RiaActiveCellResultsStateHandler),
this, &Self::SetActiveCellResults, &Self::RequestSetActiveCellResults, new RiaActiveCellResultsStateHandler(true)),
new RiaGrpcClientStreamCallback<Self, ResultRequestChunk, Empty, RiaGridCellResultsStateHandler>(
this, &Self::SetGridResults, &Self::RequestSetGridResults, new RiaGridCellResultsStateHandler)
this, &Self::SetGridResults, &Self::RequestSetGridResults, new RiaGridCellResultsStateHandler(true))
};
}

View File

@ -229,12 +229,13 @@ void RiaGrpcServerImpl::waitForNextRequest()
while (m_completionQueue->Next(&tag, &ok))
{
std::lock_guard<std::mutex> requestLock(m_requestMutex);
RiaGrpcCallbackInterface* method = static_cast<RiaGrpcCallbackInterface*>(tag);
if (ok)
if (!ok)
{
std::lock_guard<std::mutex> requestLock(m_requestMutex);
m_unprocessedRequests.push_back(method);
method->setNextCallState(RiaGrpcCallbackInterface::FINISH_REQUEST);
}
m_unprocessedRequests.push_back(method);
}
}
@ -247,17 +248,14 @@ void RiaGrpcServerImpl::process(RiaGrpcCallbackInterface* method)
{
if (method->callState() == RiaGrpcCallbackInterface::CREATE_HANDLER)
{
RiaLogging::debug(QString("Creating request handler for: %1").arg(method->name()));
method->createRequestHandler(m_completionQueue.get());
}
else if (method->callState() == RiaGrpcCallbackInterface::INIT_REQUEST_STARTED)
{
method->onInitRequestStarted();
}
else if (method->callState() == RiaGrpcCallbackInterface::INIT_REQUEST_COMPLETED)
{
RiaLogging::info(QString("Initialising handling: %1").arg(method->name()));
method->onInitRequestCompleted();
}
else if (method->callState() == RiaGrpcCallbackInterface::PROCESS_REQUEST)
@ -266,7 +264,6 @@ void RiaGrpcServerImpl::process(RiaGrpcCallbackInterface* method)
}
else
{
RiaLogging::info(QString("Finished handling: %1").arg(method->name()));
method->onFinishRequest();
process(method->createNewFromThis());
delete method;