mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Chore: Refactor backend plugin manager/tsdb query data (#34944)
Move QueryData method into backend plugin manager which HandleRequest uses to query data from plugin SDK supported data sources. This allowed us to remove a lot of code no longer needed. Ref #21510 Co-authored-by: Will Browne <wbrowne@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
56e0efbb56
commit
b3e9087557
@@ -5,10 +5,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin/instrumentation"
|
||||
)
|
||||
|
||||
// corePlugin represents a plugin that's part of Grafana core.
|
||||
@@ -43,15 +40,6 @@ func (cp *corePlugin) Logger() log.Logger {
|
||||
return cp.logger
|
||||
}
|
||||
|
||||
//nolint: staticcheck // plugins.DataResponse deprecated
|
||||
func (cp *corePlugin) DataQuery(ctx context.Context, dsInfo *models.DataSource,
|
||||
tsdbQuery plugins.DataQuery) (plugins.DataResponse, error) {
|
||||
// TODO: Inline the adapter, since it shouldn't be necessary
|
||||
adapter := newQueryEndpointAdapter(cp.pluginID, cp.logger, instrumentation.InstrumentQueryDataHandler(
|
||||
cp.QueryDataHandler))
|
||||
return adapter.DataQuery(ctx, dsInfo, tsdbQuery)
|
||||
}
|
||||
|
||||
func (cp *corePlugin) Start(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
@@ -88,6 +76,14 @@ func (cp *corePlugin) CheckHealth(ctx context.Context, req *backend.CheckHealthR
|
||||
return nil, backendplugin.ErrMethodNotImplemented
|
||||
}
|
||||
|
||||
func (cp *corePlugin) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
if cp.QueryDataHandler != nil {
|
||||
return cp.QueryDataHandler.QueryData(ctx, req)
|
||||
}
|
||||
|
||||
return nil, backendplugin.ErrMethodNotImplemented
|
||||
}
|
||||
|
||||
func (cp *corePlugin) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
||||
if cp.CallResourceHandler != nil {
|
||||
return cp.CallResourceHandler.CallResource(ctx, req, sender)
|
||||
|
||||
@@ -1,116 +0,0 @@
|
||||
package coreplugin
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/plugins/adapters"
|
||||
)
|
||||
|
||||
// nolint:staticcheck // plugins.DataPlugin deprecated
|
||||
func newQueryEndpointAdapter(pluginID string, logger log.Logger, handler backend.QueryDataHandler) plugins.DataPlugin {
|
||||
return &queryEndpointAdapter{
|
||||
pluginID: pluginID,
|
||||
logger: logger,
|
||||
handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
type queryEndpointAdapter struct {
|
||||
pluginID string
|
||||
logger log.Logger
|
||||
handler backend.QueryDataHandler
|
||||
}
|
||||
|
||||
func modelToInstanceSettings(ds *models.DataSource) (*backend.DataSourceInstanceSettings, error) {
|
||||
jsonDataBytes, err := ds.JsonData.MarshalJSON()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &backend.DataSourceInstanceSettings{
|
||||
ID: ds.Id,
|
||||
Name: ds.Name,
|
||||
URL: ds.Url,
|
||||
Database: ds.Database,
|
||||
User: ds.User,
|
||||
BasicAuthEnabled: ds.BasicAuth,
|
||||
BasicAuthUser: ds.BasicAuthUser,
|
||||
JSONData: jsonDataBytes,
|
||||
DecryptedSecureJSONData: ds.DecryptedValues(),
|
||||
Updated: ds.Updated,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// nolint:staticcheck // plugins.DataQuery deprecated
|
||||
func (a *queryEndpointAdapter) DataQuery(ctx context.Context, ds *models.DataSource, query plugins.DataQuery) (
|
||||
plugins.DataResponse, error) {
|
||||
instanceSettings, err := modelToInstanceSettings(ds)
|
||||
if err != nil {
|
||||
return plugins.DataResponse{}, err
|
||||
}
|
||||
|
||||
req := &backend.QueryDataRequest{
|
||||
PluginContext: backend.PluginContext{
|
||||
OrgID: ds.OrgId,
|
||||
PluginID: a.pluginID,
|
||||
User: adapters.BackendUserFromSignedInUser(query.User),
|
||||
DataSourceInstanceSettings: instanceSettings,
|
||||
},
|
||||
Queries: []backend.DataQuery{},
|
||||
Headers: query.Headers,
|
||||
}
|
||||
|
||||
for _, q := range query.Queries {
|
||||
modelJSON, err := q.Model.MarshalJSON()
|
||||
if err != nil {
|
||||
return plugins.DataResponse{}, err
|
||||
}
|
||||
req.Queries = append(req.Queries, backend.DataQuery{
|
||||
RefID: q.RefID,
|
||||
Interval: time.Duration(q.IntervalMS) * time.Millisecond,
|
||||
MaxDataPoints: q.MaxDataPoints,
|
||||
TimeRange: backend.TimeRange{
|
||||
From: query.TimeRange.GetFromAsTimeUTC(),
|
||||
To: query.TimeRange.GetToAsTimeUTC(),
|
||||
},
|
||||
QueryType: q.QueryType,
|
||||
JSON: modelJSON,
|
||||
})
|
||||
}
|
||||
|
||||
resp, err := a.handler.QueryData(ctx, req)
|
||||
if err != nil {
|
||||
return plugins.DataResponse{}, err
|
||||
}
|
||||
|
||||
tR := plugins.DataResponse{
|
||||
Results: make(map[string]plugins.DataQueryResult, len(resp.Responses)),
|
||||
}
|
||||
|
||||
for refID, r := range resp.Responses {
|
||||
qr := plugins.DataQueryResult{
|
||||
RefID: refID,
|
||||
}
|
||||
|
||||
for _, f := range r.Frames {
|
||||
if f.RefID == "" {
|
||||
f.RefID = refID
|
||||
}
|
||||
}
|
||||
|
||||
qr.Dataframes = plugins.NewDecodedDataFrames(r.Frames)
|
||||
|
||||
if r.Error != nil {
|
||||
qr.Error = r.Error
|
||||
}
|
||||
|
||||
tR.Results[refID] = qr
|
||||
}
|
||||
|
||||
return tR, nil
|
||||
}
|
||||
@@ -38,13 +38,8 @@ func newClientConfig(executablePath string, env []string, logger log.Logger,
|
||||
}
|
||||
}
|
||||
|
||||
// StartFunc callback function called when a plugin with current plugin protocol version is started.
|
||||
type StartFunc func(pluginID string, client *Client, logger log.Logger) error
|
||||
|
||||
// PluginStartFuncs functions called for plugin when started.
|
||||
type PluginStartFuncs struct {
|
||||
OnStart StartFunc
|
||||
}
|
||||
// StartRendererFunc callback function called when a renderer plugin is started.
|
||||
type StartRendererFunc func(pluginID string, renderer pluginextensionv2.RendererPlugin, logger log.Logger) error
|
||||
|
||||
// PluginDescriptor is a descriptor used for registering backend plugins.
|
||||
type PluginDescriptor struct {
|
||||
@@ -52,7 +47,7 @@ type PluginDescriptor struct {
|
||||
executablePath string
|
||||
managed bool
|
||||
versionedPlugins map[int]goplugin.PluginSet
|
||||
startFns PluginStartFuncs
|
||||
startRendererFn StartRendererFunc
|
||||
}
|
||||
|
||||
// getV2PluginSet returns list of plugins supported on v2.
|
||||
@@ -67,7 +62,7 @@ func getV2PluginSet() goplugin.PluginSet {
|
||||
}
|
||||
|
||||
// NewBackendPlugin creates a new backend plugin factory used for registering a backend plugin.
|
||||
func NewBackendPlugin(pluginID, executablePath string, startFns PluginStartFuncs) backendplugin.PluginFactoryFunc {
|
||||
func NewBackendPlugin(pluginID, executablePath string) backendplugin.PluginFactoryFunc {
|
||||
return newPlugin(PluginDescriptor{
|
||||
pluginID: pluginID,
|
||||
executablePath: executablePath,
|
||||
@@ -75,12 +70,11 @@ func NewBackendPlugin(pluginID, executablePath string, startFns PluginStartFuncs
|
||||
versionedPlugins: map[int]goplugin.PluginSet{
|
||||
grpcplugin.ProtocolVersion: getV2PluginSet(),
|
||||
},
|
||||
startFns: startFns,
|
||||
})
|
||||
}
|
||||
|
||||
// NewRendererPlugin creates a new renderer plugin factory used for registering a backend renderer plugin.
|
||||
func NewRendererPlugin(pluginID, executablePath string, startFns PluginStartFuncs) backendplugin.PluginFactoryFunc {
|
||||
func NewRendererPlugin(pluginID, executablePath string, startFn StartRendererFunc) backendplugin.PluginFactoryFunc {
|
||||
return newPlugin(PluginDescriptor{
|
||||
pluginID: pluginID,
|
||||
executablePath: executablePath,
|
||||
@@ -88,13 +82,6 @@ func NewRendererPlugin(pluginID, executablePath string, startFns PluginStartFunc
|
||||
versionedPlugins: map[int]goplugin.PluginSet{
|
||||
grpcplugin.ProtocolVersion: getV2PluginSet(),
|
||||
},
|
||||
startFns: startFns,
|
||||
startRendererFn: startFn,
|
||||
})
|
||||
}
|
||||
|
||||
// Client client for communicating with a plugin using the current (v2) plugin protocol.
|
||||
type Client struct {
|
||||
DataPlugin grpcplugin.DataClient
|
||||
RendererPlugin pluginextensionv2.RendererPlugin
|
||||
StreamClient grpcplugin.StreamClient
|
||||
}
|
||||
|
||||
@@ -11,11 +11,9 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin/instrumentation"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin/pluginextensionv2"
|
||||
"github.com/grafana/grafana/pkg/util/errutil"
|
||||
"github.com/hashicorp/go-plugin"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
@@ -69,7 +67,7 @@ func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugi
|
||||
|
||||
if rawData != nil {
|
||||
if dataClient, ok := rawData.(grpcplugin.DataClient); ok {
|
||||
c.DataClient = instrumentDataClient(dataClient)
|
||||
c.DataClient = dataClient
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,13 +83,8 @@ func newClientV2(descriptor PluginDescriptor, logger log.Logger, rpcClient plugi
|
||||
}
|
||||
}
|
||||
|
||||
if descriptor.startFns.OnStart != nil {
|
||||
client := &Client{
|
||||
DataPlugin: c.DataClient,
|
||||
RendererPlugin: c.RendererPlugin,
|
||||
StreamClient: c.StreamClient,
|
||||
}
|
||||
if err := descriptor.startFns.OnStart(descriptor.pluginID, client, logger); err != nil {
|
||||
if descriptor.startRendererFn != nil {
|
||||
if err := descriptor.startRendererFn(descriptor.pluginID, c.RendererPlugin, logger); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@@ -137,6 +130,25 @@ func (c *clientV2) CheckHealth(ctx context.Context, req *backend.CheckHealthRequ
|
||||
return backend.FromProto().CheckHealthResponse(protoResp), nil
|
||||
}
|
||||
|
||||
func (c *clientV2) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
if c.DataClient == nil {
|
||||
return nil, backendplugin.ErrMethodNotImplemented
|
||||
}
|
||||
|
||||
protoReq := backend.ToProto().QueryDataRequest(req)
|
||||
protoResp, err := c.DataClient.QueryData(ctx, protoReq)
|
||||
|
||||
if err != nil {
|
||||
if status.Code(err) == codes.Unimplemented {
|
||||
return nil, backendplugin.ErrMethodNotImplemented
|
||||
}
|
||||
|
||||
return nil, errutil.Wrap("Failed to query data", err)
|
||||
}
|
||||
|
||||
return backend.FromProto().QueryDataResponse(protoResp)
|
||||
}
|
||||
|
||||
func (c *clientV2) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
||||
if c.ResourceClient == nil {
|
||||
return backendplugin.ErrMethodNotImplemented
|
||||
@@ -226,24 +238,3 @@ func (c *clientV2) RunStream(ctx context.Context, req *backend.RunStreamRequest,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type dataClientQueryDataFunc func(ctx context.Context, req *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error)
|
||||
|
||||
func (fn dataClientQueryDataFunc) QueryData(ctx context.Context, req *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) {
|
||||
return fn(ctx, req, opts...)
|
||||
}
|
||||
|
||||
func instrumentDataClient(plugin grpcplugin.DataClient) grpcplugin.DataClient {
|
||||
if plugin == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return dataClientQueryDataFunc(func(ctx context.Context, req *pluginv2.QueryDataRequest, opts ...grpc.CallOption) (*pluginv2.QueryDataResponse, error) {
|
||||
var resp *pluginv2.QueryDataResponse
|
||||
err := instrumentation.InstrumentQueryDataRequest(req.PluginContext.PluginId, func() (innerErr error) {
|
||||
resp, innerErr = plugin.QueryData(ctx, req)
|
||||
return
|
||||
})
|
||||
return resp, err
|
||||
})
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
type pluginClient interface {
|
||||
backend.CollectMetricsHandler
|
||||
backend.CheckHealthHandler
|
||||
backend.QueryDataHandler
|
||||
backend.CallResourceHandler
|
||||
backend.StreamHandler
|
||||
}
|
||||
@@ -137,6 +138,15 @@ func (p *grpcPlugin) CheckHealth(ctx context.Context, req *backend.CheckHealthRe
|
||||
return pluginClient.CheckHealth(ctx, req)
|
||||
}
|
||||
|
||||
func (p *grpcPlugin) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
pluginClient, ok := p.getPluginClient()
|
||||
if !ok {
|
||||
return nil, backendplugin.ErrPluginUnavailable
|
||||
}
|
||||
|
||||
return pluginClient.QueryData(ctx, req)
|
||||
}
|
||||
|
||||
func (p *grpcPlugin) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
||||
pluginClient, ok := p.getPluginClient()
|
||||
if !ok {
|
||||
|
||||
@@ -24,13 +24,12 @@ type Manager interface {
|
||||
CollectMetrics(ctx context.Context, pluginID string) (*backend.CollectMetricsResult, error)
|
||||
// CheckHealth checks the health of a registered backend plugin.
|
||||
CheckHealth(ctx context.Context, pCtx backend.PluginContext) (*backend.CheckHealthResult, error)
|
||||
// QueryData query data from a registered backend plugin.
|
||||
QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error)
|
||||
// CallResource calls a plugin resource.
|
||||
CallResource(pluginConfig backend.PluginContext, ctx *models.ReqContext, path string)
|
||||
// Get plugin by its ID.
|
||||
Get(pluginID string) (Plugin, bool)
|
||||
// GetDataPlugin gets a DataPlugin with a certain ID or nil if it doesn't exist.
|
||||
// TODO: interface{} is the return type in order to break a dependency cycle. Should be plugins.DataPlugin.
|
||||
GetDataPlugin(pluginID string) interface{}
|
||||
}
|
||||
|
||||
// Plugin is the backend plugin interface.
|
||||
@@ -45,6 +44,7 @@ type Plugin interface {
|
||||
IsDecommissioned() bool
|
||||
backend.CollectMetricsHandler
|
||||
backend.CheckHealthHandler
|
||||
backend.QueryDataHandler
|
||||
backend.CallResourceHandler
|
||||
backend.StreamHandler
|
||||
}
|
||||
|
||||
@@ -17,7 +17,6 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin/instrumentation"
|
||||
"github.com/grafana/grafana/pkg/registry"
|
||||
@@ -151,6 +150,10 @@ func (m *manager) Get(pluginID string) (backendplugin.Plugin, bool) {
|
||||
p, ok := m.plugins[pluginID]
|
||||
m.pluginsMu.RUnlock()
|
||||
|
||||
if ok && p.IsDecommissioned() {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return p, ok
|
||||
}
|
||||
|
||||
@@ -181,21 +184,6 @@ func (m *manager) getAzureEnvironmentVariables() []string {
|
||||
return variables
|
||||
}
|
||||
|
||||
//nolint: staticcheck // plugins.DataPlugin deprecated
|
||||
func (m *manager) GetDataPlugin(pluginID string) interface{} {
|
||||
p, _ := m.Get(pluginID)
|
||||
|
||||
if p == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if dataPlugin, ok := p.(plugins.DataPlugin); ok {
|
||||
return dataPlugin
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// start starts a managed backend plugin
|
||||
func (m *manager) start(ctx context.Context, p backendplugin.Plugin) {
|
||||
if !p.IsManaged() {
|
||||
@@ -244,10 +232,7 @@ func (m *manager) stop(ctx context.Context) {
|
||||
|
||||
// CollectMetrics collects metrics from a registered backend plugin.
|
||||
func (m *manager) CollectMetrics(ctx context.Context, pluginID string) (*backend.CollectMetricsResult, error) {
|
||||
m.pluginsMu.RLock()
|
||||
p, registered := m.plugins[pluginID]
|
||||
m.pluginsMu.RUnlock()
|
||||
|
||||
p, registered := m.Get(pluginID)
|
||||
if !registered {
|
||||
return nil, backendplugin.ErrPluginNotRegistered
|
||||
}
|
||||
@@ -279,10 +264,7 @@ func (m *manager) CheckHealth(ctx context.Context, pluginContext backend.PluginC
|
||||
}, nil
|
||||
}
|
||||
|
||||
m.pluginsMu.RLock()
|
||||
p, registered := m.plugins[pluginContext.PluginID]
|
||||
m.pluginsMu.RUnlock()
|
||||
|
||||
p, registered := m.Get(pluginContext.PluginID)
|
||||
if !registered {
|
||||
return nil, backendplugin.ErrPluginNotRegistered
|
||||
}
|
||||
@@ -308,15 +290,39 @@ func (m *manager) CheckHealth(ctx context.Context, pluginContext backend.PluginC
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *manager) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
p, registered := m.Get(req.PluginContext.PluginID)
|
||||
if !registered {
|
||||
return nil, backendplugin.ErrPluginNotRegistered
|
||||
}
|
||||
|
||||
var resp *backend.QueryDataResponse
|
||||
err := instrumentation.InstrumentQueryDataRequest(p.PluginID(), func() (innerErr error) {
|
||||
resp, innerErr = p.QueryData(ctx, req)
|
||||
return
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, backendplugin.ErrMethodNotImplemented) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if errors.Is(err, backendplugin.ErrPluginUnavailable) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, errutil.Wrap("failed to query data", err)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
type keepCookiesJSONModel struct {
|
||||
KeepCookies []string `json:"keepCookies"`
|
||||
}
|
||||
|
||||
func (m *manager) callResourceInternal(w http.ResponseWriter, req *http.Request, pCtx backend.PluginContext) error {
|
||||
m.pluginsMu.RLock()
|
||||
p, registered := m.plugins[pCtx.PluginID]
|
||||
m.pluginsMu.RUnlock()
|
||||
|
||||
p, registered := m.Get(pCtx.PluginID)
|
||||
if !registered {
|
||||
return backendplugin.ErrPluginNotRegistered
|
||||
}
|
||||
|
||||
@@ -367,6 +367,7 @@ type testPlugin struct {
|
||||
decommissioned bool
|
||||
backend.CollectMetricsHandlerFunc
|
||||
backend.CheckHealthHandlerFunc
|
||||
backend.QueryDataHandlerFunc
|
||||
backend.CallResourceHandlerFunc
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
@@ -441,6 +442,14 @@ func (tp *testPlugin) CheckHealth(ctx context.Context, req *backend.CheckHealthR
|
||||
return nil, backendplugin.ErrMethodNotImplemented
|
||||
}
|
||||
|
||||
func (tp *testPlugin) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
if tp.QueryDataHandlerFunc != nil {
|
||||
return tp.QueryDataHandlerFunc(ctx, req)
|
||||
}
|
||||
|
||||
return nil, backendplugin.ErrMethodNotImplemented
|
||||
}
|
||||
|
||||
func (tp *testPlugin) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
||||
if tp.CallResourceHandlerFunc != nil {
|
||||
return tp.CallResourceHandlerFunc(ctx, req, sender)
|
||||
|
||||
Reference in New Issue
Block a user