grafana/pkg/plugins/transform_plugin.go

278 lines
6.4 KiB
Go

package plugins
import (
"context"
"encoding/json"
"errors"
"fmt"
"path"
"time"
"github.com/grafana/grafana-plugin-sdk-go/dataframe"
"github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2"
"github.com/grafana/grafana-plugin-sdk-go/transform"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/tsdb"
plugin "github.com/hashicorp/go-plugin"
"golang.org/x/xerrors"
)
type TransformPlugin struct {
PluginBase
// TODO we probably want a Backend Plugin Base? Or some way to dedup proc management code
Executable string `json:"executable,omitempty"`
*TransformWrapper
client *plugin.Client
log log.Logger
}
func (tp *TransformPlugin) Load(decoder *json.Decoder, pluginDir string) error {
if err := decoder.Decode(&tp); err != nil {
return err
}
if err := tp.registerPlugin(pluginDir); err != nil {
return err
}
Transform = tp
return nil
}
func (p *TransformPlugin) startBackendPlugin(ctx context.Context, log log.Logger) error {
p.log = log.New("plugin-id", p.Id)
if err := p.spawnSubProcess(); err != nil {
return err
}
go func() {
if err := p.restartKilledProcess(ctx); err != nil {
p.log.Error("Attempting to restart killed process failed", "err", err)
}
}()
return nil
}
func (p *TransformPlugin) spawnSubProcess() error {
cmd := ComposePluginStartCommmand(p.Executable)
fullpath := path.Join(p.PluginDir, cmd)
p.client = backendplugin.NewTransformClient(p.Id, fullpath, p.log)
rpcClient, err := p.client.Client()
if err != nil {
return err
}
raw, err := rpcClient.Dispense(p.Id)
if err != nil {
return err
}
plugin, ok := raw.(transform.TransformPlugin)
if !ok {
return fmt.Errorf("unexpected type %T, expected *transform.GRPCClient", raw)
}
p.TransformWrapper = NewTransformWrapper(p.log, plugin)
return nil
}
func (p *TransformPlugin) restartKilledProcess(ctx context.Context) error {
ticker := time.NewTicker(time.Second * 1)
for {
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil && !xerrors.Is(err, context.Canceled) {
return err
}
return nil
case <-ticker.C:
if !p.client.Exited() {
continue
}
if err := p.spawnSubProcess(); err != nil {
p.log.Error("Failed to restart plugin", "err", err)
continue
}
p.log.Debug("Plugin process restarted")
}
}
}
func (p *TransformPlugin) Kill() {
if p.client != nil {
p.log.Debug("Killing subprocess ", "name", p.Name)
p.client.Kill()
}
}
// ...
// Wrapper Code
// ...
func NewTransformWrapper(log log.Logger, plugin transform.TransformPlugin) *TransformWrapper {
return &TransformWrapper{plugin, log, &grafanaAPI{log}}
}
type TransformWrapper struct {
transform.TransformPlugin
logger log.Logger
api *grafanaAPI
}
func (tw *TransformWrapper) Transform(ctx context.Context, query *tsdb.TsdbQuery) (*tsdb.Response, error) {
pbQuery := &pluginv2.TransformRequest{
TimeRange: &pluginv2.TimeRange{
FromRaw: query.TimeRange.From,
ToRaw: query.TimeRange.To,
ToEpochMs: query.TimeRange.GetToAsMsEpoch(),
FromEpochMs: query.TimeRange.GetFromAsMsEpoch(),
},
Queries: []*pluginv2.TransformQuery{},
}
for _, q := range query.Queries {
modelJSON, err := q.Model.MarshalJSON()
if err != nil {
return nil, err
}
pbQuery.Queries = append(pbQuery.Queries, &pluginv2.TransformQuery{
ModelJson: string(modelJSON),
IntervalMs: q.IntervalMs,
RefId: q.RefId,
MaxDataPoints: q.MaxDataPoints,
})
}
pbres, err := tw.TransformPlugin.Transform(ctx, pbQuery, tw.api)
if err != nil {
return nil, err
}
res := &tsdb.Response{
Results: map[string]*tsdb.QueryResult{},
}
for _, r := range pbres.Results {
qr := &tsdb.QueryResult{
RefId: r.RefId,
}
if r.Error != "" {
qr.Error = errors.New(r.Error)
qr.ErrorString = r.Error
}
if r.MetaJson != "" {
metaJSON, err := simplejson.NewJson([]byte(r.MetaJson))
if err != nil {
tw.logger.Error("Error parsing JSON Meta field: " + err.Error())
}
qr.Meta = metaJSON
}
qr.Dataframes = r.Dataframes
res.Results[r.RefId] = qr
}
return res, nil
}
type grafanaAPI struct {
logger log.Logger
}
func (s *grafanaAPI) QueryDatasource(ctx context.Context, req *pluginv2.QueryDatasourceRequest) (*pluginv2.QueryDatasourceResponse, error) {
if len(req.Queries) == 0 {
return nil, fmt.Errorf("zero queries found in datasource request")
}
getDsInfo := &models.GetDataSourceByIdQuery{
Id: req.DatasourceId,
OrgId: req.OrgId,
}
if err := bus.Dispatch(getDsInfo); err != nil {
return nil, fmt.Errorf("Could not find datasource %v", err)
}
// Convert plugin-model (datasource) queries to tsdb queries
queries := make([]*tsdb.Query, len(req.Queries))
for i, query := range req.Queries {
sj, err := simplejson.NewJson([]byte(query.ModelJson))
if err != nil {
return nil, err
}
queries[i] = &tsdb.Query{
RefId: query.RefId,
IntervalMs: query.IntervalMs,
MaxDataPoints: query.MaxDataPoints,
DataSource: getDsInfo.Result,
Model: sj,
}
}
timeRange := tsdb.NewTimeRange(req.TimeRange.FromRaw, req.TimeRange.ToRaw)
tQ := &tsdb.TsdbQuery{
TimeRange: timeRange,
Queries: queries,
}
// Execute the converted queries
tsdbRes, err := tsdb.HandleRequest(ctx, getDsInfo.Result, tQ)
if err != nil {
return nil, err
}
// Convert tsdb results (map) to plugin-model/datasource (slice) results.
// Only error, tsdb.Series, and encoded Dataframes responses are mapped.
results := make([]*pluginv2.DatasourceQueryResult, 0, len(tsdbRes.Results))
for refID, res := range tsdbRes.Results {
qr := &pluginv2.DatasourceQueryResult{
RefId: refID,
}
if res.Error != nil {
qr.Error = res.ErrorString
results = append(results, qr)
continue
}
if res.Dataframes != nil {
qr.Dataframes = append(qr.Dataframes, res.Dataframes...)
results = append(results, qr)
continue
}
encodedFrames := make([][]byte, len(res.Series))
for sIdx, series := range res.Series {
frame, err := tsdb.SeriesToFrame(series)
if err != nil {
return nil, err
}
encodedFrames[sIdx], err = dataframe.MarshalArrow(frame)
if err != nil {
return nil, err
}
}
qr.Dataframes = encodedFrames
results = append(results, qr)
}
return &pluginv2.QueryDatasourceResponse{Results: results}, nil
}