Plugins: Add logs for plugin proto client retrieval (#93527)

This commit is contained in:
Will Browne 2024-09-20 17:36:11 +01:00 committed by GitHub
parent e22540b763
commit ccf6fbebfa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -15,7 +15,7 @@ import (
) )
var ( var (
errClientNotStarted = errors.New("plugin client has not been started") errClientNotAvailable = errors.New("plugin client is not available")
) )
var _ ProtoClient = (*protoClient)(nil) var _ ProtoClient = (*protoClient)(nil)
@ -36,7 +36,6 @@ type ProtoClient interface {
Logger() log.Logger Logger() log.Logger
Start(context.Context) error Start(context.Context) error
Stop(context.Context) error Stop(context.Context) error
Running(context.Context) bool
} }
type protoClient struct { type protoClient struct {
@ -44,9 +43,20 @@ type protoClient struct {
pluginVersion string pluginVersion string
pluginJSON plugins.JSONData pluginJSON plugins.JSONData
state pluginState
mu sync.RWMutex mu sync.RWMutex
} }
type pluginState int
const (
pluginStateNotStarted pluginState = iota
pluginStateStartSuccess
pluginStateStartFail
pluginStateStopped
)
type ProtoClientOpts struct { type ProtoClientOpts struct {
PluginJSON plugins.JSONData PluginJSON plugins.JSONData
ExecutablePath string ExecutablePath string
@ -68,12 +78,12 @@ func NewProtoClient(opts ProtoClientOpts) (ProtoClient, error) {
func() []string { return opts.Env }, func() []string { return opts.Env },
) )
return &protoClient{plugin: p, pluginVersion: opts.PluginJSON.Info.Version, pluginJSON: opts.PluginJSON}, nil return &protoClient{plugin: p, pluginVersion: opts.PluginJSON.Info.Version, pluginJSON: opts.PluginJSON, state: pluginStateNotStarted}, nil
} }
func (r *protoClient) PID(ctx context.Context) (string, error) { func (r *protoClient) PID(ctx context.Context) (string, error) {
if _, exists := r.client(ctx); !exists { if _, exists := r.client(ctx); !exists {
return "", errClientNotStarted return "", errClientNotAvailable
} }
return r.plugin.client.ID(), nil return r.plugin.client.ID(), nil
} }
@ -101,40 +111,60 @@ func (r *protoClient) Logger() log.Logger {
func (r *protoClient) Start(ctx context.Context) error { func (r *protoClient) Start(ctx context.Context) error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
return r.plugin.Start(ctx)
err := r.plugin.Start(ctx)
if err != nil {
r.state = pluginStateStartFail
return err
}
r.state = pluginStateStartSuccess
return nil
} }
func (r *protoClient) Stop(ctx context.Context) error { func (r *protoClient) Stop(ctx context.Context) error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
r.state = pluginStateStopped
return r.plugin.Stop(ctx) return r.plugin.Stop(ctx)
} }
func (r *protoClient) Running(_ context.Context) bool { func (r *protoClient) client(ctx context.Context) (*ClientV2, bool) {
r.mu.RLock() r.mu.RLock()
defer r.mu.RUnlock() defer r.mu.RUnlock()
return !r.plugin.Exited()
}
func (r *protoClient) client(ctx context.Context) (*ClientV2, bool) { logger := r.Logger().FromContext(ctx)
if !r.Running(ctx) { if r.state == pluginStateNotStarted {
logger.Debug("Plugin client has not been started yet")
return nil, false
}
if r.state == pluginStateStartFail {
logger.Debug("Plugin client failed to start")
return nil, false
}
if r.state == pluginStateStopped {
logger.Debug("Plugin client has stopped")
return nil, false
}
if r.plugin.Exited() {
logger.Debug("Plugin client has exited")
return nil, false return nil, false
} }
r.mu.RLock()
if r.plugin.pluginClient == nil { if r.plugin.pluginClient == nil {
r.mu.RUnlock()
return nil, false return nil, false
} }
pc := r.plugin.pluginClient pc := r.plugin.pluginClient
r.mu.RUnlock()
return pc, true return pc, true
} }
func (r *protoClient) QueryData(ctx context.Context, in *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) { func (r *protoClient) QueryData(ctx context.Context, in *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.DataClient.QueryData(ctx, in, opts...) return c.DataClient.QueryData(ctx, in, opts...)
} }
@ -142,7 +172,7 @@ func (r *protoClient) QueryData(ctx context.Context, in *pluginv2.QueryDataReque
func (r *protoClient) CallResource(ctx context.Context, in *pluginv2.CallResourceRequest, opts ...grpc.CallOption) (pluginv2.Resource_CallResourceClient, error) { func (r *protoClient) CallResource(ctx context.Context, in *pluginv2.CallResourceRequest, opts ...grpc.CallOption) (pluginv2.Resource_CallResourceClient, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.ResourceClient.CallResource(ctx, in, opts...) return c.ResourceClient.CallResource(ctx, in, opts...)
} }
@ -150,7 +180,7 @@ func (r *protoClient) CallResource(ctx context.Context, in *pluginv2.CallResourc
func (r *protoClient) CheckHealth(ctx context.Context, in *pluginv2.CheckHealthRequest, opts ...grpc.CallOption) (*pluginv2.CheckHealthResponse, error) { func (r *protoClient) CheckHealth(ctx context.Context, in *pluginv2.CheckHealthRequest, opts ...grpc.CallOption) (*pluginv2.CheckHealthResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.DiagnosticsClient.CheckHealth(ctx, in, opts...) return c.DiagnosticsClient.CheckHealth(ctx, in, opts...)
} }
@ -158,7 +188,7 @@ func (r *protoClient) CheckHealth(ctx context.Context, in *pluginv2.CheckHealthR
func (r *protoClient) CollectMetrics(ctx context.Context, in *pluginv2.CollectMetricsRequest, opts ...grpc.CallOption) (*pluginv2.CollectMetricsResponse, error) { func (r *protoClient) CollectMetrics(ctx context.Context, in *pluginv2.CollectMetricsRequest, opts ...grpc.CallOption) (*pluginv2.CollectMetricsResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.DiagnosticsClient.CollectMetrics(ctx, in, opts...) return c.DiagnosticsClient.CollectMetrics(ctx, in, opts...)
} }
@ -166,7 +196,7 @@ func (r *protoClient) CollectMetrics(ctx context.Context, in *pluginv2.CollectMe
func (r *protoClient) SubscribeStream(ctx context.Context, in *pluginv2.SubscribeStreamRequest, opts ...grpc.CallOption) (*pluginv2.SubscribeStreamResponse, error) { func (r *protoClient) SubscribeStream(ctx context.Context, in *pluginv2.SubscribeStreamRequest, opts ...grpc.CallOption) (*pluginv2.SubscribeStreamResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.StreamClient.SubscribeStream(ctx, in, opts...) return c.StreamClient.SubscribeStream(ctx, in, opts...)
} }
@ -174,7 +204,7 @@ func (r *protoClient) SubscribeStream(ctx context.Context, in *pluginv2.Subscrib
func (r *protoClient) RunStream(ctx context.Context, in *pluginv2.RunStreamRequest, opts ...grpc.CallOption) (pluginv2.Stream_RunStreamClient, error) { func (r *protoClient) RunStream(ctx context.Context, in *pluginv2.RunStreamRequest, opts ...grpc.CallOption) (pluginv2.Stream_RunStreamClient, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.StreamClient.RunStream(ctx, in, opts...) return c.StreamClient.RunStream(ctx, in, opts...)
} }
@ -182,7 +212,7 @@ func (r *protoClient) RunStream(ctx context.Context, in *pluginv2.RunStreamReque
func (r *protoClient) PublishStream(ctx context.Context, in *pluginv2.PublishStreamRequest, opts ...grpc.CallOption) (*pluginv2.PublishStreamResponse, error) { func (r *protoClient) PublishStream(ctx context.Context, in *pluginv2.PublishStreamRequest, opts ...grpc.CallOption) (*pluginv2.PublishStreamResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.StreamClient.PublishStream(ctx, in, opts...) return c.StreamClient.PublishStream(ctx, in, opts...)
} }
@ -190,7 +220,7 @@ func (r *protoClient) PublishStream(ctx context.Context, in *pluginv2.PublishStr
func (r *protoClient) ValidateAdmission(ctx context.Context, in *pluginv2.AdmissionRequest, opts ...grpc.CallOption) (*pluginv2.ValidationResponse, error) { func (r *protoClient) ValidateAdmission(ctx context.Context, in *pluginv2.AdmissionRequest, opts ...grpc.CallOption) (*pluginv2.ValidationResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.AdmissionClient.ValidateAdmission(ctx, in, opts...) return c.AdmissionClient.ValidateAdmission(ctx, in, opts...)
} }
@ -198,7 +228,7 @@ func (r *protoClient) ValidateAdmission(ctx context.Context, in *pluginv2.Admiss
func (r *protoClient) MutateAdmission(ctx context.Context, in *pluginv2.AdmissionRequest, opts ...grpc.CallOption) (*pluginv2.MutationResponse, error) { func (r *protoClient) MutateAdmission(ctx context.Context, in *pluginv2.AdmissionRequest, opts ...grpc.CallOption) (*pluginv2.MutationResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.AdmissionClient.MutateAdmission(ctx, in, opts...) return c.AdmissionClient.MutateAdmission(ctx, in, opts...)
} }
@ -206,7 +236,7 @@ func (r *protoClient) MutateAdmission(ctx context.Context, in *pluginv2.Admissio
func (r *protoClient) ConvertObjects(ctx context.Context, in *pluginv2.ConversionRequest, opts ...grpc.CallOption) (*pluginv2.ConversionResponse, error) { func (r *protoClient) ConvertObjects(ctx context.Context, in *pluginv2.ConversionRequest, opts ...grpc.CallOption) (*pluginv2.ConversionResponse, error) {
c, exists := r.client(ctx) c, exists := r.client(ctx)
if !exists { if !exists {
return nil, errClientNotStarted return nil, errClientNotAvailable
} }
return c.ConversionClient.ConvertObjects(ctx, in, opts...) return c.ConversionClient.ConvertObjects(ctx, in, opts...)
} }