diff --git a/pkg/plugins/backendplugin/grpcplugin/client_proto.go b/pkg/plugins/backendplugin/grpcplugin/client_proto.go index e73b64d3e3b..19c83d0c7dd 100644 --- a/pkg/plugins/backendplugin/grpcplugin/client_proto.go +++ b/pkg/plugins/backendplugin/grpcplugin/client_proto.go @@ -3,7 +3,6 @@ package grpcplugin import ( "context" "errors" - "sync" "google.golang.org/grpc" @@ -15,7 +14,7 @@ import ( ) var ( - errClientNotStarted = errors.New("plugin client has not been started") + errClientNotAvailable = errors.New("plugin client not available") ) var _ ProtoClient = (*protoClient)(nil) @@ -31,20 +30,12 @@ type ProtoClient interface { PID(context.Context) (string, error) PluginID() string PluginVersion() string - PluginJSON() plugins.JSONData Backend() backendplugin.Plugin - Logger() log.Logger - Start(context.Context) error - Stop(context.Context) error - Running(context.Context) bool } type protoClient struct { - plugin *grpcPlugin - pluginVersion string - pluginJSON plugins.JSONData - - mu sync.RWMutex + plugin *grpcPlugin + pluginJSON plugins.JSONData } type ProtoClientOpts struct { @@ -68,12 +59,12 @@ func NewProtoClient(opts ProtoClientOpts) (ProtoClient, error) { func() []string { return opts.Env }, ) - return &protoClient{plugin: p, pluginVersion: opts.PluginJSON.Info.Version, pluginJSON: opts.PluginJSON}, nil + return &protoClient{plugin: p, pluginJSON: opts.PluginJSON}, nil } func (r *protoClient) PID(ctx context.Context) (string, error) { if _, exists := r.client(ctx); !exists { - return "", errClientNotStarted + return "", errClientNotAvailable } return r.plugin.client.ID(), nil } @@ -83,11 +74,7 @@ func (r *protoClient) PluginID() string { } func (r *protoClient) PluginVersion() string { - return r.pluginVersion -} - -func (r *protoClient) PluginJSON() plugins.JSONData { - return r.pluginJSON + return r.pluginJSON.Info.Version } func (r *protoClient) Backend() backendplugin.Plugin { @@ -98,43 +85,14 @@ func (r *protoClient) Logger() log.Logger { return r.plugin.logger } -func (r *protoClient) Start(ctx context.Context) error { - r.mu.Lock() - defer r.mu.Unlock() - return r.plugin.Start(ctx) -} - -func (r *protoClient) Stop(ctx context.Context) error { - r.mu.Lock() - defer r.mu.Unlock() - return r.plugin.Stop(ctx) -} - -func (r *protoClient) Running(_ context.Context) bool { - r.mu.RLock() - defer r.mu.RUnlock() - return !r.plugin.Exited() -} - func (r *protoClient) client(ctx context.Context) (*ClientV2, bool) { - if !r.Running(ctx) { - return nil, false - } - - r.mu.RLock() - if r.plugin.pluginClient == nil { - r.mu.RUnlock() - return nil, false - } - pc := r.plugin.pluginClient - r.mu.RUnlock() - return pc, true + return r.plugin.getPluginClient(ctx) } func (r *protoClient) QueryData(ctx context.Context, in *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) { c, exists := r.client(ctx) if !exists { - return nil, errClientNotStarted + return nil, errClientNotAvailable } return c.DataClient.QueryData(ctx, in, opts...) } @@ -142,7 +100,7 @@ func (r *protoClient) QueryData(ctx context.Context, in *pluginv2.QueryDataReque func (r *protoClient) CallResource(ctx context.Context, in *pluginv2.CallResourceRequest, opts ...grpc.CallOption) (pluginv2.Resource_CallResourceClient, error) { c, exists := r.client(ctx) if !exists { - return nil, errClientNotStarted + return nil, errClientNotAvailable } return c.ResourceClient.CallResource(ctx, in, opts...) } @@ -150,7 +108,7 @@ func (r *protoClient) CallResource(ctx context.Context, in *pluginv2.CallResourc func (r *protoClient) CheckHealth(ctx context.Context, in *pluginv2.CheckHealthRequest, opts ...grpc.CallOption) (*pluginv2.CheckHealthResponse, error) { c, exists := r.client(ctx) if !exists { - return nil, errClientNotStarted + return nil, errClientNotAvailable } return c.DiagnosticsClient.CheckHealth(ctx, in, opts...) } @@ -158,7 +116,7 @@ func (r *protoClient) CheckHealth(ctx context.Context, in *pluginv2.CheckHealthR func (r *protoClient) CollectMetrics(ctx context.Context, in *pluginv2.CollectMetricsRequest, opts ...grpc.CallOption) (*pluginv2.CollectMetricsResponse, error) { c, exists := r.client(ctx) if !exists { - return nil, errClientNotStarted + return nil, errClientNotAvailable } return c.DiagnosticsClient.CollectMetrics(ctx, in, opts...) } @@ -166,7 +124,7 @@ func (r *protoClient) CollectMetrics(ctx context.Context, in *pluginv2.CollectMe func (r *protoClient) SubscribeStream(ctx context.Context, in *pluginv2.SubscribeStreamRequest, opts ...grpc.CallOption) (*pluginv2.SubscribeStreamResponse, error) { c, exists := r.client(ctx) if !exists { - return nil, errClientNotStarted + return nil, errClientNotAvailable } return c.StreamClient.SubscribeStream(ctx, in, opts...) } @@ -174,7 +132,7 @@ func (r *protoClient) SubscribeStream(ctx context.Context, in *pluginv2.Subscrib func (r *protoClient) RunStream(ctx context.Context, in *pluginv2.RunStreamRequest, opts ...grpc.CallOption) (pluginv2.Stream_RunStreamClient, error) { c, exists := r.client(ctx) if !exists { - return nil, errClientNotStarted + return nil, errClientNotAvailable } return c.StreamClient.RunStream(ctx, in, opts...) } @@ -182,7 +140,7 @@ func (r *protoClient) RunStream(ctx context.Context, in *pluginv2.RunStreamReque func (r *protoClient) PublishStream(ctx context.Context, in *pluginv2.PublishStreamRequest, opts ...grpc.CallOption) (*pluginv2.PublishStreamResponse, error) { c, exists := r.client(ctx) if !exists { - return nil, errClientNotStarted + return nil, errClientNotAvailable } return c.StreamClient.PublishStream(ctx, in, opts...) } @@ -190,7 +148,7 @@ func (r *protoClient) PublishStream(ctx context.Context, in *pluginv2.PublishStr func (r *protoClient) ValidateAdmission(ctx context.Context, in *pluginv2.AdmissionRequest, opts ...grpc.CallOption) (*pluginv2.ValidationResponse, error) { c, exists := r.client(ctx) if !exists { - return nil, errClientNotStarted + return nil, errClientNotAvailable } return c.AdmissionClient.ValidateAdmission(ctx, in, opts...) } @@ -198,7 +156,7 @@ func (r *protoClient) ValidateAdmission(ctx context.Context, in *pluginv2.Admiss func (r *protoClient) MutateAdmission(ctx context.Context, in *pluginv2.AdmissionRequest, opts ...grpc.CallOption) (*pluginv2.MutationResponse, error) { c, exists := r.client(ctx) if !exists { - return nil, errClientNotStarted + return nil, errClientNotAvailable } return c.AdmissionClient.MutateAdmission(ctx, in, opts...) } @@ -206,7 +164,7 @@ func (r *protoClient) MutateAdmission(ctx context.Context, in *pluginv2.Admissio func (r *protoClient) ConvertObjects(ctx context.Context, in *pluginv2.ConversionRequest, opts ...grpc.CallOption) (*pluginv2.ConversionResponse, error) { c, exists := r.client(ctx) if !exists { - return nil, errClientNotStarted + return nil, errClientNotAvailable } return c.ConversionClient.ConvertObjects(ctx, in, opts...) } diff --git a/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go b/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go index d7b632e95be..bce188928aa 100644 --- a/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go +++ b/pkg/plugins/backendplugin/grpcplugin/grpc_plugin.go @@ -14,16 +14,6 @@ import ( "github.com/grafana/grafana/pkg/plugins/log" ) -type pluginClient interface { - backend.CollectMetricsHandler - backend.CheckHealthHandler - backend.QueryDataHandler - backend.CallResourceHandler - backend.AdmissionHandler - backend.ConversionHandler - backend.StreamHandler -} - type grpcPlugin struct { descriptor PluginDescriptor clientFactory func() *plugin.Client @@ -32,8 +22,19 @@ type grpcPlugin struct { logger log.Logger mutex sync.RWMutex decommissioned bool + state pluginState } +type pluginState int + +const ( + pluginStateNotStarted pluginState = iota + pluginStateStartInit + pluginStateStartSuccess + pluginStateStartFail + pluginStateStopped +) + // newPlugin allocates and returns a new gRPC (external) backendplugin.Plugin. func newPlugin(descriptor PluginDescriptor) backendplugin.PluginFactoryFunc { return func(pluginID string, logger log.Logger, env func() []string) (backendplugin.Plugin, error) { @@ -48,6 +49,7 @@ func newGrpcPlugin(descriptor PluginDescriptor, logger log.Logger, env func() [] clientFactory: func() *plugin.Client { return plugin.NewClient(newClientConfig(descriptor.executablePath, descriptor.executableArgs, env(), descriptor.skipHostEnvVars, logger, descriptor.versionedPlugins)) }, + state: pluginStateNotStarted, } } @@ -63,21 +65,27 @@ func (p *grpcPlugin) Start(_ context.Context) error { p.mutex.Lock() defer p.mutex.Unlock() + p.state = pluginStateStartInit + p.client = p.clientFactory() rpcClient, err := p.client.Client() if err != nil { + p.state = pluginStateStartFail return err } if p.client.NegotiatedVersion() < 2 { + p.state = pluginStateStartFail return errors.New("plugin protocol version not supported") } p.pluginClient, err = newClientV2(p.descriptor, p.logger, rpcClient) if err != nil { + p.state = pluginStateStartFail return err } if p.pluginClient == nil { + p.state = pluginStateStartFail return errors.New("no compatible plugin implementation found") } @@ -89,6 +97,7 @@ func (p *grpcPlugin) Start(_ context.Context) error { p.logger.Warn("Plugin process is running with elevated privileges. This is not recommended") } + p.state = pluginStateStartSuccess return nil } @@ -99,6 +108,7 @@ func (p *grpcPlugin) Stop(_ context.Context) error { if p.client != nil { p.client.Kill() } + p.state = pluginStateStopped return nil } @@ -134,94 +144,110 @@ func (p *grpcPlugin) Target() backendplugin.Target { return backendplugin.TargetLocal } -func (p *grpcPlugin) getPluginClient() (pluginClient, bool) { +func (p *grpcPlugin) getPluginClient(ctx context.Context) (*ClientV2, bool) { p.mutex.RLock() - if p.client == nil || p.client.Exited() || p.pluginClient == nil { - p.mutex.RUnlock() - return nil, false + defer p.mutex.RUnlock() + if p.client != nil && !p.client.Exited() && p.pluginClient != nil { + return p.pluginClient, true } - pluginClient := p.pluginClient - p.mutex.RUnlock() - return pluginClient, true + + logger := p.Logger().FromContext(ctx) + if p.state == pluginStateNotStarted { + logger.Debug("Plugin client has not been started yet") + } + + if p.state == pluginStateStartInit { + logger.Debug("Plugin client is starting") + } + + if p.state == pluginStateStartFail { + logger.Debug("Plugin client failed to start") + } + + if p.state == pluginStateStopped { + logger.Debug("Plugin client has stopped") + } + + return nil, false } func (p *grpcPlugin) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { - pluginClient, ok := p.getPluginClient() + pc, ok := p.getPluginClient(ctx) if !ok { return nil, plugins.ErrPluginUnavailable } - return pluginClient.CollectMetrics(ctx, req) + return pc.CollectMetrics(ctx, req) } func (p *grpcPlugin) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { - pluginClient, ok := p.getPluginClient() + pc, ok := p.getPluginClient(ctx) if !ok { return nil, plugins.ErrPluginUnavailable } - return pluginClient.CheckHealth(ctx, req) + return pc.CheckHealth(ctx, req) } func (p *grpcPlugin) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { - pluginClient, ok := p.getPluginClient() + pc, ok := p.getPluginClient(ctx) if !ok { return nil, plugins.ErrPluginUnavailable } - return pluginClient.QueryData(ctx, req) + return pc.QueryData(ctx, req) } func (p *grpcPlugin) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { - pluginClient, ok := p.getPluginClient() + pc, ok := p.getPluginClient(ctx) if !ok { return plugins.ErrPluginUnavailable } - return pluginClient.CallResource(ctx, req, sender) + return pc.CallResource(ctx, req, sender) } func (p *grpcPlugin) SubscribeStream(ctx context.Context, request *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { - pluginClient, ok := p.getPluginClient() + pc, ok := p.getPluginClient(ctx) if !ok { return nil, plugins.ErrPluginUnavailable } - return pluginClient.SubscribeStream(ctx, request) + return pc.SubscribeStream(ctx, request) } func (p *grpcPlugin) PublishStream(ctx context.Context, request *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { - pluginClient, ok := p.getPluginClient() + pc, ok := p.getPluginClient(ctx) if !ok { return nil, plugins.ErrPluginUnavailable } - return pluginClient.PublishStream(ctx, request) + return pc.PublishStream(ctx, request) } func (p *grpcPlugin) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { - pluginClient, ok := p.getPluginClient() + pc, ok := p.getPluginClient(ctx) if !ok { return plugins.ErrPluginUnavailable } - return pluginClient.RunStream(ctx, req, sender) + return pc.RunStream(ctx, req, sender) } func (p *grpcPlugin) ValidateAdmission(ctx context.Context, request *backend.AdmissionRequest) (*backend.ValidationResponse, error) { - pluginClient, ok := p.getPluginClient() + pc, ok := p.getPluginClient(ctx) if !ok { return nil, plugins.ErrPluginUnavailable } - return pluginClient.ValidateAdmission(ctx, request) + return pc.ValidateAdmission(ctx, request) } func (p *grpcPlugin) MutateAdmission(ctx context.Context, request *backend.AdmissionRequest) (*backend.MutationResponse, error) { - pluginClient, ok := p.getPluginClient() + pc, ok := p.getPluginClient(ctx) if !ok { return nil, plugins.ErrPluginUnavailable } - return pluginClient.MutateAdmission(ctx, request) + return pc.MutateAdmission(ctx, request) } func (p *grpcPlugin) ConvertObjects(ctx context.Context, request *backend.ConversionRequest) (*backend.ConversionResponse, error) { - pluginClient, ok := p.getPluginClient() + pc, ok := p.getPluginClient(ctx) if !ok { return nil, plugins.ErrPluginUnavailable } - return pluginClient.ConvertObjects(ctx, request) + return pc.ConvertObjects(ctx, request) }