From 4299efbc56f0f8bc7251964caa847d1e2b1958b0 Mon Sep 17 00:00:00 2001
From: Will Browne <wbrowne@users.noreply.github.com>
Date: Mon, 13 Nov 2023 14:41:53 +0100
Subject: [PATCH] Plugins: Expose PID through proto interface (#77821)

* expose PID thru interface

* apply PR feedback

* simplify

* add read lock
---
 .../backendplugin/grpcplugin/client_proto.go  | 61 ++++++++++++++-----
 1 file changed, 47 insertions(+), 14 deletions(-)

diff --git a/pkg/plugins/backendplugin/grpcplugin/client_proto.go b/pkg/plugins/backendplugin/grpcplugin/client_proto.go
index cd392b4c693..af18a2b092f 100644
--- a/pkg/plugins/backendplugin/grpcplugin/client_proto.go
+++ b/pkg/plugins/backendplugin/grpcplugin/client_proto.go
@@ -3,6 +3,7 @@ package grpcplugin
 import (
 	"context"
 	"errors"
+	"sync"
 
 	"google.golang.org/grpc"
 
@@ -23,6 +24,7 @@ type ProtoClient interface {
 	pluginv2.DiagnosticsClient
 	pluginv2.StreamClient
 
+	PID() (string, error)
 	PluginID() string
 	Logger() log.Logger
 	Start(context.Context) error
@@ -31,6 +33,8 @@ type ProtoClient interface {
 
 type protoClient struct {
 	plugin *grpcPlugin
+
+	mu sync.RWMutex
 }
 
 type ProtoClientOpts struct {
@@ -57,6 +61,13 @@ func NewProtoClient(opts ProtoClientOpts) (ProtoClient, error) {
 	return &protoClient{plugin: p}, nil
 }
 
+func (r *protoClient) PID() (string, error) {
+	if _, exists := r.client(); !exists {
+		return "", errClientNotStarted
+	}
+	return r.plugin.client.ID(), nil
+}
+
 func (r *protoClient) PluginID() string {
 	return r.plugin.descriptor.pluginID
 }
@@ -66,58 +77,80 @@ func (r *protoClient) Logger() log.Logger {
 }
 
 func (r *protoClient) Start(ctx context.Context) error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
 	return r.plugin.Start(ctx)
 }
 
 func (r *protoClient) Stop(ctx context.Context) error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
 	return r.plugin.Stop(ctx)
 }
 
-func (r *protoClient) QueryData(ctx context.Context, in *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) {
+func (r *protoClient) client() (*ClientV2, bool) {
+	r.mu.RLock()
 	if r.plugin.pluginClient == nil {
+		r.mu.RUnlock()
+		return nil, false
+	}
+	pc := r.plugin.pluginClient
+	r.mu.RUnlock()
+	return pc, true
+}
+
+func (r *protoClient) QueryData(ctx context.Context, in *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) {
+	c, exists := r.client()
+	if !exists {
 		return nil, errClientNotStarted
 	}
-	return r.plugin.pluginClient.DataClient.QueryData(ctx, in, opts...)
+	return c.DataClient.QueryData(ctx, in, opts...)
 }
 
 func (r *protoClient) CallResource(ctx context.Context, in *pluginv2.CallResourceRequest, opts ...grpc.CallOption) (pluginv2.Resource_CallResourceClient, error) {
-	if r.plugin.pluginClient == nil {
+	c, exists := r.client()
+	if !exists {
 		return nil, errClientNotStarted
 	}
-	return r.plugin.pluginClient.ResourceClient.CallResource(ctx, in, opts...)
+	return c.ResourceClient.CallResource(ctx, in, opts...)
 }
 
 func (r *protoClient) CheckHealth(ctx context.Context, in *pluginv2.CheckHealthRequest, opts ...grpc.CallOption) (*pluginv2.CheckHealthResponse, error) {
-	if r.plugin.pluginClient == nil {
+	c, exists := r.client()
+	if !exists {
 		return nil, errClientNotStarted
 	}
-	return r.plugin.pluginClient.DiagnosticsClient.CheckHealth(ctx, in, opts...)
+	return c.DiagnosticsClient.CheckHealth(ctx, in, opts...)
 }
 
 func (r *protoClient) CollectMetrics(ctx context.Context, in *pluginv2.CollectMetricsRequest, opts ...grpc.CallOption) (*pluginv2.CollectMetricsResponse, error) {
-	if r.plugin.pluginClient == nil {
+	c, exists := r.client()
+	if !exists {
 		return nil, errClientNotStarted
 	}
-	return r.plugin.pluginClient.DiagnosticsClient.CollectMetrics(ctx, in, opts...)
+	return c.DiagnosticsClient.CollectMetrics(ctx, in, opts...)
 }
 
 func (r *protoClient) SubscribeStream(ctx context.Context, in *pluginv2.SubscribeStreamRequest, opts ...grpc.CallOption) (*pluginv2.SubscribeStreamResponse, error) {
-	if r.plugin.pluginClient == nil {
+	c, exists := r.client()
+	if !exists {
 		return nil, errClientNotStarted
 	}
-	return r.plugin.pluginClient.StreamClient.SubscribeStream(ctx, in, opts...)
+	return c.StreamClient.SubscribeStream(ctx, in, opts...)
 }
 
 func (r *protoClient) RunStream(ctx context.Context, in *pluginv2.RunStreamRequest, opts ...grpc.CallOption) (pluginv2.Stream_RunStreamClient, error) {
-	if r.plugin.pluginClient == nil {
+	c, exists := r.client()
+	if !exists {
 		return nil, errClientNotStarted
 	}
-	return r.plugin.pluginClient.StreamClient.RunStream(ctx, in, opts...)
+	return c.StreamClient.RunStream(ctx, in, opts...)
 }
 
 func (r *protoClient) PublishStream(ctx context.Context, in *pluginv2.PublishStreamRequest, opts ...grpc.CallOption) (*pluginv2.PublishStreamResponse, error) {
-	if r.plugin.pluginClient == nil {
+	c, exists := r.client()
+	if !exists {
 		return nil, errClientNotStarted
 	}
-	return r.plugin.pluginClient.StreamClient.PublishStream(ctx, in, opts...)
+	return c.StreamClient.PublishStream(ctx, in, opts...)
 }