Plugins: Add API for creating pluginv2 proto client (#77492)

* first pass

* remove client from plugin

* fix wire

* update

* undo import change

* add opts

* add check

* tidy

* re-use logic

* rollback changes
This commit is contained in:
Will Browne 2023-11-03 14:01:08 +01:00 committed by GitHub
parent d624a5d490
commit 19cd7dbae1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 164 additions and 43 deletions

View File

@ -27,6 +27,18 @@ var handshake = goplugin.HandshakeConfig{
MagicCookieValue: grpcplugin.MagicCookieValue,
}
// pluginSet is list of plugins supported on v2.
var pluginSet = map[int]goplugin.PluginSet{
grpcplugin.ProtocolVersion: {
"diagnostics": &grpcplugin.DiagnosticsGRPCPlugin{},
"resource": &grpcplugin.ResourceGRPCPlugin{},
"data": &grpcplugin.DataGRPCPlugin{},
"stream": &grpcplugin.StreamGRPCPlugin{},
"renderer": &pluginextensionv2.RendererGRPCPlugin{},
"secretsmanager": &secretsmanagerplugin.SecretsManagerGRPCPlugin{},
},
}
func newClientConfig(executablePath string, args []string, env []string, logger log.Logger,
versionedPlugins map[int]goplugin.PluginSet) *goplugin.ClientConfig {
// We can ignore gosec G201 here, since the dynamic part of executablePath comes from the plugin definition
@ -70,18 +82,6 @@ type PluginDescriptor struct {
startSecretsManagerFn StartSecretsManagerFunc
}
// getV2PluginSet returns list of plugins supported on v2.
func getV2PluginSet() goplugin.PluginSet {
return goplugin.PluginSet{
"diagnostics": &grpcplugin.DiagnosticsGRPCPlugin{},
"resource": &grpcplugin.ResourceGRPCPlugin{},
"data": &grpcplugin.DataGRPCPlugin{},
"stream": &grpcplugin.StreamGRPCPlugin{},
"renderer": &pluginextensionv2.RendererGRPCPlugin{},
"secretsmanager": &secretsmanagerplugin.SecretsManagerGRPCPlugin{},
}
}
// NewBackendPlugin creates a new backend plugin factory used for registering a backend plugin.
func NewBackendPlugin(pluginID, executablePath string, executableArgs ...string) backendplugin.PluginFactoryFunc {
return newBackendPlugin(pluginID, executablePath, true, executableArgs...)
@ -95,38 +95,32 @@ func NewUnmanagedBackendPlugin(pluginID, executablePath string, executableArgs .
// NewBackendPlugin creates a new backend plugin factory used for registering a backend plugin.
func newBackendPlugin(pluginID, executablePath string, managed bool, executableArgs ...string) backendplugin.PluginFactoryFunc {
return newPlugin(PluginDescriptor{
pluginID: pluginID,
executablePath: executablePath,
executableArgs: executableArgs,
managed: managed,
versionedPlugins: map[int]goplugin.PluginSet{
grpcplugin.ProtocolVersion: getV2PluginSet(),
},
pluginID: pluginID,
executablePath: executablePath,
executableArgs: executableArgs,
managed: managed,
versionedPlugins: pluginSet,
})
}
// NewRendererPlugin creates a new renderer plugin factory used for registering a backend renderer plugin.
func NewRendererPlugin(pluginID, executablePath string, startFn StartRendererFunc) backendplugin.PluginFactoryFunc {
return newPlugin(PluginDescriptor{
pluginID: pluginID,
executablePath: executablePath,
managed: false,
versionedPlugins: map[int]goplugin.PluginSet{
grpcplugin.ProtocolVersion: getV2PluginSet(),
},
startRendererFn: startFn,
pluginID: pluginID,
executablePath: executablePath,
managed: false,
versionedPlugins: pluginSet,
startRendererFn: startFn,
})
}
// NewSecretsManagerPlugin creates a new secrets manager plugin factory used for registering a backend secrets manager plugin.
func NewSecretsManagerPlugin(pluginID, executablePath string, startFn StartSecretsManagerFunc) backendplugin.PluginFactoryFunc {
return newPlugin(PluginDescriptor{
pluginID: pluginID,
executablePath: executablePath,
managed: false,
versionedPlugins: map[int]goplugin.PluginSet{
grpcplugin.ProtocolVersion: getV2PluginSet(),
},
pluginID: pluginID,
executablePath: executablePath,
managed: false,
versionedPlugins: pluginSet,
startSecretsManagerFn: startFn,
})
}

View File

@ -0,0 +1,123 @@
package grpcplugin
import (
"context"
"errors"
"google.golang.org/grpc"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
"github.com/grafana/grafana/pkg/plugins/log"
)
var (
errClientNotStarted = errors.New("plugin client has not been started")
)
var _ ProtoClient = (*protoClient)(nil)
type ProtoClient interface {
pluginv2.DataClient
pluginv2.ResourceClient
pluginv2.DiagnosticsClient
pluginv2.StreamClient
PluginID() string
Logger() log.Logger
Start(context.Context) error
Stop(context.Context) error
}
type protoClient struct {
plugin *grpcPlugin
}
type ProtoClientOpts struct {
PluginID string
ExecutablePath string
ExecutableArgs []string
Env []string
Logger log.Logger
}
func NewProtoClient(opts ProtoClientOpts) (ProtoClient, error) {
p := newGrpcPlugin(
PluginDescriptor{
pluginID: opts.PluginID,
managed: true,
executablePath: opts.ExecutablePath,
executableArgs: opts.ExecutableArgs,
versionedPlugins: pluginSet,
},
opts.Logger,
func() []string { return opts.Env },
)
return &protoClient{plugin: p}, nil
}
func (r *protoClient) PluginID() string {
return r.plugin.descriptor.pluginID
}
func (r *protoClient) Logger() log.Logger {
return r.plugin.logger
}
func (r *protoClient) Start(ctx context.Context) error {
return r.plugin.Start(ctx)
}
func (r *protoClient) Stop(ctx context.Context) error {
return r.plugin.Stop(ctx)
}
func (r *protoClient) QueryData(ctx context.Context, in *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) {
if r.plugin.pluginClient == nil {
return nil, errClientNotStarted
}
return r.plugin.pluginClient.DataClient.QueryData(ctx, in, opts...)
}
func (r *protoClient) CallResource(ctx context.Context, in *pluginv2.CallResourceRequest, opts ...grpc.CallOption) (pluginv2.Resource_CallResourceClient, error) {
if r.plugin.pluginClient == nil {
return nil, errClientNotStarted
}
return r.plugin.pluginClient.ResourceClient.CallResource(ctx, in, opts...)
}
func (r *protoClient) CheckHealth(ctx context.Context, in *pluginv2.CheckHealthRequest, opts ...grpc.CallOption) (*pluginv2.CheckHealthResponse, error) {
if r.plugin.pluginClient == nil {
return nil, errClientNotStarted
}
return r.plugin.pluginClient.DiagnosticsClient.CheckHealth(ctx, in, opts...)
}
func (r *protoClient) CollectMetrics(ctx context.Context, in *pluginv2.CollectMetricsRequest, opts ...grpc.CallOption) (*pluginv2.CollectMetricsResponse, error) {
if r.plugin.pluginClient == nil {
return nil, errClientNotStarted
}
return r.plugin.pluginClient.DiagnosticsClient.CollectMetrics(ctx, in, opts...)
}
func (r *protoClient) SubscribeStream(ctx context.Context, in *pluginv2.SubscribeStreamRequest, opts ...grpc.CallOption) (*pluginv2.SubscribeStreamResponse, error) {
if r.plugin.pluginClient == nil {
return nil, errClientNotStarted
}
return r.plugin.pluginClient.StreamClient.SubscribeStream(ctx, in, opts...)
}
func (r *protoClient) RunStream(ctx context.Context, in *pluginv2.RunStreamRequest, opts ...grpc.CallOption) (pluginv2.Stream_RunStreamClient, error) {
if r.plugin.pluginClient == nil {
return nil, errClientNotStarted
}
return r.plugin.pluginClient.StreamClient.RunStream(ctx, in, opts...)
}
func (r *protoClient) PublishStream(ctx context.Context, in *pluginv2.PublishStreamRequest, opts ...grpc.CallOption) (*pluginv2.PublishStreamResponse, error) {
if r.plugin.pluginClient == nil {
return nil, errClientNotStarted
}
return r.plugin.pluginClient.StreamClient.PublishStream(ctx, in, opts...)
}

View File

@ -28,7 +28,7 @@ type ClientV2 struct {
secretsmanagerplugin.SecretsManagerPlugin
}
func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugin.ClientProtocol) (pluginClient, error) {
func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugin.ClientProtocol) (*ClientV2, error) {
rawDiagnostics, err := rpcClient.Dispense("diagnostics")
if err != nil {
return nil, err
@ -59,7 +59,7 @@ func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugi
return nil, err
}
c := ClientV2{}
c := &ClientV2{}
if rawDiagnostics != nil {
if diagnosticsClient, ok := rawDiagnostics.(grpcplugin.DiagnosticsClient); ok {
c.DiagnosticsClient = diagnosticsClient
@ -108,7 +108,7 @@ func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugi
}
}
return &c, nil
return c, nil
}
func (c *ClientV2) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) {

View File

@ -26,7 +26,7 @@ type grpcPlugin struct {
descriptor PluginDescriptor
clientFactory func() *plugin.Client
client *plugin.Client
pluginClient pluginClient
pluginClient *ClientV2
logger log.Logger
mutex sync.RWMutex
decommissioned bool
@ -35,13 +35,17 @@ type grpcPlugin struct {
// newPlugin allocates and returns a new gRPC (external) backendplugin.Plugin.
func newPlugin(descriptor PluginDescriptor) backendplugin.PluginFactoryFunc {
return func(pluginID string, logger log.Logger, env func() []string) (backendplugin.Plugin, error) {
return &grpcPlugin{
descriptor: descriptor,
logger: logger,
clientFactory: func() *plugin.Client {
return plugin.NewClient(newClientConfig(descriptor.executablePath, descriptor.executableArgs, env(), logger, descriptor.versionedPlugins))
},
}, nil
return newGrpcPlugin(descriptor, logger, env), nil
}
}
func newGrpcPlugin(descriptor PluginDescriptor, logger log.Logger, env func() []string) *grpcPlugin {
return &grpcPlugin{
descriptor: descriptor,
logger: logger,
clientFactory: func() *plugin.Client {
return plugin.NewClient(newClientConfig(descriptor.executablePath, descriptor.executableArgs, env(), logger, descriptor.versionedPlugins))
},
}
}