///////////////////////////////////////////////////////////////////////////////// // // Copyright (C) 2019- Equinor ASA // // ResInsight is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // ResInsight is distributed in the hope that it will be useful, but WITHOUT ANY // WARRANTY; without even the implied warranty of MERCHANTABILITY or // FITNESS FOR A PARTICULAR PURPOSE. // // See the GNU General Public License at // for more details. // ////////////////////////////////////////////////////////////////////////////////// inline RiaGrpcCallbackInterface::RiaGrpcCallbackInterface() : m_state( CREATE_HANDLER ) , m_status( Status::OK ) { } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- RiaGrpcCallbackInterface::CallState RiaGrpcCallbackInterface::callState() const { return m_state; } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- const Status& RiaGrpcCallbackInterface::status() const { return m_status; } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- inline void RiaGrpcCallbackInterface::setNextCallState( CallState state ) { m_state = state; } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template RiaGrpcServiceCallback::RiaGrpcServiceCallback( ServiceT* service ) : m_service( service ) { } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template QString RiaGrpcServiceCallback::name() const { QString fullName = QString( "%1:%2(%3, %4)" ) .arg( typeid( ServiceT ).name() ) .arg( methodType() ) .arg( typeid( RequestT ).name() ) .arg( typeid( ReplyT ).name() ); return fullName; } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template const RequestT& RiaGrpcServiceCallback::request() const { return m_request; } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template ReplyT& RiaGrpcServiceCallback::reply() { return m_reply; } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template RiaGrpcUnaryCallback::RiaGrpcUnaryCallback( ServiceT* service, MethodImplT methodImpl, MethodRequestT methodRequest ) : RiaGrpcServiceCallback( service ) , m_responder( &m_context ) , m_methodImpl( methodImpl ) , m_methodRequest( methodRequest ) { } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template RiaGrpcCallbackInterface* RiaGrpcUnaryCallback::createNewFromThis() const { return new RiaGrpcUnaryCallback( this->m_service, this->m_methodImpl, this->m_methodRequest ); } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template void RiaGrpcUnaryCallback::createRequestHandler( ServerCompletionQueue* completionQueue ) { // The Request-method is where the service gets registered to respond to a given request. m_methodRequest( *this->m_service, &m_context, &this->m_request, &m_responder, completionQueue, completionQueue, this ); // Simple unary requests don't need initialisation, so proceed to process as soon as a request turns up. this->setNextCallState( RiaGrpcCallbackInterface::PROCESS_REQUEST ); } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template void RiaGrpcUnaryCallback::onProcessRequest() { // Call request handler method this->m_status = m_methodImpl( *this->m_service, &m_context, &this->m_request, &this->m_reply ); // Simply unary requests are finished as soon as you've done any processing. // So next time we receive a new tag on the command queue we should proceed to finish. this->setNextCallState( RiaGrpcCallbackInterface::FINISH_REQUEST ); // Finish will push this callback back on the command queue (now with Finish as the call state). m_responder.Finish( this->m_reply, this->m_status, this ); } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template QString RiaGrpcUnaryCallback::methodType() const { return "RegularMethod"; } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template RiaGrpcServerToClientStreamCallback::RiaGrpcServerToClientStreamCallback( ServiceT* service, MethodImplT methodImpl, MethodRequestT methodRequest, StateHandlerT* stateHandler ) : RiaGrpcServiceCallback( service ) , m_responder( &m_context ) , m_methodImpl( methodImpl ) , m_methodRequest( methodRequest ) , m_stateHandler( stateHandler ) { } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template RiaGrpcCallbackInterface* RiaGrpcServerToClientStreamCallback::createNewFromThis() const { return new RiaGrpcServerToClientStreamCallback( this->m_service, m_methodImpl, m_methodRequest, new StateHandlerT ); } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template void RiaGrpcServerToClientStreamCallback::createRequestHandler( ServerCompletionQueue* completionQueue ) { // The Request-method is where the service gets registered to respond to a given request. m_methodRequest( *this->m_service, &m_context, &this->m_request, &m_responder, completionQueue, completionQueue, this ); // Server->Client Streaming requests require initialisation. However, we receive the complete request immediately. // So can proceed directly to completion of the init request. this->setNextCallState( RiaGrpcCallbackInterface::INIT_REQUEST_COMPLETED ); } //-------------------------------------------------------------------------------------------------- /// Perform initialisation tasks at the time of receiving a complete request //-------------------------------------------------------------------------------------------------- template void RiaGrpcServerToClientStreamCallback::onInitRequestCompleted() { // Initialise streaming state handler this->m_status = m_stateHandler->init( &this->m_request ); if ( !this->m_status.ok() ) { // We have an error. Proceed to finish and report the status this->setNextCallState( RiaGrpcCallbackInterface::FINISH_REQUEST ); m_responder.Finish( this->m_status, this ); return; } // Move on to processing and perform the first processing immediately since the client will // not request anything more. this->setNextCallState( RiaGrpcCallbackInterface::PROCESS_REQUEST ); this->onProcessRequest(); } //-------------------------------------------------------------------------------------------------- /// Process a streaming request and send one package //-------------------------------------------------------------------------------------------------- template void RiaGrpcServerToClientStreamCallback::onProcessRequest() { this->m_reply = ReplyT(); // Make sure it is reset // Call request handler method this->m_status = m_methodImpl( *this->m_service, &m_context, &this->m_request, &this->m_reply, m_stateHandler.get() ); if ( this->m_status.ok() ) { // The write call will send data to client AND put this callback back on the command queue // so that this method gets called again to send the next stream package. m_responder.Write( this->m_reply, this ); } else { this->setNextCallState( RiaGrpcCallbackInterface::FINISH_REQUEST ); // Out of range means we're finished but it isn't an error if ( this->m_status.error_code() == grpc::OUT_OF_RANGE ) { this->m_status = Status::OK; } // Finish will put this callback back on the command queue, now with a finish state. m_responder.Finish( this->m_status, this ); } } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template QString RiaGrpcServerToClientStreamCallback::methodType() const { return "StreamingMethod"; } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template RiaGrpcClientToServerStreamCallback::RiaGrpcClientToServerStreamCallback( ServiceT* service, MethodImplT methodImpl, MethodRequestT methodRequest, StateHandlerT* stateHandler ) : RiaGrpcServiceCallback( service ) , m_reader( &m_context ) , m_methodImpl( methodImpl ) , m_methodRequest( methodRequest ) , m_stateHandler( stateHandler ) { } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template RiaGrpcCallbackInterface* RiaGrpcClientToServerStreamCallback::createNewFromThis() const { return new RiaGrpcClientToServerStreamCallback( this->m_service, m_methodImpl, m_methodRequest, new StateHandlerT( true ) ); } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template void RiaGrpcClientToServerStreamCallback::createRequestHandler( ServerCompletionQueue* completionQueue ) { // The Request-method is where the service gets registered to respond to a given request. m_methodRequest( *this->m_service, &m_context, &this->m_reader, completionQueue, completionQueue, this ); // The client->server streaming requires initialisation and each request package is streamed asynchronously // So we need to start and complete the init request. this->setNextCallState( RiaGrpcCallbackInterface::INIT_REQUEST_STARTED ); } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template void RiaGrpcClientToServerStreamCallback::onInitRequestStarted() { this->setNextCallState( RiaGrpcCallbackInterface::INIT_REQUEST_COMPLETED ); // The read call will start reading the request data and push this callback back onto the command queue // when the read call is completed. m_reader.Read( &this->m_request, this ); } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template void RiaGrpcClientToServerStreamCallback::onInitRequestCompleted() { this->setNextCallState( RiaGrpcCallbackInterface::PROCESS_REQUEST ); // Fully received the stream package so can now init this->m_status = m_stateHandler->init( &this->m_request ); if ( !this->m_status.ok() ) { // We have an error. Proceed to finish and report the status m_reader.FinishWithError( this->m_status, this ); this->setNextCallState( RiaGrpcCallbackInterface::FINISH_REQUEST ); return; } // Start reading and push this back onto the command queue. m_reader.Read( &this->m_request, this ); } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template void RiaGrpcClientToServerStreamCallback::onProcessRequest() { this->m_reply = ReplyT(); // Make sure it is reset // Call request handler method this->m_status = m_methodImpl( *this->m_service, &m_context, &this->m_request, &this->m_reply, m_stateHandler.get() ); if ( !this->m_status.ok() ) { this->setNextCallState( RiaGrpcCallbackInterface::FINISH_REQUEST ); m_reader.FinishWithError( this->m_status, this ); } else { CAF_ASSERT( m_stateHandler->streamedValueCount() <= m_stateHandler->totalValueCount() ); if ( m_stateHandler->streamedValueCount() == m_stateHandler->totalValueCount() ) { this->setNextCallState( RiaGrpcCallbackInterface::FINISH_REQUEST ); m_reader.Finish( this->m_reply, grpc::Status::OK, this ); } else { m_reader.Read( &this->m_request, this ); } } } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template void RiaGrpcClientToServerStreamCallback::onFinishRequest() { m_stateHandler->finish(); } //-------------------------------------------------------------------------------------------------- /// //-------------------------------------------------------------------------------------------------- template QString RiaGrpcClientToServerStreamCallback::methodType() const { return "ClientStreamingMethod"; }