mirror of
https://github.com/grafana/grafana.git
synced 2025-02-03 04:01:08 -06:00
7536647ab6
* introduce registry write/read separation * internal + external registries * fix tests * fixup * rename * move interfaces * back to plugins.Store * fix registry name * remove context.TODOs * remove some ctx for now * tidy * remove dupe logic * update naming * move from manager.go to store * amend logger name * new store writer svc * restrict changes * more simplifying * move interfaces around * remove unused * fix linter * tidy * add registry test * fix tests * revert testdata changes * revert testdata changes #1 * revert testdata changes #2 * revert testdata changes #3 * revert testdata changes #4 * revert testdata changes #5 * revert testdata changes * fixup testdata * remove unused log * update naming in test * adjust ctx in test
140 lines
3.7 KiB
Go
140 lines
3.7 KiB
Go
package manager
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
|
|
"github.com/grafana/grafana/pkg/plugins/backendplugin"
|
|
"github.com/grafana/grafana/pkg/plugins/backendplugin/instrumentation"
|
|
)
|
|
|
|
func (m *PluginManager) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
|
plugin, exists := m.plugin(ctx, req.PluginContext.PluginID)
|
|
if !exists {
|
|
return nil, backendplugin.ErrPluginNotRegistered
|
|
}
|
|
|
|
var resp *backend.QueryDataResponse
|
|
err := instrumentation.InstrumentQueryDataRequest(req.PluginContext.PluginID, func() (innerErr error) {
|
|
resp, innerErr = plugin.QueryData(ctx, req)
|
|
return
|
|
})
|
|
|
|
if err != nil {
|
|
if errors.Is(err, backendplugin.ErrMethodNotImplemented) {
|
|
return nil, err
|
|
}
|
|
|
|
if errors.Is(err, backendplugin.ErrPluginUnavailable) {
|
|
return nil, err
|
|
}
|
|
|
|
return nil, fmt.Errorf("%v: %w", "failed to query data", err)
|
|
}
|
|
|
|
for refID, res := range resp.Responses {
|
|
// set frame ref ID based on response ref ID
|
|
for _, f := range res.Frames {
|
|
if f.RefID == "" {
|
|
f.RefID = refID
|
|
}
|
|
}
|
|
}
|
|
|
|
return resp, err
|
|
}
|
|
|
|
func (m *PluginManager) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
|
p, exists := m.plugin(ctx, req.PluginContext.PluginID)
|
|
if !exists {
|
|
return backendplugin.ErrPluginNotRegistered
|
|
}
|
|
err := instrumentation.InstrumentCallResourceRequest(p.PluginID(), func() error {
|
|
if err := p.CallResource(ctx, req, sender); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *PluginManager) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) {
|
|
p, exists := m.plugin(ctx, req.PluginContext.PluginID)
|
|
if !exists {
|
|
return nil, backendplugin.ErrPluginNotRegistered
|
|
}
|
|
|
|
var resp *backend.CollectMetricsResult
|
|
err := instrumentation.InstrumentCollectMetrics(p.PluginID(), func() (innerErr error) {
|
|
resp, innerErr = p.CollectMetrics(ctx, req)
|
|
return
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (m *PluginManager) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
|
|
p, exists := m.plugin(ctx, req.PluginContext.PluginID)
|
|
if !exists {
|
|
return nil, backendplugin.ErrPluginNotRegistered
|
|
}
|
|
|
|
var resp *backend.CheckHealthResult
|
|
err := instrumentation.InstrumentCheckHealthRequest(p.PluginID(), func() (innerErr error) {
|
|
resp, innerErr = p.CheckHealth(ctx, req)
|
|
return
|
|
})
|
|
|
|
if err != nil {
|
|
if errors.Is(err, backendplugin.ErrMethodNotImplemented) {
|
|
return nil, err
|
|
}
|
|
|
|
if errors.Is(err, backendplugin.ErrPluginUnavailable) {
|
|
return nil, err
|
|
}
|
|
|
|
return nil, fmt.Errorf("%v: %w", "failed to check plugin health", backendplugin.ErrHealthCheckFailed)
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (m *PluginManager) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
|
|
plugin, exists := m.plugin(ctx, req.PluginContext.PluginID)
|
|
if !exists {
|
|
return nil, backendplugin.ErrPluginNotRegistered
|
|
}
|
|
|
|
return plugin.SubscribeStream(ctx, req)
|
|
}
|
|
|
|
func (m *PluginManager) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
|
|
plugin, exists := m.plugin(ctx, req.PluginContext.PluginID)
|
|
if !exists {
|
|
return nil, backendplugin.ErrPluginNotRegistered
|
|
}
|
|
|
|
return plugin.PublishStream(ctx, req)
|
|
}
|
|
|
|
func (m *PluginManager) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
|
plugin, exists := m.plugin(ctx, req.PluginContext.PluginID)
|
|
if !exists {
|
|
return backendplugin.ErrPluginNotRegistered
|
|
}
|
|
|
|
return plugin.RunStream(ctx, req, sender)
|
|
}
|