Plugins: Use grafana-plugin-sdk-go v0.5.0 (#21116)

* fix dispense

* plugin loading refactor

Moves common functionality of registering, starting,
stopping and restarting backend plugins to
backendplugin package.

* simplify plugin descriptors

* target sdk v0.5.0

Co-authored-by: Kyle Brandt <kyle@kbrandt.com>
This commit is contained in:
Marcus Efraimsson
2020-01-08 17:43:28 +01:00
committed by Kyle Brandt
parent 5321e7536f
commit baba1634b8
306 changed files with 81274 additions and 41083 deletions

View File

@@ -7,9 +7,8 @@ import (
datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource"
rendererV1 "github.com/grafana/grafana-plugin-model/go/renderer"
backend "github.com/grafana/grafana-plugin-sdk-go/backend"
sdk "github.com/grafana/grafana-plugin-sdk-go/common"
datasourceV2 "github.com/grafana/grafana-plugin-sdk-go/datasource"
transformV2 "github.com/grafana/grafana-plugin-sdk-go/transform"
"github.com/hashicorp/go-plugin"
)
@@ -32,9 +31,7 @@ var handshake = plugin.HandshakeConfig{
MagicCookieValue: sdk.MagicCookieValue,
}
// NewClientConfig returns a configuration object that can be used to instantiate
// a client for the plugin described by the given metadata.
func NewClientConfig(executablePath string, logger log.Logger, versionedPlugins map[int]plugin.PluginSet) *plugin.ClientConfig {
func newClientConfig(executablePath string, logger log.Logger, versionedPlugins map[int]plugin.PluginSet) *plugin.ClientConfig {
return &plugin.ClientConfig{
Cmd: exec.Command(executablePath),
HandshakeConfig: handshake,
@@ -44,38 +41,44 @@ func NewClientConfig(executablePath string, logger log.Logger, versionedPlugins
}
}
// NewDatasourceClient returns a datasource plugin client.
func NewDatasourceClient(pluginID, executablePath string, logger log.Logger) *plugin.Client {
versionedPlugins := map[int]plugin.PluginSet{
1: {
pluginID: &datasourceV1.DatasourcePluginImpl{},
},
2: {
pluginID: &datasourceV2.DatasourcePluginImpl{},
},
}
return plugin.NewClient(NewClientConfig(executablePath, logger, versionedPlugins))
// PluginDescriptor descriptor used for registering backend plugins.
type PluginDescriptor struct {
pluginID string
executablePath string
managed bool
versionedPlugins map[int]plugin.PluginSet
}
// NewRendererClient returns a renderer plugin client.
func NewRendererClient(pluginID, executablePath string, logger log.Logger) *plugin.Client {
versionedPlugins := map[int]plugin.PluginSet{
1: {
pluginID: &rendererV1.RendererPluginImpl{},
// NewBackendPluginDescriptor creates a new backend plugin descriptor
// used for registering a backend datasource plugin.
func NewBackendPluginDescriptor(pluginID, executablePath string) PluginDescriptor {
return PluginDescriptor{
pluginID: pluginID,
executablePath: executablePath,
managed: true,
versionedPlugins: map[int]plugin.PluginSet{
DefaultProtocolVersion: {
pluginID: &datasourceV1.DatasourcePluginImpl{},
},
sdk.ProtocolVersion: {
"backend": &backend.CoreGRPCPlugin{},
"transform": &backend.TransformGRPCPlugin{},
},
},
}
return plugin.NewClient(NewClientConfig(executablePath, logger, versionedPlugins))
}
// NewTransformClient returns a transform plugin client.
func NewTransformClient(pluginID, executablePath string, logger log.Logger) *plugin.Client {
versionedPlugins := map[int]plugin.PluginSet{
2: {
pluginID: &transformV2.TransformPluginImpl{},
// NewRendererPluginDescriptor creates a new renderer plugin descriptor
// used for registering a backend renderer plugin.
func NewRendererPluginDescriptor(pluginID, executablePath string) PluginDescriptor {
return PluginDescriptor{
pluginID: pluginID,
executablePath: executablePath,
managed: false,
versionedPlugins: map[int]plugin.PluginSet{
DefaultProtocolVersion: {
pluginID: &rendererV1.RendererPluginImpl{},
},
},
}
return plugin.NewClient(NewClientConfig(executablePath, logger, versionedPlugins))
}

View File

@@ -0,0 +1,197 @@
package backendplugin
import (
"context"
"errors"
"sync"
"time"
"github.com/grafana/grafana/pkg/infra/log"
plugin "github.com/hashicorp/go-plugin"
"golang.org/x/xerrors"
)
var (
pluginsMu sync.RWMutex
plugins = make(map[string]*BackendPlugin)
logger = log.New("plugins.backend")
)
type BackendPluginCallbackFunc func(pluginID string, client *plugin.Client, logger log.Logger) error
type BackendPlugin struct {
id string
executablePath string
managed bool
clientFactory func() *plugin.Client
client *plugin.Client
logger log.Logger
callbackFn BackendPluginCallbackFunc
supportsMetrics bool
supportsHealth bool
}
func (p *BackendPlugin) start(ctx context.Context) error {
p.client = p.clientFactory()
// rpcClient, err := p.client.Client()
// if err != nil {
// return err
// }
// if p.client.NegotiatedVersion() > 1 {
// _, err = rpcClient.Dispense("diagnostics")
// if err != nil {
// return err
// }
// }
if p.callbackFn != nil {
return p.callbackFn(p.id, p.client, p.logger)
}
return nil
}
func (p *BackendPlugin) stop() error {
if p.client != nil {
p.client.Kill()
}
return nil
}
func (p *BackendPlugin) collectMetrics(ctx context.Context) {
if !p.supportsMetrics {
return
}
}
func (p *BackendPlugin) checkHealth(ctx context.Context) {
if !p.supportsHealth {
return
}
}
// Register registers a backend plugin
func Register(descriptor PluginDescriptor, callbackFn BackendPluginCallbackFunc) error {
logger.Debug("Registering backend plugin", "pluginId", descriptor.pluginID, "executablePath", descriptor.executablePath)
pluginsMu.Lock()
defer pluginsMu.Unlock()
if _, exists := plugins[descriptor.pluginID]; exists {
return errors.New("Backend plugin already registered")
}
pluginLogger := logger.New("pluginId", descriptor.pluginID)
plugin := &BackendPlugin{
id: descriptor.pluginID,
executablePath: descriptor.executablePath,
managed: descriptor.managed,
clientFactory: func() *plugin.Client {
return plugin.NewClient(newClientConfig(descriptor.executablePath, pluginLogger, descriptor.versionedPlugins))
},
callbackFn: callbackFn,
logger: pluginLogger,
}
plugins[descriptor.pluginID] = plugin
logger.Debug("Backend plugin registered", "pluginId", descriptor.pluginID, "executablePath", descriptor.executablePath)
return nil
}
// Start starts all managed backend plugins
func Start(ctx context.Context) {
pluginsMu.RLock()
defer pluginsMu.RUnlock()
for _, p := range plugins {
if !p.managed {
continue
}
if err := startPluginAndRestartKilledProcesses(ctx, p); err != nil {
p.logger.Error("Failed to start plugin", "error", err)
}
}
}
// StartPlugin starts a non-managed backend plugin
func StartPlugin(ctx context.Context, pluginID string) error {
pluginsMu.RLock()
p, registered := plugins[pluginID]
pluginsMu.RUnlock()
if !registered {
return errors.New("Backend plugin not registered")
}
if p.managed {
return errors.New("Backend plugin is managed and cannot be manually started")
}
return startPluginAndRestartKilledProcesses(ctx, p)
}
func startPluginAndRestartKilledProcesses(ctx context.Context, p *BackendPlugin) error {
if err := p.start(ctx); err != nil {
return err
}
go func(ctx context.Context, p *BackendPlugin) {
if err := restartKilledProcess(ctx, p); err != nil {
p.logger.Error("Attempt to restart killed plugin process failed", "error", err)
}
}(ctx, p)
return nil
}
// Stop stops all managed backend plugins
func Stop() {
pluginsMu.RLock()
defer pluginsMu.RUnlock()
for _, p := range plugins {
go func(p *BackendPlugin) {
p.logger.Debug("Stopping plugin")
if err := p.stop(); err != nil {
p.logger.Error("Failed to stop plugin", "error", err)
}
p.logger.Debug("Plugin stopped")
}(p)
}
}
// CollectMetrics collect metrics from backend plugins
func CollectMetrics(ctx context.Context) {
for _, p := range plugins {
p.collectMetrics(ctx)
}
}
// CheckHealth checks health of backend plugins
func CheckHealth(ctx context.Context) {
for _, p := range plugins {
p.checkHealth(ctx)
}
}
func restartKilledProcess(ctx context.Context, p *BackendPlugin) error {
ticker := time.NewTicker(time.Second * 1)
for {
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil && !xerrors.Is(err, context.Canceled) {
return err
}
return nil
case <-ticker.C:
if !p.client.Exited() {
continue
}
p.logger.Debug("Restarting plugin")
if err := p.start(ctx); err != nil {
p.logger.Error("Failed to restart plugin", "error", err)
continue
}
p.logger.Debug("Plugin restarted")
}
}
}

View File

@@ -2,9 +2,8 @@ package wrapper
import (
"context"
"errors"
sdk "github.com/grafana/grafana-plugin-sdk-go/datasource"
sdk "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
@@ -12,12 +11,12 @@ import (
"github.com/grafana/grafana/pkg/tsdb"
)
func NewDatasourcePluginWrapperV2(log log.Logger, plugin sdk.DatasourcePlugin) *DatasourcePluginWrapperV2 {
return &DatasourcePluginWrapperV2{DatasourcePlugin: plugin, logger: log}
func NewDatasourcePluginWrapperV2(log log.Logger, plugin sdk.BackendPlugin) *DatasourcePluginWrapperV2 {
return &DatasourcePluginWrapperV2{BackendPlugin: plugin, logger: log}
}
type DatasourcePluginWrapperV2 struct {
sdk.DatasourcePlugin
sdk.BackendPlugin
logger log.Logger
}
@@ -27,8 +26,8 @@ func (tw *DatasourcePluginWrapperV2) Query(ctx context.Context, ds *models.DataS
return nil, err
}
pbQuery := &pluginv2.DatasourceRequest{
Datasource: &pluginv2.DatasourceInfo{
pbQuery := &pluginv2.DataQueryRequest{
Config: &pluginv2.PluginConfig{
Name: ds.Name,
Type: ds.Type,
Url: ds.Url,
@@ -37,13 +36,7 @@ func (tw *DatasourcePluginWrapperV2) Query(ctx context.Context, ds *models.DataS
JsonData: string(jsonData),
DecryptedSecureJsonData: ds.SecureJsonData.Decrypt(),
},
TimeRange: &pluginv2.TimeRange{
FromRaw: query.TimeRange.From,
ToRaw: query.TimeRange.To,
ToEpochMs: query.TimeRange.GetToAsMsEpoch(),
FromEpochMs: query.TimeRange.GetFromAsMsEpoch(),
},
Queries: []*pluginv2.DatasourceQuery{},
Queries: []*pluginv2.DataQuery{},
}
for _, q := range query.Queries {
@@ -51,45 +44,29 @@ func (tw *DatasourcePluginWrapperV2) Query(ctx context.Context, ds *models.DataS
if err != nil {
return nil, err
}
pbQuery.Queries = append(pbQuery.Queries, &pluginv2.DatasourceQuery{
ModelJson: string(modelJSON),
IntervalMs: q.IntervalMs,
pbQuery.Queries = append(pbQuery.Queries, &pluginv2.DataQuery{
Json: modelJSON,
IntervalMS: q.IntervalMs,
RefId: q.RefId,
MaxDataPoints: q.MaxDataPoints,
TimeRange: &pluginv2.TimeRange{
ToEpochMS: query.TimeRange.GetToAsMsEpoch(),
FromEpochMS: query.TimeRange.GetFromAsMsEpoch(),
},
})
}
pbres, err := tw.DatasourcePlugin.Query(ctx, pbQuery)
pbRes, err := tw.BackendPlugin.DataQuery(ctx, pbQuery)
if err != nil {
return nil, err
}
res := &tsdb.Response{
Results: map[string]*tsdb.QueryResult{},
}
for _, r := range pbres.Results {
qr := &tsdb.QueryResult{
RefId: r.RefId,
}
if r.Error != "" {
qr.Error = errors.New(r.Error)
qr.ErrorString = r.Error
}
if r.MetaJson != "" {
metaJSON, err := simplejson.NewJson([]byte(r.MetaJson))
if err != nil {
tw.logger.Error("Error parsing JSON Meta field: " + err.Error())
}
qr.Meta = metaJSON
}
qr.Dataframes = r.Dataframes
res.Results[r.RefId] = qr
}
return res, nil
return &tsdb.Response{
Results: map[string]*tsdb.QueryResult{
"": {
Dataframes: pbRes.Frames,
Meta: simplejson.NewFromAny(pbRes.Metadata),
},
},
}, nil
}

View File

@@ -1,12 +1,10 @@
package plugins
import (
"context"
"encoding/json"
"errors"
"fmt"
"path"
"time"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
@@ -14,13 +12,12 @@ import (
"github.com/grafana/grafana/pkg/util/errutil"
datasourceV1 "github.com/grafana/grafana-plugin-model/go/datasource"
sdk "github.com/grafana/grafana-plugin-sdk-go/datasource"
sdk "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins/datasource/wrapper"
"github.com/grafana/grafana/pkg/tsdb"
plugin "github.com/hashicorp/go-plugin"
"golang.org/x/xerrors"
)
// DataSourcePlugin contains all metadata about a datasource plugin
@@ -41,9 +38,6 @@ type DataSourcePlugin struct {
Backend bool `json:"backend,omitempty"`
Executable string `json:"executable,omitempty"`
SDK bool `json:"sdk,omitempty"`
log log.Logger
client *plugin.Client
}
func (p *DataSourcePlugin) Load(decoder *json.Decoder, pluginDir string) error {
@@ -59,23 +53,16 @@ func (p *DataSourcePlugin) Load(decoder *json.Decoder, pluginDir string) error {
return errutil.Wrapf(err, "Failed to register plugin")
}
DataSources[p.Id] = p
return nil
}
func (p *DataSourcePlugin) startBackendPlugin(ctx context.Context, log log.Logger) error {
p.log = log.New("plugin-id", p.Id)
if err := p.spawnSubProcess(); err != nil {
return err
if p.Backend {
cmd := ComposePluginStartCommmand(p.Executable)
fullpath := path.Join(p.PluginDir, cmd)
descriptor := backendplugin.NewBackendPluginDescriptor(p.Id, fullpath)
if err := backendplugin.Register(descriptor, p.onPluginStart); err != nil {
return errutil.Wrapf(err, "Failed to register backend plugin")
}
}
go func() {
if err := p.restartKilledProcess(ctx); err != nil {
p.log.Error("Attempting to restart killed process failed", "err", err)
}
}()
DataSources[p.Id] = p
return nil
}
@@ -83,71 +70,37 @@ func (p *DataSourcePlugin) isVersionOne() bool {
return !p.SDK
}
func (p *DataSourcePlugin) spawnSubProcess() error {
cmd := ComposePluginStartCommmand(p.Executable)
fullpath := path.Join(p.PluginDir, cmd)
p.client = backendplugin.NewDatasourceClient(p.Id, fullpath, p.log)
rpcClient, err := p.client.Client()
func (p *DataSourcePlugin) onPluginStart(pluginID string, client *plugin.Client, logger log.Logger) error {
rpcClient, err := client.Client()
if err != nil {
return err
}
raw, err := rpcClient.Dispense(p.Id)
if err != nil {
return err
}
if p.client.NegotiatedVersion() == 1 {
if client.NegotiatedVersion() == 1 {
raw, err := rpcClient.Dispense(pluginID)
if err != nil {
return err
}
plugin := raw.(datasourceV1.DatasourcePlugin)
tsdb.RegisterTsdbQueryEndpoint(p.Id, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return wrapper.NewDatasourcePluginWrapper(p.log, plugin), nil
tsdb.RegisterTsdbQueryEndpoint(pluginID, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return wrapper.NewDatasourcePluginWrapper(logger, plugin), nil
})
return nil
}
plugin, ok := raw.(sdk.DatasourcePlugin)
raw, err := rpcClient.Dispense("backend")
if err != nil {
return err
}
plugin, ok := raw.(sdk.BackendPlugin)
if !ok {
return fmt.Errorf("unxpected type %T, expeced sdk.DatasourcePlugin", raw)
return fmt.Errorf("unexpected type %T, expected sdk.Plugin", raw)
}
tsdb.RegisterTsdbQueryEndpoint(p.Id, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return wrapper.NewDatasourcePluginWrapperV2(p.log, plugin), nil
tsdb.RegisterTsdbQueryEndpoint(pluginID, func(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return wrapper.NewDatasourcePluginWrapperV2(logger, plugin), nil
})
return nil
}
func (p *DataSourcePlugin) restartKilledProcess(ctx context.Context) error {
ticker := time.NewTicker(time.Second * 1)
for {
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil && !xerrors.Is(err, context.Canceled) {
return err
}
return nil
case <-ticker.C:
if !p.client.Exited() {
continue
}
if err := p.spawnSubProcess(); err != nil {
p.log.Error("Failed to restart plugin", "err", err)
continue
}
p.log.Debug("Plugin process restarted")
}
}
}
func (p *DataSourcePlugin) Kill() {
if p.client != nil {
p.log.Debug("Killing subprocess ", "name", p.Name)
p.client.Kill()
}
}

View File

@@ -14,6 +14,7 @@ import (
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
@@ -110,25 +111,8 @@ func (pm *PluginManager) Init() error {
return nil
}
func (pm *PluginManager) startBackendPlugins(ctx context.Context) {
for _, ds := range DataSources {
if !ds.Backend {
continue
}
if err := ds.startBackendPlugin(ctx, plog); err != nil {
pm.log.Error("Failed to init plugin.", "error", err, "plugin", ds.Id)
}
}
if Transform != nil {
if err := Transform.startBackendPlugin(ctx, plog); err != nil {
pm.log.Error("Failed to init plugin.", "error", err, "plugin", Transform.Id)
}
}
}
func (pm *PluginManager) Run(ctx context.Context) error {
pm.startBackendPlugins(ctx)
backendplugin.Start(ctx)
pm.updateAppDashboards()
pm.checkForUpdates()
@@ -144,14 +128,7 @@ func (pm *PluginManager) Run(ctx context.Context) error {
}
}
// kill backend plugins
for _, p := range DataSources {
p.Kill()
}
if Transform != nil {
Transform.Kill()
}
backendplugin.Stop()
return ctx.Err()
}

View File

@@ -1,11 +1,22 @@
package plugins
import "encoding/json"
import (
"context"
"encoding/json"
"path"
pluginModel "github.com/grafana/grafana-plugin-model/go/renderer"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/util/errutil"
plugin "github.com/hashicorp/go-plugin"
)
type RendererPlugin struct {
PluginBase
Executable string `json:"executable,omitempty"`
GrpcPlugin pluginModel.RendererPlugin
}
func (r *RendererPlugin) Load(decoder *json.Decoder, pluginDir string) error {
@@ -17,6 +28,36 @@ func (r *RendererPlugin) Load(decoder *json.Decoder, pluginDir string) error {
return err
}
cmd := ComposePluginStartCommmand("plugin_start")
fullpath := path.Join(r.PluginDir, cmd)
descriptor := backendplugin.NewRendererPluginDescriptor(r.Id, fullpath)
if err := backendplugin.Register(descriptor, r.onPluginStart); err != nil {
return errutil.Wrapf(err, "Failed to register backend plugin")
}
Renderer = r
return nil
}
func (r *RendererPlugin) Start(ctx context.Context) error {
if err := backendplugin.StartPlugin(ctx, r.Id); err != nil {
return errutil.Wrapf(err, "Failed to start renderer plugin")
}
return nil
}
func (r *RendererPlugin) onPluginStart(pluginID string, client *plugin.Client, logger log.Logger) error {
rpcClient, err := client.Client()
if err != nil {
return err
}
raw, err := rpcClient.Dispense(pluginID)
if err != nil {
return err
}
r.GrpcPlugin = raw.(pluginModel.RendererPlugin)
return nil
}

View File

@@ -3,34 +3,29 @@ package plugins
import (
"context"
"encoding/json"
"errors"
"fmt"
"path"
"time"
"strconv"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/dataframe"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
"github.com/grafana/grafana-plugin-sdk-go/transform"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/grafana/pkg/util/errutil"
plugin "github.com/hashicorp/go-plugin"
"golang.org/x/xerrors"
)
type TransformPlugin struct {
PluginBase
// TODO we probably want a Backend Plugin Base? Or some way to dedup proc management code
Executable string `json:"executable,omitempty"`
*TransformWrapper
client *plugin.Client
log log.Logger
}
func (tp *TransformPlugin) Load(decoder *json.Decoder, pluginDir string) error {
@@ -42,108 +37,57 @@ func (tp *TransformPlugin) Load(decoder *json.Decoder, pluginDir string) error {
return err
}
cmd := ComposePluginStartCommmand(tp.Executable)
fullpath := path.Join(tp.PluginDir, cmd)
descriptor := backendplugin.NewBackendPluginDescriptor(tp.Id, fullpath)
if err := backendplugin.Register(descriptor, tp.onPluginStart); err != nil {
return errutil.Wrapf(err, "Failed to register backend plugin")
}
Transform = tp
return nil
}
func (p *TransformPlugin) startBackendPlugin(ctx context.Context, log log.Logger) error {
p.log = log.New("plugin-id", p.Id)
if err := p.spawnSubProcess(); err != nil {
return err
}
go func() {
if err := p.restartKilledProcess(ctx); err != nil {
p.log.Error("Attempting to restart killed process failed", "err", err)
}
}()
return nil
}
func (p *TransformPlugin) spawnSubProcess() error {
cmd := ComposePluginStartCommmand(p.Executable)
fullpath := path.Join(p.PluginDir, cmd)
p.client = backendplugin.NewTransformClient(p.Id, fullpath, p.log)
rpcClient, err := p.client.Client()
func (p *TransformPlugin) onPluginStart(pluginID string, client *plugin.Client, logger log.Logger) error {
rpcClient, err := client.Client()
if err != nil {
return err
}
raw, err := rpcClient.Dispense(p.Id)
raw, err := rpcClient.Dispense("transform")
if err != nil {
return err
}
plugin, ok := raw.(transform.TransformPlugin)
plugin, ok := raw.(backend.TransformPlugin)
if !ok {
return fmt.Errorf("unexpected type %T, expected *transform.GRPCClient", raw)
return fmt.Errorf("unexpected type %T, expected *backend.TransformPlugin", raw)
}
p.TransformWrapper = NewTransformWrapper(p.log, plugin)
p.TransformWrapper = NewTransformWrapper(logger, plugin)
return nil
}
func (p *TransformPlugin) restartKilledProcess(ctx context.Context) error {
ticker := time.NewTicker(time.Second * 1)
for {
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil && !xerrors.Is(err, context.Canceled) {
return err
}
return nil
case <-ticker.C:
if !p.client.Exited() {
continue
}
if err := p.spawnSubProcess(); err != nil {
p.log.Error("Failed to restart plugin", "err", err)
continue
}
p.log.Debug("Plugin process restarted")
}
}
}
func (p *TransformPlugin) Kill() {
if p.client != nil {
p.log.Debug("Killing subprocess ", "name", p.Name)
p.client.Kill()
}
}
// ...
// Wrapper Code
// ...
func NewTransformWrapper(log log.Logger, plugin transform.TransformPlugin) *TransformWrapper {
return &TransformWrapper{plugin, log, &grafanaAPI{log}}
func NewTransformWrapper(log log.Logger, plugin backend.TransformPlugin) *TransformWrapper {
return &TransformWrapper{plugin, log, &transformCallback{log}}
}
type TransformWrapper struct {
transform.TransformPlugin
logger log.Logger
api *grafanaAPI
backend.TransformPlugin
logger log.Logger
callback *transformCallback
}
func (tw *TransformWrapper) Transform(ctx context.Context, query *tsdb.TsdbQuery) (*tsdb.Response, error) {
pbQuery := &pluginv2.TransformRequest{
TimeRange: &pluginv2.TimeRange{
FromRaw: query.TimeRange.From,
ToRaw: query.TimeRange.To,
ToEpochMs: query.TimeRange.GetToAsMsEpoch(),
FromEpochMs: query.TimeRange.GetFromAsMsEpoch(),
},
Queries: []*pluginv2.TransformQuery{},
pbQuery := &pluginv2.DataQueryRequest{
Config: &pluginv2.PluginConfig{},
Queries: []*pluginv2.DataQuery{},
}
for _, q := range query.Queries {
@@ -151,61 +95,44 @@ func (tw *TransformWrapper) Transform(ctx context.Context, query *tsdb.TsdbQuery
if err != nil {
return nil, err
}
pbQuery.Queries = append(pbQuery.Queries, &pluginv2.TransformQuery{
ModelJson: string(modelJSON),
IntervalMs: q.IntervalMs,
pbQuery.Queries = append(pbQuery.Queries, &pluginv2.DataQuery{
Json: modelJSON,
IntervalMS: q.IntervalMs,
RefId: q.RefId,
MaxDataPoints: q.MaxDataPoints,
TimeRange: &pluginv2.TimeRange{
ToEpochMS: query.TimeRange.GetToAsMsEpoch(),
FromEpochMS: query.TimeRange.GetFromAsMsEpoch(),
},
})
}
pbres, err := tw.TransformPlugin.Transform(ctx, pbQuery, tw.api)
pbRes, err := tw.TransformPlugin.DataQuery(ctx, pbQuery, tw.callback)
if err != nil {
return nil, err
}
res := &tsdb.Response{
Results: map[string]*tsdb.QueryResult{},
}
for _, r := range pbres.Results {
qr := &tsdb.QueryResult{
RefId: r.RefId,
}
if r.Error != "" {
qr.Error = errors.New(r.Error)
qr.ErrorString = r.Error
}
if r.MetaJson != "" {
metaJSON, err := simplejson.NewJson([]byte(r.MetaJson))
if err != nil {
tw.logger.Error("Error parsing JSON Meta field: " + err.Error())
}
qr.Meta = metaJSON
}
qr.Dataframes = r.Dataframes
res.Results[r.RefId] = qr
}
return res, nil
return &tsdb.Response{
Results: map[string]*tsdb.QueryResult{
"": {
Dataframes: pbRes.Frames,
Meta: simplejson.NewFromAny(pbRes.Metadata),
},
},
}, nil
}
type grafanaAPI struct {
type transformCallback struct {
logger log.Logger
}
func (s *grafanaAPI) QueryDatasource(ctx context.Context, req *pluginv2.QueryDatasourceRequest) (*pluginv2.QueryDatasourceResponse, error) {
func (s *transformCallback) DataQuery(ctx context.Context, req *pluginv2.DataQueryRequest) (*pluginv2.DataQueryResponse, error) {
if len(req.Queries) == 0 {
return nil, fmt.Errorf("zero queries found in datasource request")
}
getDsInfo := &models.GetDataSourceByIdQuery{
Id: req.DatasourceId,
OrgId: req.OrgId,
Id: req.Config.Id,
OrgId: req.Config.OrgId,
}
if err := bus.Dispatch(getDsInfo); err != nil {
@@ -215,20 +142,22 @@ func (s *grafanaAPI) QueryDatasource(ctx context.Context, req *pluginv2.QueryDat
// Convert plugin-model (datasource) queries to tsdb queries
queries := make([]*tsdb.Query, len(req.Queries))
for i, query := range req.Queries {
sj, err := simplejson.NewJson([]byte(query.ModelJson))
sj, err := simplejson.NewJson(query.Json)
if err != nil {
return nil, err
}
queries[i] = &tsdb.Query{
RefId: query.RefId,
IntervalMs: query.IntervalMs,
IntervalMs: query.IntervalMS,
MaxDataPoints: query.MaxDataPoints,
DataSource: getDsInfo.Result,
Model: sj,
}
}
timeRange := tsdb.NewTimeRange(req.TimeRange.FromRaw, req.TimeRange.ToRaw)
// For now take Time Range from first query.
timeRange := tsdb.NewTimeRange(strconv.FormatInt(req.Queries[0].TimeRange.FromEpochMS, 10), strconv.FormatInt(req.Queries[0].TimeRange.ToEpochMS, 10))
tQ := &tsdb.TsdbQuery{
TimeRange: timeRange,
Queries: queries,
@@ -241,37 +170,33 @@ func (s *grafanaAPI) QueryDatasource(ctx context.Context, req *pluginv2.QueryDat
}
// Convert tsdb results (map) to plugin-model/datasource (slice) results.
// Only error, tsdb.Series, and encoded Dataframes responses are mapped.
results := make([]*pluginv2.DatasourceQueryResult, 0, len(tsdbRes.Results))
encodedFrames := [][]byte{}
for refID, res := range tsdbRes.Results {
qr := &pluginv2.DatasourceQueryResult{
RefId: refID,
}
if res.Error != nil {
qr.Error = res.ErrorString
results = append(results, qr)
// TODO add Errors property to Frame
encodedFrames = append(encodedFrames, nil)
continue
}
if res.Dataframes != nil {
qr.Dataframes = append(qr.Dataframes, res.Dataframes...)
results = append(results, qr)
encodedFrames = append(encodedFrames, res.Dataframes...)
continue
}
encodedFrames := make([][]byte, len(res.Series))
for sIdx, series := range res.Series {
for _, series := range res.Series {
frame, err := tsdb.SeriesToFrame(series)
frame.RefID = refID
if err != nil {
return nil, err
}
encodedFrames[sIdx], err = dataframe.MarshalArrow(frame)
encFrame, err := dataframe.MarshalArrow(frame)
if err != nil {
return nil, err
}
encodedFrames = append(encodedFrames, encFrame)
}
qr.Dataframes = encodedFrames
results = append(results, qr)
}
return &pluginv2.QueryDatasourceResponse{Results: results}, nil
return &pluginv2.DataQueryResponse{Frames: encodedFrames}, nil
}