gRPC: Fix up comments and clean up callback code

This commit is contained in:
Gaute Lindkvist 2019-05-23 09:58:04 +02:00
parent 0cb2194bfb
commit 6ad0c8fbe1
5 changed files with 79 additions and 61 deletions

View File

@ -1681,7 +1681,7 @@ void RiaGuiApplication::runIdleProcessing()
#ifdef ENABLE_GRPC
if (!caf::ProgressInfoStatic::isRunning())
{
m_grpcServer->processRequests();
m_grpcServer->processAllQueuedRequests();
}
#endif
}

View File

@ -59,7 +59,7 @@ public:
virtual RiaAbstractGrpcCallback* createNewFromThis() const = 0;
virtual void createRequestHandler(ServerCompletionQueue* completionQueue) = 0;
virtual void onInitRequestStarted() {}
virtual void onInitRequestCompleted() = 0;
virtual void onInitRequestCompleted() {}
virtual void onProcessRequest() = 0;
virtual void onFinishRequest() {}
@ -67,7 +67,7 @@ public:
inline const Status& status() const;
protected:
inline void setCallState(CallState state);
inline void setNextCallState(CallState state);
protected:
CallState m_state;
@ -117,7 +117,6 @@ public:
RiaAbstractGrpcCallback* createNewFromThis() const override;
void createRequestHandler(ServerCompletionQueue* completionQueue) override;
void onInitRequestCompleted() override;
void onProcessRequest() override;
protected:
@ -132,11 +131,11 @@ private:
//==================================================================================================
//
// Templated server *streaming* gRPC-callback calling service implementation callbacks
// Templated server->client *streaming* gRPC-callback calling service implementation callbacks
//
// The streaming callback needs a state handler for setting up and maintaining order.
//
// A fully functional state handler needs to implement the following methods:
// A fully functional state handler for server->client streaming needs to implement the following methods:
// 1. Default Constructor
// 2. grpc::Status init(const grpc::Message* request)
//
@ -171,11 +170,11 @@ private:
//==================================================================================================
//
// Templated client *streaming* gRPC-callback calling service implementation callbacks
// Templated client->server *streaming* gRPC-callback calling service implementation callbacks
//
// The streaming callback needs a state handler for setting up and maintaining order.
//
// A fully functional state handler needs to implement the following methods:
// A fully functional state handler for client->server streaming needs to implement the following methods:
// 1. Default Constructor
// 2. grpc::Status init(const grpc::Message* request)
// 3. void finish() any updates required after completion

View File

