Plugins: Add state logs for plugin client retrieval (#93630)

* add state to grpc plugin

* tidy

* fix lint

* fix issues

* return true

* use defer

* update err message
This commit is contained in:
Will Browne 2024-09-24 13:55:02 +01:00 committed by GitHub
parent a54308138f
commit 52f8cecd4a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 80 additions and 96 deletions

View File

@ -3,7 +3,6 @@ package grpcplugin
import ( import (
"context" "context"
"errors" "errors"
"sync"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -15,7 +14,7 @@ import (
) )
var ( var (
errClientNotStarted = errors.New("plugin client has not been started") errClientNotAvailable = errors.New("plugin client not available")
) )
var _ ProtoClient = (*protoClient)(nil) var _ ProtoClient = (*protoClient)(nil)
@ -31,20 +30,12 @@ type ProtoClient interface {
PID(context.Context) (string, error) PID(context.Context) (string, error)
PluginID() string PluginID() string
PluginVersion() string PluginVersion() string
PluginJSON() plugins.JSONData
Backend() backendplugin.Plugin Backend() backendplugin.Plugin
Logger() log.Logger
Start(context.Context) error
Stop(context.Context) error
Running(context.Context) bool
} }
type protoClient struct { type protoClient struct {
plugin *grpcPlugin plugin *grpcPlugin
pluginVersion string pluginJSON plugins.JSONData
pluginJSON plugins.JSONData
mu sync.RWMutex
} }
type ProtoClientOpts struct { type ProtoClientOpts struct {
@ -68,12 +59,12 @@ func NewProtoClient(opts ProtoClientOpts) (ProtoClient, error) {
func() []string { return opts.Env }, func() []string { return opts.Env },
) )
return &protoClient{plugin: p, pluginVersion: opts.PluginJSON.Info.Version, pluginJSON: opts.PluginJSON}, nil return &protoClient{plugin: p, pluginJSON: opts.PluginJSON}, nil
} }
func (r *protoClient) PID(ctx context.Context) (string, error) { func (r *protoClient) PID(ctx context.Context) (string, error) {
if _, exists := r.client(ctx); !exists { if _, exists := r.client(ctx); !exists {
return "", errClientNotStarted return "", errClientNotAvailable
} }
return r.plugin.client.ID(), nil return r.plugin.client.ID(), nil
} }
@ -83,11 +74,7 @@ func (r *protoClient) PluginID() string {
} }
func (r *protoClient) PluginVersion() string { func (r *protoClient) PluginVersion() string {
return r.pluginVersion return r.pluginJSON.Info.Version
}
func (r *protoClient) PluginJSON() plugins.JSONData {
return r.pluginJSON
} }
func (r *protoClient) Backend() backendplugin.Plugin { func (r *protoClient) Backend() backendplugin.Plugin {
@ -98,43 +85,14 @@ func (r *protoClient) Logger() log.Logger {
return r.plugin.logger return r.plugin.logger
} }
func (r *protoClient) Start(ctx context.Context) error {
r.mu.Lock()
defer r.mu.Unlock()
return r.plugin.Start(ctx)
}
func (r *protoClient) Stop(ctx context.Context) error {
r.mu.Lock()
defer r.mu.Unlock()
return r.plugin.Stop(ctx)
}
func (r *protoClient) Running(_ context.Context) bool {
r.mu.RLock()
defer r.mu.RUnlock()
return !r.plugin.Exited()
}
func (r *protoClient) client(ctx context.Context) (*ClientV2, bool) { func (r *protoClient) client(ctx context.Context) (*ClientV2, bool) {
if !r.Running(ctx) { return r.plugin.getPluginClient(ctx)
return nil, false
}
r.mu.RLock()
if r.plugin.pluginClient == nil {
r.mu.RUnlock()
return nil, false
}
pc := r.plugin.pluginClient
r.mu.RUnlock()
return pc, true
} }
func (r *protoClient) QueryData(ctx context.Context, in *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) { func (r *protoClient) QueryData(ctx context.Context, in *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.DataClient.QueryData(ctx, in, opts...) return c.DataClient.QueryData(ctx, in, opts...)
} }
@ -142,7 +100,7 @@ func (r *protoClient) QueryData(ctx context.Context, in *pluginv2.QueryDataReque
func (r *protoClient) CallResource(ctx context.Context, in *pluginv2.CallResourceRequest, opts ...grpc.CallOption) (pluginv2.Resource_CallResourceClient, error) { func (r *protoClient) CallResource(ctx context.Context, in *pluginv2.CallResourceRequest, opts ...grpc.CallOption) (pluginv2.Resource_CallResourceClient, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.ResourceClient.CallResource(ctx, in, opts...) return c.ResourceClient.CallResource(ctx, in, opts...)
} }
@ -150,7 +108,7 @@ func (r *protoClient) CallResource(ctx context.Context, in *pluginv2.CallResourc
func (r *protoClient) CheckHealth(ctx context.Context, in *pluginv2.CheckHealthRequest, opts ...grpc.CallOption) (*pluginv2.CheckHealthResponse, error) { func (r *protoClient) CheckHealth(ctx context.Context, in *pluginv2.CheckHealthRequest, opts ...grpc.CallOption) (*pluginv2.CheckHealthResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.DiagnosticsClient.CheckHealth(ctx, in, opts...) return c.DiagnosticsClient.CheckHealth(ctx, in, opts...)
} }
@ -158,7 +116,7 @@ func (r *protoClient) CheckHealth(ctx context.Context, in *pluginv2.CheckHealthR
func (r *protoClient) CollectMetrics(ctx context.Context, in *pluginv2.CollectMetricsRequest, opts ...grpc.CallOption) (*pluginv2.CollectMetricsResponse, error) { func (r *protoClient) CollectMetrics(ctx context.Context, in *pluginv2.CollectMetricsRequest, opts ...grpc.CallOption) (*pluginv2.CollectMetricsResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.DiagnosticsClient.CollectMetrics(ctx, in, opts...) return c.DiagnosticsClient.CollectMetrics(ctx, in, opts...)
} }
@ -166,7 +124,7 @@ func (r *protoClient) CollectMetrics(ctx context.Context, in *pluginv2.CollectMe
func (r *protoClient) SubscribeStream(ctx context.Context, in *pluginv2.SubscribeStreamRequest, opts ...grpc.CallOption) (*pluginv2.SubscribeStreamResponse, error) { func (r *protoClient) SubscribeStream(ctx context.Context, in *pluginv2.SubscribeStreamRequest, opts ...grpc.CallOption) (*pluginv2.SubscribeStreamResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.StreamClient.SubscribeStream(ctx, in, opts...) return c.StreamClient.SubscribeStream(ctx, in, opts...)
} }
@ -174,7 +132,7 @@ func (r *protoClient) SubscribeStream(ctx context.Context, in *pluginv2.Subscrib
func (r *protoClient) RunStream(ctx context.Context, in *pluginv2.RunStreamRequest, opts ...grpc.CallOption) (pluginv2.Stream_RunStreamClient, error) { func (r *protoClient) RunStream(ctx context.Context, in *pluginv2.RunStreamRequest, opts ...grpc.CallOption) (pluginv2.Stream_RunStreamClient, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.StreamClient.RunStream(ctx, in, opts...) return c.StreamClient.RunStream(ctx, in, opts...)
} }
@ -182,7 +140,7 @@ func (r *protoClient) RunStream(ctx context.Context, in *pluginv2.RunStreamReque
func (r *protoClient) PublishStream(ctx context.Context, in *pluginv2.PublishStreamRequest, opts ...grpc.CallOption) (*pluginv2.PublishStreamResponse, error) { func (r *protoClient) PublishStream(ctx context.Context, in *pluginv2.PublishStreamRequest, opts ...grpc.CallOption) (*pluginv2.PublishStreamResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.StreamClient.PublishStream(ctx, in, opts...) return c.StreamClient.PublishStream(ctx, in, opts...)
} }
@ -190,7 +148,7 @@ func (r *protoClient) PublishStream(ctx context.Context, in *pluginv2.PublishStr
func (r *protoClient) ValidateAdmission(ctx context.Context, in *pluginv2.AdmissionRequest, opts ...grpc.CallOption) (*pluginv2.ValidationResponse, error) { func (r *protoClient) ValidateAdmission(ctx context.Context, in *pluginv2.AdmissionRequest, opts ...grpc.CallOption) (*pluginv2.ValidationResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.AdmissionClient.ValidateAdmission(ctx, in, opts...) return c.AdmissionClient.ValidateAdmission(ctx, in, opts...)
} }
@ -198,7 +156,7 @@ func (r *protoClient) ValidateAdmission(ctx context.Context, in *pluginv2.Admiss
func (r *protoClient) MutateAdmission(ctx context.Context, in *pluginv2.AdmissionRequest, opts ...grpc.CallOption) (*pluginv2.MutationResponse, error) { func (r *protoClient) MutateAdmission(ctx context.Context, in *pluginv2.AdmissionRequest, opts ...grpc.CallOption) (*pluginv2.MutationResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.AdmissionClient.MutateAdmission(ctx, in, opts...) return c.AdmissionClient.MutateAdmission(ctx, in, opts...)
} }
@ -206,7 +164,7 @@ func (r *protoClient) MutateAdmission(ctx context.Context, in *pluginv2.Admissio
func (r *protoClient) ConvertObjects(ctx context.Context, in *pluginv2.ConversionRequest, opts ...grpc.CallOption) (*pluginv2.ConversionResponse, error) { func (r *protoClient) ConvertObjects(ctx context.Context, in *pluginv2.ConversionRequest, opts ...grpc.CallOption) (*pluginv2.ConversionResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.ConversionClient.ConvertObjects(ctx, in, opts...) return c.ConversionClient.ConvertObjects(ctx, in, opts...)
} }

View File

@ -14,16 +14,6 @@ import (
"github.com/grafana/grafana/pkg/plugins/log" "github.com/grafana/grafana/pkg/plugins/log"
) )
type pluginClient interface {
backend.CollectMetricsHandler
backend.CheckHealthHandler
backend.QueryDataHandler
backend.CallResourceHandler
backend.AdmissionHandler
backend.ConversionHandler
backend.StreamHandler
}
type grpcPlugin struct { type grpcPlugin struct {
descriptor PluginDescriptor descriptor PluginDescriptor
clientFactory func() *plugin.Client clientFactory func() *plugin.Client
@ -32,8 +22,19 @@ type grpcPlugin struct {
logger log.Logger logger log.Logger
mutex sync.RWMutex mutex sync.RWMutex
decommissioned bool decommissioned bool
state pluginState
} }
type pluginState int
const (
pluginStateNotStarted pluginState = iota
pluginStateStartInit
pluginStateStartSuccess
pluginStateStartFail
pluginStateStopped
)
// newPlugin allocates and returns a new gRPC (external) backendplugin.Plugin. // newPlugin allocates and returns a new gRPC (external) backendplugin.Plugin.
func newPlugin(descriptor PluginDescriptor) backendplugin.PluginFactoryFunc { func newPlugin(descriptor PluginDescriptor) backendplugin.PluginFactoryFunc {
return func(pluginID string, logger log.Logger, env func() []string) (backendplugin.Plugin, error) { return func(pluginID string, logger log.Logger, env func() []string) (backendplugin.Plugin, error) {
@ -48,6 +49,7 @@ func newGrpcPlugin(descriptor PluginDescriptor, logger log.Logger, env func() []
clientFactory: func() *plugin.Client { clientFactory: func() *plugin.Client {
return plugin.NewClient(newClientConfig(descriptor.executablePath, descriptor.executableArgs, env(), descriptor.skipHostEnvVars, logger, descriptor.versionedPlugins)) return plugin.NewClient(newClientConfig(descriptor.executablePath, descriptor.executableArgs, env(), descriptor.skipHostEnvVars, logger, descriptor.versionedPlugins))
}, },
state: pluginStateNotStarted,
} }
} }
@ -63,21 +65,27 @@ func (p *grpcPlugin) Start(_ context.Context) error {
p.mutex.Lock() p.mutex.Lock()
defer p.mutex.Unlock() defer p.mutex.Unlock()
p.state = pluginStateStartInit
p.client = p.clientFactory() p.client = p.clientFactory()
rpcClient, err := p.client.Client() rpcClient, err := p.client.Client()
if err != nil { if err != nil {
p.state = pluginStateStartFail
return err return err
} }
if p.client.NegotiatedVersion() < 2 { if p.client.NegotiatedVersion() < 2 {
p.state = pluginStateStartFail
return errors.New("plugin protocol version not supported") return errors.New("plugin protocol version not supported")
} }
p.pluginClient, err = newClientV2(p.descriptor, p.logger, rpcClient) p.pluginClient, err = newClientV2(p.descriptor, p.logger, rpcClient)
if err != nil { if err != nil {
p.state = pluginStateStartFail
return err return err
} }
if p.pluginClient == nil { if p.pluginClient == nil {
p.state = pluginStateStartFail
return errors.New("no compatible plugin implementation found") return errors.New("no compatible plugin implementation found")
} }
@ -89,6 +97,7 @@ func (p *grpcPlugin) Start(_ context.Context) error {
p.logger.Warn("Plugin process is running with elevated privileges. This is not recommended") p.logger.Warn("Plugin process is running with elevated privileges. This is not recommended")
} }
p.state = pluginStateStartSuccess
return nil return nil
} }
@ -99,6 +108,7 @@ func (p *grpcPlugin) Stop(_ context.Context) error {
if p.client != nil { if p.client != nil {
p.client.Kill() p.client.Kill()
} }
p.state = pluginStateStopped
return nil return nil
} }
@ -134,94 +144,110 @@ func (p *grpcPlugin) Target() backendplugin.Target {
return backendplugin.TargetLocal return backendplugin.TargetLocal
} }
func (p *grpcPlugin) getPluginClient() (pluginClient, bool) { func (p *grpcPlugin) getPluginClient(ctx context.Context) (*ClientV2, bool) {
p.mutex.RLock() p.mutex.RLock()
if p.client == nil || p.client.Exited() || p.pluginClient == nil { defer p.mutex.RUnlock()
p.mutex.RUnlock() if p.client != nil && !p.client.Exited() && p.pluginClient != nil {
return nil, false return p.pluginClient, true
} }
pluginClient := p.pluginClient
p.mutex.RUnlock() logger := p.Logger().FromContext(ctx)
return pluginClient, true if p.state == pluginStateNotStarted {
logger.Debug("Plugin client has not been started yet")
}
if p.state == pluginStateStartInit {
logger.Debug("Plugin client is starting")
}
if p.state == pluginStateStartFail {
logger.Debug("Plugin client failed to start")
}
if p.state == pluginStateStopped {
logger.Debug("Plugin client has stopped")
}
return nil, false
} }
func (p *grpcPlugin) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { func (p *grpcPlugin) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) {
pluginClient, ok := p.getPluginClient() pc, ok := p.getPluginClient(ctx)
if !ok { if !ok {
return nil, plugins.ErrPluginUnavailable return nil, plugins.ErrPluginUnavailable
} }
return pluginClient.CollectMetrics(ctx, req) return pc.CollectMetrics(ctx, req)
} }
func (p *grpcPlugin) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { func (p *grpcPlugin) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
pluginClient, ok := p.getPluginClient() pc, ok := p.getPluginClient(ctx)
if !ok { if !ok {
return nil, plugins.ErrPluginUnavailable return nil, plugins.ErrPluginUnavailable
} }
return pluginClient.CheckHealth(ctx, req) return pc.CheckHealth(ctx, req)
} }
func (p *grpcPlugin) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { func (p *grpcPlugin) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
pluginClient, ok := p.getPluginClient() pc, ok := p.getPluginClient(ctx)
if !ok { if !ok {
return nil, plugins.ErrPluginUnavailable return nil, plugins.ErrPluginUnavailable
} }
return pluginClient.QueryData(ctx, req) return pc.QueryData(ctx, req)
} }
func (p *grpcPlugin) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { func (p *grpcPlugin) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
pluginClient, ok := p.getPluginClient() pc, ok := p.getPluginClient(ctx)
if !ok { if !ok {
return plugins.ErrPluginUnavailable return plugins.ErrPluginUnavailable
} }
return pluginClient.CallResource(ctx, req, sender) return pc.CallResource(ctx, req, sender)
} }
func (p *grpcPlugin) SubscribeStream(ctx context.Context, request *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { func (p *grpcPlugin) SubscribeStream(ctx context.Context, request *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
pluginClient, ok := p.getPluginClient() pc, ok := p.getPluginClient(ctx)
if !ok { if !ok {
return nil, plugins.ErrPluginUnavailable return nil, plugins.ErrPluginUnavailable
} }
return pluginClient.SubscribeStream(ctx, request) return pc.SubscribeStream(ctx, request)
} }
func (p *grpcPlugin) PublishStream(ctx context.Context, request *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { func (p *grpcPlugin) PublishStream(ctx context.Context, request *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
pluginClient, ok := p.getPluginClient() pc, ok := p.getPluginClient(ctx)
if !ok { if !ok {
return nil, plugins.ErrPluginUnavailable return nil, plugins.ErrPluginUnavailable
} }
return pluginClient.PublishStream(ctx, request) return pc.PublishStream(ctx, request)
} }
func (p *grpcPlugin) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { func (p *grpcPlugin) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
pluginClient, ok := p.getPluginClient() pc, ok := p.getPluginClient(ctx)
if !ok { if !ok {
return plugins.ErrPluginUnavailable return plugins.ErrPluginUnavailable
} }
return pluginClient.RunStream(ctx, req, sender) return pc.RunStream(ctx, req, sender)
} }
func (p *grpcPlugin) ValidateAdmission(ctx context.Context, request *backend.AdmissionRequest) (*backend.ValidationResponse, error) { func (p *grpcPlugin) ValidateAdmission(ctx context.Context, request *backend.AdmissionRequest) (*backend.ValidationResponse, error) {
pluginClient, ok := p.getPluginClient() pc, ok := p.getPluginClient(ctx)
if !ok { if !ok {
return nil, plugins.ErrPluginUnavailable return nil, plugins.ErrPluginUnavailable
} }
return pluginClient.ValidateAdmission(ctx, request) return pc.ValidateAdmission(ctx, request)
} }
func (p *grpcPlugin) MutateAdmission(ctx context.Context, request *backend.AdmissionRequest) (*backend.MutationResponse, error) { func (p *grpcPlugin) MutateAdmission(ctx context.Context, request *backend.AdmissionRequest) (*backend.MutationResponse, error) {
pluginClient, ok := p.getPluginClient() pc, ok := p.getPluginClient(ctx)
if !ok { if !ok {
return nil, plugins.ErrPluginUnavailable return nil, plugins.ErrPluginUnavailable
} }
return pluginClient.MutateAdmission(ctx, request) return pc.MutateAdmission(ctx, request)
} }
func (p *grpcPlugin) ConvertObjects(ctx context.Context, request *backend.ConversionRequest) (*backend.ConversionResponse, error) { func (p *grpcPlugin) ConvertObjects(ctx context.Context, request *backend.ConversionRequest) (*backend.ConversionResponse, error) {
pluginClient, ok := p.getPluginClient() pc, ok := p.getPluginClient(ctx)
if !ok { if !ok {
return nil, plugins.ErrPluginUnavailable return nil, plugins.ErrPluginUnavailable
} }
return pluginClient.ConvertObjects(ctx, request) return pc.ConvertObjects(ctx, request)
} }