Arrow: move arrow support from frontend to backend only (#32575)

This commit is contained in:
Ryan McKinley
2021-04-01 10:30:08 -07:00
committed by GitHub
parent 8793f5c7f8
commit c7ea96940a
18 changed files with 158 additions and 979 deletions

View File

@@ -13,7 +13,6 @@ import (
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/datasourceproxy"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
@@ -131,14 +130,9 @@ func (api *API) conditionEvalEndpoint(c *models.ReqContext, cmd ngmodels.EvalAle
}
frame := evalResults.AsDataFrame()
df := plugins.NewDecodedDataFrames([]*data.Frame{&frame})
instances, err := df.Encoded()
if err != nil {
return response.Error(400, "Failed to encode result dataframes", err)
}
return response.JSON(200, util.DynMap{
"instances": instances,
return response.JSONStreaming(200, util.DynMap{
"instances": []*data.Frame{&frame},
})
}
@@ -162,17 +156,8 @@ func (api *API) alertDefinitionEvalEndpoint(c *models.ReqContext) response.Respo
}
frame := evalResults.AsDataFrame()
df := plugins.NewDecodedDataFrames([]*data.Frame{&frame})
if err != nil {
return response.Error(400, "Failed to instantiate Dataframes from the decoded frames", err)
}
instances, err := df.Encoded()
if err != nil {
return response.Error(400, "Failed to encode result dataframes", err)
}
return response.JSON(200, util.DynMap{
"instances": instances,
return response.JSONStreaming(200, util.DynMap{
"instances": []*data.Frame{&frame},
})
}

View File

@@ -2,6 +2,7 @@ package testdatasource
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"math"
@@ -150,7 +151,7 @@ Timestamps will line up evenly on timeStepSeconds (For example, 60 seconds means
p.registerScenario(&Scenario{
ID: string(arrowQuery),
Name: "Load Apache Arrow Data",
handler: p.handleClientSideScenario,
handler: p.handleArrowScenario,
})
p.registerScenario(&Scenario{
@@ -484,6 +485,30 @@ func (p *testDataPlugin) handleClientSideScenario(ctx context.Context, req *back
return backend.NewQueryDataResponse(), nil
}
func (p *testDataPlugin) handleArrowScenario(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
resp := backend.NewQueryDataResponse()
for _, q := range req.Queries {
model, err := simplejson.NewJson(q.JSON)
if err != nil {
return nil, err
}
respD := resp.Responses[q.RefID]
frame, err := doArrowQuery(q, model)
if err != nil {
return nil, err
}
if frame == nil {
continue
}
respD.Frames = append(respD.Frames, frame)
resp.Responses[q.RefID] = respD
}
return resp, nil
}
func (p *testDataPlugin) handleExponentialHeatmapBucketDataScenario(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
resp := backend.NewQueryDataResponse()
@@ -875,6 +900,18 @@ func randomHeatmapData(query backend.DataQuery, fnBucketGen func(index int) floa
return frame
}
func doArrowQuery(query backend.DataQuery, model *simplejson.Json) (*data.Frame, error) {
encoded := model.Get("stringInput").MustString("")
if encoded == "" {
return nil, nil
}
arrow, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
return nil, err
}
return data.UnmarshalArrowFrame(arrow)
}
func newSeriesForQuery(query backend.DataQuery, model *simplejson.Json, index int) *data.Frame {
alias := model.Get("alias").MustString("")
suffix := ""