grafana/pkg/plugins/backendplugin/grpcplugin/client_v2.go
Will Browne ec82719372
Plugins: Introduce plugin package specific logger (#62204)
* refactor

* implement with infra log for now

* undo moving

* update package name

* update name

* fix tests

* update pretty signature

* update naming

* simplify

* fix typo

* delete comment

* fix import

* retrigger
2023-02-28 16:10:27 +01:00

260 lines
7.1 KiB
Go

package grpcplugin
import (
"context"
"errors"
"fmt"
"io"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/grpcplugin"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
"github.com/hashicorp/go-plugin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/plugins/backendplugin/pluginextensionv2"
"github.com/grafana/grafana/pkg/plugins/backendplugin/secretsmanagerplugin"
"github.com/grafana/grafana/pkg/plugins/log"
)
type ClientV2 struct {
grpcplugin.DiagnosticsClient
grpcplugin.ResourceClient
grpcplugin.DataClient
grpcplugin.StreamClient
pluginextensionv2.RendererPlugin
secretsmanagerplugin.SecretsManagerPlugin
}
func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugin.ClientProtocol) (pluginClient, error) {
rawDiagnostics, err := rpcClient.Dispense("diagnostics")
if err != nil {
return nil, err
}
rawResource, err := rpcClient.Dispense("resource")
if err != nil {
return nil, err
}
rawData, err := rpcClient.Dispense("data")
if err != nil {
return nil, err
}
rawStream, err := rpcClient.Dispense("stream")
if err != nil {
return nil, err
}
rawRenderer, err := rpcClient.Dispense("renderer")
if err != nil {
return nil, err
}
rawSecretsManager, err := rpcClient.Dispense("secretsmanager")
if err != nil {
return nil, err
}
c := ClientV2{}
if rawDiagnostics != nil {
if diagnosticsClient, ok := rawDiagnostics.(grpcplugin.DiagnosticsClient); ok {
c.DiagnosticsClient = diagnosticsClient
}
}
if rawResource != nil {
if resourceClient, ok := rawResource.(grpcplugin.ResourceClient); ok {
c.ResourceClient = resourceClient
}
}
if rawData != nil {
if dataClient, ok := rawData.(grpcplugin.DataClient); ok {
c.DataClient = dataClient
}
}
if rawStream != nil {
if streamClient, ok := rawStream.(grpcplugin.StreamClient); ok {
c.StreamClient = streamClient
}
}
if rawRenderer != nil {
if rendererPlugin, ok := rawRenderer.(pluginextensionv2.RendererPlugin); ok {
c.RendererPlugin = rendererPlugin
}
}
if rawSecretsManager != nil {
if secretsManagerPlugin, ok := rawSecretsManager.(secretsmanagerplugin.SecretsManagerPlugin); ok {
c.SecretsManagerPlugin = secretsManagerPlugin
}
}
if descriptor.startRendererFn != nil {
if err := descriptor.startRendererFn(descriptor.pluginID, c.RendererPlugin, logger); err != nil {
return nil, err
}
}
if descriptor.startSecretsManagerFn != nil {
if err := descriptor.startSecretsManagerFn(descriptor.pluginID, c.SecretsManagerPlugin, logger); err != nil {
return nil, err
}
}
return &c, nil
}
func (c *ClientV2) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) {
if c.DiagnosticsClient == nil {
return &backend.CollectMetricsResult{}, nil
}
protoResp, err := c.DiagnosticsClient.CollectMetrics(ctx, backend.ToProto().CollectMetricsRequest(req))
if err != nil {
if status.Code(err) == codes.Unimplemented {
return &backend.CollectMetricsResult{}, nil
}
return nil, err
}
return backend.FromProto().CollectMetricsResponse(protoResp), nil
}
func (c *ClientV2) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
if c.DiagnosticsClient == nil {
return nil, backendplugin.ErrMethodNotImplemented
}
protoContext := backend.ToProto().PluginContext(req.PluginContext)
protoResp, err := c.DiagnosticsClient.CheckHealth(ctx, &pluginv2.CheckHealthRequest{PluginContext: protoContext, Headers: req.Headers})
if err != nil {
if status.Code(err) == codes.Unimplemented {
return &backend.CheckHealthResult{
Status: backend.HealthStatusUnknown,
Message: "Health check not implemented",
}, nil
}
return nil, err
}
return backend.FromProto().CheckHealthResponse(protoResp), nil
}
func (c *ClientV2) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
if c.DataClient == nil {
return nil, backendplugin.ErrMethodNotImplemented
}
protoReq := backend.ToProto().QueryDataRequest(req)
protoResp, err := c.DataClient.QueryData(ctx, protoReq)
if err != nil {
if status.Code(err) == codes.Unimplemented {
return nil, backendplugin.ErrMethodNotImplemented
}
return nil, fmt.Errorf("%v: %w", "Failed to query data", err)
}
return backend.FromProto().QueryDataResponse(protoResp)
}
func (c *ClientV2) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
if c.ResourceClient == nil {
return backendplugin.ErrMethodNotImplemented
}
protoReq := backend.ToProto().CallResourceRequest(req)
protoStream, err := c.ResourceClient.CallResource(ctx, protoReq)
if err != nil {
if status.Code(err) == codes.Unimplemented {
return backendplugin.ErrMethodNotImplemented
}
return fmt.Errorf("%v: %w", "Failed to call resource", err)
}
for {
protoResp, err := protoStream.Recv()
if err != nil {
if status.Code(err) == codes.Unimplemented {
return backendplugin.ErrMethodNotImplemented
}
if errors.Is(err, io.EOF) {
return nil
}
return fmt.Errorf("%v: %w", "failed to receive call resource response", err)
}
if err := sender.Send(backend.FromProto().CallResourceResponse(protoResp)); err != nil {
return err
}
}
}
func (c *ClientV2) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
if c.StreamClient == nil {
return nil, backendplugin.ErrMethodNotImplemented
}
protoResp, err := c.StreamClient.SubscribeStream(ctx, backend.ToProto().SubscribeStreamRequest(req))
if err != nil {
return nil, err
}
return backend.FromProto().SubscribeStreamResponse(protoResp), nil
}
func (c *ClientV2) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
if c.StreamClient == nil {
return nil, backendplugin.ErrMethodNotImplemented
}
protoResp, err := c.StreamClient.PublishStream(ctx, backend.ToProto().PublishStreamRequest(req))
if err != nil {
return nil, err
}
return backend.FromProto().PublishStreamResponse(protoResp), nil
}
func (c *ClientV2) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
if c.StreamClient == nil {
return backendplugin.ErrMethodNotImplemented
}
protoReq := backend.ToProto().RunStreamRequest(req)
protoStream, err := c.StreamClient.RunStream(ctx, protoReq)
if err != nil {
if status.Code(err) == codes.Unimplemented {
return backendplugin.ErrMethodNotImplemented
}
return fmt.Errorf("%v: %w", "Failed to call resource", err)
}
for {
p, err := protoStream.Recv()
if err != nil {
if status.Code(err) == codes.Unimplemented {
return backendplugin.ErrMethodNotImplemented
}
if errors.Is(err, io.EOF) {
return nil
}
return fmt.Errorf("error running stream: %w", err)
}
// From GRPC connection we receive already prepared JSON.
err = sender.SendJSON(p.Data)
if err != nil {
return err
}
}
}