API: return query results as JSON rather than base64 encoded Arrow (#32303)

This commit is contained in:
Ryan McKinley 2021-03-31 08:35:03 -07:00 committed by GitHub
parent d92145be28
commit 1446d094b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 148 additions and 143 deletions

View File

@ -73,8 +73,13 @@ e2e.scenario({
// Disable / enable row
expectInspectorResultAndClose((keys) => {
const length = keys.length;
expect(keys[length - 2].innerText).equals('A:');
expect(keys[length - 1].innerText).equals('B:');
const resultIds = new Set<string>([
keys[length - 2].innerText, // last 2
keys[length - 1].innerText, // last 2
]);
expect(resultIds.has('A:')).equals(true);
expect(resultIds.has('B:')).equals(true);
});
// Disable row with refId A
@ -94,8 +99,13 @@ e2e.scenario({
expectInspectorResultAndClose((keys) => {
const length = keys.length;
expect(keys[length - 2].innerText).equals('A:');
expect(keys[length - 1].innerText).equals('B:');
const resultIds = new Set<string>([
keys[length - 2].innerText, // last 2
keys[length - 1].innerText, // last 2
]);
expect(resultIds.has('A:')).equals(true);
expect(resultIds.has('B:')).equals(true);
});
},
});

6
go.mod
View File

@ -43,7 +43,7 @@ require (
github.com/grafana/alerting-api v0.0.0-20210330162237-0b5408c529a8
github.com/grafana/grafana-aws-sdk v0.3.0
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4
github.com/grafana/grafana-plugin-sdk-go v0.89.0
github.com/grafana/grafana-plugin-sdk-go v0.90.0
github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2
github.com/hashicorp/go-hclog v0.15.0
@ -65,7 +65,7 @@ require (
github.com/prometheus/alertmanager v0.21.1-0.20210331075806-bc7b16d61afd
github.com/prometheus/client_golang v1.10.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.19.0
github.com/prometheus/common v0.20.0
github.com/robfig/cron v0.0.0-20180505203441-b41be1df6967
github.com/robfig/cron/v3 v3.0.1
github.com/russellhaering/goxmldsig v1.1.0
@ -88,7 +88,7 @@ require (
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
gonum.org/v1/gonum v0.8.2
google.golang.org/api v0.42.0
google.golang.org/grpc v1.36.0
google.golang.org/grpc v1.36.1
google.golang.org/protobuf v1.26.0
gopkg.in/ini.v1 v1.62.0
gopkg.in/ldap.v3 v3.0.2

11
go.sum
View File

@ -808,8 +808,8 @@ github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4 h1:SP
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4/go.mod h1:nc0XxBzjeGcrMltCDw269LoWF9S8ibhgxolCdA1R8To=
github.com/grafana/grafana-plugin-sdk-go v0.79.0/go.mod h1:NvxLzGkVhnoBKwzkst6CFfpMFKwAdIUZ1q8ssuLeF60=
github.com/grafana/grafana-plugin-sdk-go v0.88.0/go.mod h1:PTALh0lz+Y7k0+OMczAABTpeocL63aw6FVOBptp5GVo=
github.com/grafana/grafana-plugin-sdk-go v0.89.0 h1:TjwqMG9gS4wUbmSI8gO1NVGPUte6uw1D7Dua9I1LbZY=
github.com/grafana/grafana-plugin-sdk-go v0.89.0/go.mod h1:WACdtafPRErZbjnGqMPbmOXXQu6sWNJFzkVDmlWBIhM=
github.com/grafana/grafana-plugin-sdk-go v0.90.0 h1:ea+mTQSr/Sk00WPyRn4guFjSJMRezaOEtfA+jVwFljk=
github.com/grafana/grafana-plugin-sdk-go v0.90.0/go.mod h1:Ot3k7nY7P6DXmUsDgKvNB7oG1v7PRyTdmnYVoS554bU=
github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387 h1:iwcM8lkYJ3EhytGLJ2BvRSwutb0QWoI7EWbYv3yJRsY=
github.com/grafana/loki v1.6.2-0.20201026154740-6978ee5d7387/go.mod h1:jHA1OHnPsuj3LLgMXmFopsKDt4ARHHUhrmT3JrGf71g=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
@ -1382,8 +1382,8 @@ github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16
github.com/prometheus/common v0.17.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/common v0.18.1-0.20210305175002-2a23014b3b39/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/common v0.19.0 h1:Itb4+NjG9wRdkAWgVucbM/adyIXxEhbw0866e0uZE6A=
github.com/prometheus/common v0.19.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/common v0.20.0 h1:pfeDeUdQcIxOMutNjCejsEFp7qeP+/iltHSSmLpE+hU=
github.com/prometheus/common v0.20.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/exporter-toolkit v0.5.0/go.mod h1:OCkM4805mmisBhLmVFw858QYi3v0wKdY6/UxrT0pZVg=
github.com/prometheus/exporter-toolkit v0.5.1/go.mod h1:OCkM4805mmisBhLmVFw858QYi3v0wKdY6/UxrT0pZVg=
github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289 h1:dTUS1vaLWq+Y6XKOTnrFpoVsQKLCbCp1OLj24TDi7oM=
@ -2229,8 +2229,9 @@ google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTp
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8=
google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.36.0 h1:o1bcQ6imQMIOpdrO3SWf2z5RV72WbDwdXuK0MDlc8As=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.36.1 h1:cmUfbeGKnz9+2DD/UYsMQXeqbHZqZDs4eQwW0sFOpBY=
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200910201057-6591123024b3/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/grpc/examples v0.0.0-20200728065043-dfc0c05b2da9/go.mod h1:5j1uub0jRGhRiSghIlrThmBUgcgLXOVJQ/l1getT4uo=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=

View File

@ -194,3 +194,29 @@ export function dataFrameFromJSON(dto: DataFrameJSON): DataFrame {
length,
};
}
/**
* This converts DataFrame to a json representation with distinct schema+data
*
* @alpha
*/
export function dataFrameToJSON(frame: DataFrame): DataFrameJSON {
const data: DataFrameData = {
values: [],
};
const schema: DataFrameSchema = {
refId: frame.refId,
meta: frame.meta,
name: frame.name,
fields: frame.fields.map((f) => {
const { values, ...sfield } = f;
data.values.push(values.toArray());
return sfield;
}),
};
return {
schema,
data,
};
}

View File

@ -1,7 +1,6 @@
import { DataQuery, toDataFrameDTO, DataFrame } from '@grafana/data';
import { toDataQueryResponse } from './queryResponse';
/* eslint-disable */
const resp = {
data: {
results: {
@ -38,8 +37,6 @@ const emptyResults = {
data: { results: { '': { refId: '' } } },
};
/* eslint-enable */
describe('Query Response parser', () => {
test('should parse output with dataframe', () => {
const res = toDataQueryResponse(resp);
@ -235,7 +232,7 @@ describe('Query Response parser', () => {
const queries: DataQuery[] = [{ refId: 'A' }, { refId: 'B' }];
const ids = (toDataQueryResponse(resp, queries).data as DataFrame[]).map((f) => f.refId);
expect(ids).toEqual(['A', 'B', 'X']);
expect(ids).toEqual(['A', 'B']);
});
test('resultWithError', () => {

View File

@ -12,6 +12,8 @@ import {
MetricFindValue,
FieldType,
DataQuery,
DataFrameJSON,
dataFrameFromJSON,
} from '@grafana/data';
import { FetchResponse } from '../services';
@ -25,8 +27,10 @@ import { FetchResponse } from '../services';
export interface DataResponse {
error?: string;
refId?: string;
// base64 encoded arrow tables
dataframes?: string[];
frames?: DataFrameJSON[];
// Legacy TSDB format...
dataframes?: string[]; // base64 encoded arrow tables
series?: TimeSeries[];
tables?: TableData[];
}
@ -59,9 +63,7 @@ export function toDataQueryResponse(
// If the response isn't in a correct shape we just ignore the data and pass empty DataQueryResponse.
if ((res as FetchResponse).data?.results) {
const results = (res as FetchResponse).data.results;
const resultIDs = Object.keys(results);
const refIDs = queries ? queries.map((q) => q.refId) : resultIDs;
const usedResultIDs = new Set<string>(resultIDs);
const refIDs = queries?.length ? queries.map((q) => q.refId) : Object.keys(results);
const data: DataResponse[] = [];
for (const refId of refIDs) {
@ -70,23 +72,9 @@ export function toDataQueryResponse(
continue;
}
dr.refId = refId;
usedResultIDs.delete(refId);
data.push(dr);
}
// Add any refIds that do not match the query targets
if (usedResultIDs.size) {
for (const refId of usedResultIDs) {
const dr = results[refId];
if (!dr) {
continue;
}
dr.refId = refId;
usedResultIDs.delete(refId);
data.push(dr);
}
}
for (const dr of data) {
if (dr.error) {
if (!rsp.error) {
@ -98,6 +86,17 @@ export function toDataQueryResponse(
}
}
if (dr.frames?.length) {
for (const js of dr.frames) {
const df = dataFrameFromJSON(js);
if (!df.refId) {
df.refId = dr.refId;
}
rsp.data.push(df);
}
continue; // the other tests are legacy
}
if (dr.series?.length) {
for (const s of dr.series) {
if (!s.refId) {

View File

@ -5,6 +5,7 @@ import (
"errors"
"net/http"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
@ -78,16 +79,25 @@ func (hs *HTTPServer) QueryMetricsV2(c *models.ReqContext, reqDTO dtos.MetricReq
return response.Error(http.StatusInternalServerError, "Metric request error", err)
}
// This is insanity... but ¯\_(ツ)_/¯, the current query path looks like:
// encodeJson( decodeBase64( encodeBase64( decodeArrow( encodeArrow(frame)) ) )
// this will soon change to a more direct route
qdr, err := resp.ToBackendDataResponse()
if err != nil {
return response.Error(http.StatusInternalServerError, "error converting results", err)
}
return toMacronResponse(qdr)
}
func toMacronResponse(qdr *backend.QueryDataResponse) response.Response {
statusCode := http.StatusOK
for _, res := range resp.Results {
for _, res := range qdr.Responses {
if res.Error != nil {
res.ErrorString = res.Error.Error()
resp.Message = res.ErrorString
statusCode = http.StatusBadRequest
}
}
return response.JSONStreaming(statusCode, resp)
return response.JSONStreaming(statusCode, qdr)
}
// handleExpressions handles POST /api/ds/query when there is an expression.
@ -131,21 +141,11 @@ func (hs *HTTPServer) handleExpressions(c *models.ReqContext, reqDTO dtos.Metric
Cfg: hs.Cfg,
DataService: hs.DataService,
}
resp, err := exprService.WrapTransformData(c.Req.Context(), request)
qdr, err := exprService.WrapTransformData(c.Req.Context(), request)
if err != nil {
return response.Error(500, "expression request error", err)
}
statusCode := 200
for _, res := range resp.Results {
if res.Error != nil {
res.ErrorString = res.Error.Error()
resp.Message = res.ErrorString
statusCode = 400
}
}
return response.JSONStreaming(statusCode, resp)
return toMacronResponse(qdr)
}
func (hs *HTTPServer) handleGetDataSourceError(err error, datasourceID int64) *response.NormalResponse {

View File

@ -35,7 +35,7 @@ func init() {
}
// WrapTransformData creates and executes transform requests
func (s *Service) WrapTransformData(ctx context.Context, query plugins.DataQuery) (plugins.DataResponse, error) {
func (s *Service) WrapTransformData(ctx context.Context, query plugins.DataQuery) (*backend.QueryDataResponse, error) {
sdkReq := &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
OrgID: query.User.OrgId,
@ -46,7 +46,7 @@ func (s *Service) WrapTransformData(ctx context.Context, query plugins.DataQuery
for _, q := range query.Queries {
modelJSON, err := q.Model.MarshalJSON()
if err != nil {
return plugins.DataResponse{}, err
return nil, err
}
sdkReq.Queries = append(sdkReq.Queries, backend.DataQuery{
JSON: modelJSON,
@ -60,30 +60,7 @@ func (s *Service) WrapTransformData(ctx context.Context, query plugins.DataQuery
},
})
}
pbRes, err := s.TransformData(ctx, sdkReq)
if err != nil {
return plugins.DataResponse{}, err
}
tR := plugins.DataResponse{
Results: make(map[string]plugins.DataQueryResult, len(pbRes.Responses)),
}
for refID, res := range pbRes.Responses {
tRes := plugins.DataQueryResult{
RefID: refID,
Dataframes: plugins.NewDecodedDataFrames(res.Frames),
}
// if len(res.JsonMeta) != 0 {
// tRes.Meta = simplejson.NewFromAny(res.JsonMeta)
// }
if res.Error != nil {
tRes.Error = res.Error
tRes.ErrorString = res.Error.Error()
}
tR.Results[refID] = tRes
}
return tR, nil
return s.TransformData(ctx, sdkReq)
}
// TransformData takes Queries which are either expressions nodes
@ -214,37 +191,6 @@ func (s *Service) queryData(ctx context.Context, req *backend.QueryDataRequest)
if err != nil {
return nil, err
}
// Convert tsdb results (map) to plugin-model/datasource (slice) results.
// Only error, Series, and encoded Dataframes responses are mapped.
responses := make(map[string]backend.DataResponse, len(tsdbRes.Results))
for refID, res := range tsdbRes.Results {
pRes := backend.DataResponse{}
if res.Error != nil {
pRes.Error = res.Error
}
if res.Dataframes != nil {
decoded, err := res.Dataframes.Decoded()
if err != nil {
return nil, err
}
pRes.Frames = decoded
responses[refID] = pRes
continue
}
for _, series := range res.Series {
frame, err := plugins.SeriesToFrame(series)
frame.RefID = refID
if err != nil {
return nil, err
}
pRes.Frames = append(pRes.Frames, frame)
}
responses[refID] = pRes
}
return &backend.QueryDataResponse{
Responses: responses,
}, nil
return tsdbRes.ToBackendDataResponse()
}

View File

@ -8,6 +8,7 @@ import (
"strconv"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/null"
"github.com/grafana/grafana/pkg/components/simplejson"
@ -189,6 +190,44 @@ type DataResponse struct {
Message string `json:"message,omitempty"`
}
// ToBackendDataResponse converts the legacy format to the standard SDK format
func (r DataResponse) ToBackendDataResponse() (*backend.QueryDataResponse, error) {
qdr := &backend.QueryDataResponse{
Responses: make(map[string]backend.DataResponse, len(r.Results)),
}
// Convert tsdb results (map) to plugin-model/datasource (slice) results.
// Only error, Series, and encoded Dataframes responses are mapped.
for refID, res := range r.Results {
pRes := backend.DataResponse{}
if res.Error != nil {
pRes.Error = res.Error
}
if res.Dataframes != nil {
decoded, err := res.Dataframes.Decoded()
if err != nil {
return qdr, err
}
pRes.Frames = decoded
qdr.Responses[refID] = pRes
continue
}
for _, series := range res.Series {
frame, err := SeriesToFrame(series)
if err != nil {
return nil, err
}
frame.RefID = refID
pRes.Frames = append(pRes.Frames, frame)
}
qdr.Responses[refID] = pRes
}
return qdr, nil
}
type DataPlugin interface {
DataQuery(ctx context.Context, ds *models.DataSource, query DataQuery) (DataResponse, error)
}

View File

@ -14,9 +14,9 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/tests/testinfra"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch"
@ -69,7 +69,7 @@ func TestQueryCloudWatchMetrics(t *testing.T) {
}
result := makeCWRequest(t, req, addr)
dataFrames := plugins.NewDecodedDataFrames(data.Frames{
dataFrames := data.Frames{
&data.Frame{
RefID: "A",
Fields: []*data.Field{
@ -82,21 +82,13 @@ func TestQueryCloudWatchMetrics(t *testing.T) {
},
},
},
})
}
// Have to call this so that dataFrames.encoded is non-nil, for the comparison
// In the future we should use gocmp instead and ignore this field
_, err := dataFrames.Encoded()
require.NoError(t, err)
assert.Equal(t, plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
"A": {
RefID: "A",
Dataframes: dataFrames,
},
},
}, result)
expect := backend.NewQueryDataResponse()
expect.Responses["A"] = backend.DataResponse{
Frames: dataFrames,
}
assert.Equal(t, *expect, result)
})
}
@ -130,7 +122,7 @@ func TestQueryCloudWatchLogs(t *testing.T) {
}
tr := makeCWRequest(t, req, addr)
dataFrames := plugins.NewDecodedDataFrames(data.Frames{
dataFrames := data.Frames{
&data.Frame{
Name: "logGroups",
RefID: "A",
@ -141,23 +133,17 @@ func TestQueryCloudWatchLogs(t *testing.T) {
PreferredVisualization: "logs",
},
},
})
// Have to call this so that dataFrames.encoded is non-nil, for the comparison
// In the future we should use gocmp instead and ignore this field
_, err := dataFrames.Encoded()
require.NoError(t, err)
assert.Equal(t, plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
"A": {
RefID: "A",
Dataframes: dataFrames,
},
},
}, tr)
}
expect := backend.NewQueryDataResponse()
expect.Responses["A"] = backend.DataResponse{
Frames: dataFrames,
}
assert.Equal(t, *expect, tr)
})
}
func makeCWRequest(t *testing.T, req dtos.MetricRequest, addr string) plugins.DataResponse {
func makeCWRequest(t *testing.T, req dtos.MetricRequest, addr string) backend.QueryDataResponse {
t.Helper()
buf := bytes.Buffer{}
@ -180,7 +166,7 @@ func makeCWRequest(t *testing.T, req dtos.MetricRequest, addr string) plugins.Da
require.NoError(t, err)
require.Equal(t, 200, resp.StatusCode)
var tr plugins.DataResponse
var tr backend.QueryDataResponse
err = json.Unmarshal(buf.Bytes(), &tr)
require.NoError(t, err)

View File

@ -8,6 +8,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
jsoniter "github.com/json-iterator/go"
"github.com/grafana/grafana/pkg/cmd/grafana-cli/logger"
"github.com/grafana/grafana/pkg/infra/log"
@ -87,7 +88,7 @@ func (p *testStreamHandler) runTestStream(ctx context.Context, path string, conf
frame.Fields[2].Set(0, walker-((rand.Float64()*spread)+0.01)) // Min
frame.Fields[3].Set(0, walker+((rand.Float64()*spread)+0.01)) // Max
bytes, err := data.FrameToJSON(frame, true, true)
bytes, err := jsoniter.Marshal(frame) // schema + points
if err != nil {
logger.Warn("unable to marshal line", "error", err)
continue

View File

@ -24,7 +24,7 @@ describe('Tempo data source', () => {
})
);
const ds = new TempoDatasource(defaultSettings);
await expect(ds.query({ targets: [{ query: '12345' }] } as any)).toEmitValuesWith((response) => {
await expect(ds.query({ targets: [{ refId: 'refid1' }] } as any)).toEmitValuesWith((response) => {
const fields = (response[0].data[0] as DataFrame).fields;
expect(
fields.map((f) => ({