@ -41,7 +41,7 @@ const Status& RiaAbstractGrpcCallback::status() const
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
inline void RiaAbstractGrpcCallback::setCallState(CallState state)
inline void RiaAbstractGrpcCallback::setNextCallState(CallState state)
{
m_state = state;
}
@ -116,19 +116,10 @@ RiaAbstractGrpcCallback* RiaGrpcCallback<ServiceT, RequestT, ReplyT>::createNewF
template<typename ServiceT, typename RequestT, typename ReplyT>
void RiaGrpcCallback<ServiceT, RequestT, ReplyT>::createRequestHandler(ServerCompletionQueue* completionQueue)
{
// The Request-method is where the request gets filled in with data from the gRPC stack:
// The Request-method is where the service gets registered to respond to a given request.
m_methodRequest(*this->m_service, &m_context, &this->m_request, &m_responder, completionQueue, completionQueue, this);
this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST_COMPLETED);
}
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
template<typename ServiceT, typename RequestT, typename ReplyT>
void RiaGrpcCallback<ServiceT, RequestT, ReplyT>::onInitRequestCompleted()
{
this->setCallState(RiaAbstractGrpcCallback::PROCESS_REQUEST);
this->onProcessRequest();
// Simple unary requests don't need initialisation, so proceed to process as soon as a request turns up.
this->setNextCallState(RiaAbstractGrpcCallback::PROCESS_REQUEST);
}
//--------------------------------------------------------------------------------------------------
@ -137,9 +128,13 @@ void RiaGrpcCallback<ServiceT, RequestT, ReplyT>::onInitRequestCompleted()
template<typename ServiceT, typename RequestT, typename ReplyT>
void RiaGrpcCallback<ServiceT, RequestT, ReplyT>::onProcessRequest()
{
// Call request handler method
this->m_status = m_methodImpl(*this->m_service, &m_context, &this->m_request, &this->m_reply);
// Simply unary requests are finished as soon as you've done any processing.
// So next time we receive a new tag on the command queue we should proceed to finish.
this->setNextCallState(RiaAbstractGrpcCallback::FINISH_REQUEST);
// Finish will push this callback back on the command queue (now with Finish as the call state).
m_responder.Finish(this->m_reply, this->m_status, this);
this->setCallState(RiaAbstractGrpcCallback::FINISH_REQUEST);
}
//--------------------------------------------------------------------------------------------------
@ -184,49 +179,62 @@ template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHa
void RiaGrpcStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::createRequestHandler(
ServerCompletionQueue* completionQueue)
{
// The Request-method is where the service gets registered to respond to a given request.
m_methodRequest(*this->m_service, &m_context, &this->m_request, &m_responder, completionQueue, completionQueue, this);
this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST_COMPLETED);
// Server->Client Streaming requests require initialisation. However, we receive the complete request immediately.
// So can proceed directly to completion of the init request.
this->setNextCallState(RiaAbstractGrpcCallback::INIT_REQUEST_COMPLETED);
}
//--------------------------------------------------------------------------------------------------
/// Perform initialisation tasks at the time of receiving a request
/// Perform initialisation tasks at the time of receiving a complete request
//--------------------------------------------------------------------------------------------------
template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHandlerT>
void RiaGrpcStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::onInitRequestCompleted()
{
// Initialise streaming state handler
this->m_status = m_stateHandler->init(&this->m_request);
this->setCallState(RiaAbstractGrpcCallback::PROCESS_REQUEST);
if (!this->m_status.ok())
{
// We have an error. Proceed to finish and report the status
this->setNextCallState(RiaAbstractGrpcCallback::FINISH_REQUEST);
m_responder.Finish(this->m_status, this);
return;
}
// Move on to processing and perform the first processing immediately since the client will
// not request anything more.
this->setNextCallState(RiaAbstractGrpcCallback::PROCESS_REQUEST);
this->onProcessRequest();
}
//--------------------------------------------------------------------------------------------------
///
/// Process a streaming request and send one package
//--------------------------------------------------------------------------------------------------
template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHandlerT>
void RiaGrpcStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::onProcessRequest()
{
this->m_reply = ReplyT(); // Make sure it is reset
if (!this->m_status.ok())
{
m_responder.Finish(this->m_status, this);
this->setCallState(RiaAbstractGrpcCallback::FINISH_REQUEST);
return;
}
// Call request handler method
this->m_status = m_methodImpl(*this->m_service, &m_context, &this->m_request, &this->m_reply, m_stateHandler.get());
if (this->m_status.ok())
{
// The write call will send data to client AND put this callback back on the command queue
// so that this method gets called again to send the next stream package.
m_responder.Write(this->m_reply, this);
}
else
{
this->setCallState(RiaAbstractGrpcCallback::FINISH_REQUEST);
this->setNextCallState(RiaAbstractGrpcCallback::FINISH_REQUEST);
// Out of range means we're finished but it isn't an error
if (this->m_status.error_code() == grpc::OUT_OF_RANGE)
{
this->m_status = Status::OK;
}
// Finish will put this callback back on the command queue, now with a finish state.
m_responder.Finish(this->m_status, this);
}
}
@ -274,8 +282,11 @@ template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHa
void RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::createRequestHandler(
ServerCompletionQueue* completionQueue)
{
// The Request-method is where the service gets registered to respond to a given request.
m_methodRequest(*this->m_service, &m_context, &this->m_reader, completionQueue, completionQueue, this);
this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST_STARTED);
// The client->server streaming requires initialisation and each request package is streamed asynchronously
// So we need to start and complete the init request.
this->setNextCallState(RiaAbstractGrpcCallback::INIT_REQUEST_STARTED);
}
//--------------------------------------------------------------------------------------------------
@ -284,7 +295,9 @@ void RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::cre
template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHandlerT>
void RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::onInitRequestStarted()
{
this->setCallState(RiaAbstractGrpcCallback::INIT_REQUEST_COMPLETED);
this->setNextCallState(RiaAbstractGrpcCallback::INIT_REQUEST_COMPLETED);
// The read call will start reading the request data and push this callback back onto the command queue
// when the read call is completed.
m_reader.Read(&m_request, this);
}
@ -294,8 +307,19 @@ void RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::onI
template<typename ServiceT, typename RequestT, typename ReplyT, typename StateHandlerT>
void RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::onInitRequestCompleted()
{
this->setCallState(RiaAbstractGrpcCallback::PROCESS_REQUEST);
this->m_status = m_stateHandler->init(&this->m_request); // Fully received the stream package so can now init
this->setNextCallState(RiaAbstractGrpcCallback::PROCESS_REQUEST);
// Fully received the stream package so can now init
this->m_status = m_stateHandler->init(&this->m_request);
if (!this->m_status.ok())
{
// We have an error. Proceed to finish and report the status
m_reader.FinishWithError(this->m_status, this);
this->setNextCallState(RiaAbstractGrpcCallback::FINISH_REQUEST);
return;
}
// Start reading and push this back onto the command queue.
m_reader.Read(&m_request, this);
}
@ -307,17 +331,12 @@ void RiaGrpcClientStreamCallback<ServiceT, RequestT, ReplyT, StateHandlerT>::onP
{
this->m_reply = ReplyT(); // Make sure it is reset
if (!this->m_status.ok())
{
m_reader.Finish(this->m_reply, this->m_status, this);
this->setCallState(RiaAbstractGrpcCallback::FINISH_REQUEST);
return;
}
// Call request handler method
this->m_status = m_methodImpl(*this->m_service, &m_context, &this->m_request, &this->m_reply, m_stateHandler.get());
if (!this->m_status.ok())
{
this->setCallState(RiaAbstractGrpcCallback::FINISH_REQUEST);
this->setNextCallState(RiaAbstractGrpcCallback::FINISH_REQUEST);
if (this->m_status.error_code() == grpc::OUT_OF_RANGE)
{
m_reader.Finish(this->m_reply, grpc::Status::OK, this);

View File

@ -61,7 +61,7 @@ public:
void run();
void runInThread();
void initialize();
void processRequests();
void processAllQueuedRequests();
void quit();
int currentPortNumber;
@ -169,7 +169,7 @@ void RiaGrpcServerImpl::initialize()
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
void RiaGrpcServerImpl::processRequests()
void RiaGrpcServerImpl::processAllQueuedRequests()
{
std::lock_guard<std::mutex> requestLock(m_requestMutex);
@ -182,7 +182,8 @@ void RiaGrpcServerImpl::processRequests()
}
//--------------------------------------------------------------------------------------------------
/// Gracefully shut down the GRPC server. The internal order is important.
/// Gracefully shut down the GRPC server.
/// BE VERY CAREFUL ABOUT CHANGING THE ORDER IN THIS METHOD. IT IS IMPORTANT!
//--------------------------------------------------------------------------------------------------
void RiaGrpcServerImpl::quit()
{
@ -214,7 +215,8 @@ void RiaGrpcServerImpl::quit()
}
//--------------------------------------------------------------------------------------------------
///
/// Block and wait for requests from the client from the command queue.
/// The requests are pushed onto the Unprocessed Request queue which are handled in processRequests
//--------------------------------------------------------------------------------------------------
void RiaGrpcServerImpl::waitForNextRequest()
{
@ -233,26 +235,24 @@ void RiaGrpcServerImpl::waitForNextRequest()
}
//--------------------------------------------------------------------------------------------------
///
/// The handling of calls pushed onto the command queue. We only get one queued callback per client request.
/// The gRPC calls triggered in the callback will see each callback pushed back onto the command queue.
/// The call state will then determine what the callback should do next.
//--------------------------------------------------------------------------------------------------
void RiaGrpcServerImpl::process(RiaAbstractGrpcCallback* method)
{
if (method->callState() == RiaAbstractGrpcCallback::CREATE_HANDLER)
{
RiaLogging::debug(QString("Initialising request handler for: %1").arg(method->name()));
RiaLogging::debug(QString("Creating request handler for: %1").arg(method->name()));
method->createRequestHandler(m_completionQueue.get());
}
else if (method->callState() == RiaAbstractGrpcCallback::INIT_REQUEST_STARTED)
{
// Perform initialization and immediately process the first request
// The initialization is necessary for streaming services.
method->onInitRequestStarted();
}
else if (method->callState() == RiaAbstractGrpcCallback::INIT_REQUEST_COMPLETED)
{
// Perform initialization and immediately process the first request
// The initialization is necessary for streaming services.
RiaLogging::info(QString("Initialising handling: %1").arg(method->name()));
method->onInitRequestCompleted();
}
@ -333,9 +333,9 @@ void RiaGrpcServer::initialize()
//--------------------------------------------------------------------------------------------------
///
//--------------------------------------------------------------------------------------------------
void RiaGrpcServer::processRequests()
void RiaGrpcServer::processAllQueuedRequests()
{
m_serverImpl->processRequests();
m_serverImpl->processAllQueuedRequests();
}
//--------------------------------------------------------------------------------------------------

View File

@ -41,7 +41,7 @@ public:
bool isRunning() const;
void run();
void runInThread();
void processRequests();
void processAllQueuedRequests();
void quit();
static int findAvailablePortNumber(int defaultPortNumber);