InfluxDB: Implement InfluxQL JSON streaming parser (#76934)

* Add the first iteration

* Prepare bench testing

* rename the test files

* Remove unnecessary test file

* Introduce influxqlStreamingParser feature flag

* Apply streaming parser feature flag

* Add new tests

* More tests

* return executedQueryString only in first frame

* add frame meta and config

* Update golden json files

* Support tags/labels

* more tests

* more tests

* Don't change original response_parser.go

* provide context

* create util package

* don't pass the row

* update converter with formatted frameName

* add executedQueryString info only to first frame

* update golden files

* rename

* update test file

* use pointer values

* update testdata

* update parsing

* update converter for null values

* prepare converter for table response

* clean up

* return timeField in fields

* handle no time column responses

* better nil field handling

* refactor the code

* add table tests

* fix config for table

* table response format

* fix value

* if there is no time column, set the name

* linting

* refactoring

* handle the status code

* add tracing

* Update pkg/tsdb/influxdb/influxql/converter/converter_test.go

Co-authored-by: İnanç Gümüş <m@inanc.io>

* fix import

* update test data

* sanity

* sanity

* linting

* simplicity

* return empty rsp

* rename to prevent confusion

* nullableJson field type for null values

* better handling of null values

* remove duplicate test file

* fix healthcheck

* use util for pointer

* move bench test to root

* provide fake feature manager

* add more tests

* partial fix for null values in table response format

* handle partial null fields

* comments for easy testing

* move frameName allocation in readSeries

* one less append operation

* performance improvement by doing the string conversion once (see the sketch after this list)

pkg: github.com/grafana/grafana/pkg/tsdb/influxdb/influxql
             │ stream2.txt │            stream3.txt             │
             │   sec/op    │   sec/op     vs base               │
ParseJson-10   314.4m ± 1%   303.9m ± 1%  -3.34% (p=0.000 n=10)

             │ stream2.txt  │             stream3.txt              │
             │     B/op     │     B/op      vs base                │
ParseJson-10   425.2Mi ± 0%   382.7Mi ± 0%  -10.00% (p=0.000 n=10)

             │ stream2.txt │            stream3.txt             │
             │  allocs/op  │  allocs/op   vs base               │
ParseJson-10   7.224M ± 0%   6.689M ± 0%  -7.41% (p=0.000 n=10)

* add comment lines
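
The "string conversion once" change follows a common Go pattern: build the name into a reusable byte slice and convert it to a string a single time per series. A minimal, self-contained sketch of the idea (illustrative only, not the commit's exact code):

package main

import "fmt"

// buildName appends measurement and column into buf and returns the
// extended slice, so one allocation can be reused across many names.
func buildName(buf []byte, measurement, column string) []byte {
	buf = append(buf, measurement...)
	buf = append(buf, '.')
	buf = append(buf, column...)
	return buf
}

func main() {
	buf := make([]byte, 0, 128) // allocated once, reused per series
	for _, col := range []string{"usage_guest", "usage_nice"} {
		name := string(buildName(buf[:0], "cpu", col)) // one []byte-to-string conversion per name
		fmt.Println(name)
	}
}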

---------

Co-authored-by: İnanç Gümüş <m@inanc.io>
ismail simsek 2023-12-06 12:39:05 +01:00 committed by GitHub
parent e8b2e85966
commit c088d003f2
17 changed files with 1121 additions and 55 deletions

View File

@@ -86,7 +86,7 @@ func TestIntegrationPluginManager(t *testing.T) {
cm := cloudmonitoring.ProvideService(hcp, tracer)
es := elasticsearch.ProvideService(hcp, tracer)
grap := graphite.ProvideService(hcp, tracer)
idb := influxdb.ProvideService(hcp)
idb := influxdb.ProvideService(hcp, features)
lk := loki.ProvideService(hcp, features, tracer)
otsdb := opentsdb.ProvideService(hcp)
pr := prometheus.ProvideService(hcp, cfg, features)

View File

@@ -7,8 +7,10 @@ import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/tsdb/influxdb/flux"
"github.com/grafana/grafana/pkg/tsdb/influxdb/fsql"
"github.com/grafana/grafana/pkg/tsdb/influxdb/influxql"
@@ -35,7 +37,7 @@ func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthReque
case influxVersionFlux:
return CheckFluxHealth(ctx, dsInfo, req)
case influxVersionInfluxQL:
return CheckInfluxQLHealth(ctx, dsInfo)
return CheckInfluxQLHealth(ctx, dsInfo, s.features)
case influxVersionSQL:
return CheckSQLHealth(ctx, dsInfo, req)
default:
@@ -78,10 +80,10 @@ func CheckFluxHealth(ctx context.Context, dsInfo *models.DatasourceInfo,
return getHealthCheckMessage(logger, "", errors.New("error getting flux query buckets"))
}
func CheckInfluxQLHealth(ctx context.Context, dsInfo *models.DatasourceInfo) (*backend.CheckHealthResult, error) {
func CheckInfluxQLHealth(ctx context.Context, dsInfo *models.DatasourceInfo, features featuremgmt.FeatureToggles) (*backend.CheckHealthResult, error) {
logger := logger.FromContext(ctx)
resp, err := influxql.Query(ctx, dsInfo, &backend.QueryDataRequest{
tracer := tracing.DefaultTracer()
resp, err := influxql.Query(ctx, tracer, dsInfo, &backend.QueryDataRequest{
Queries: []backend.DataQuery{
{
RefID: refID,
@@ -89,7 +91,7 @@ func CheckInfluxQLHealth(ctx context.Context, dsInfo *models.DatasourceInfo) (*b
JSON: []byte(`{"query": "SHOW measurements", "rawQuery": true}`),
},
},
})
}, features)
if err != nil {
return getHealthCheckMessage(logger, "error performing influxQL query", err)
}
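
The health check drives the same query path (a SHOW measurements query) and maps the outcome onto the SDK's health types. A minimal sketch of the result shape, assuming the standard grafana-plugin-sdk-go backend API (the helper name is hypothetical; the actual logic lives in getHealthCheckMessage):

package main

import (
	"errors"
	"fmt"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
)

// healthResult is a hypothetical helper mirroring the shape that
// getHealthCheckMessage ultimately produces.
func healthResult(err error) *backend.CheckHealthResult {
	if err != nil {
		return &backend.CheckHealthResult{
			Status:  backend.HealthStatusError,
			Message: "error performing influxQL query: " + err.Error(),
		}
	}
	return &backend.CheckHealthResult{
		Status:  backend.HealthStatusOk,
		Message: "datasource is working",
	}
}

func main() {
	fmt.Println(healthResult(nil).Message)
	fmt.Println(healthResult(errors.New("connection refused")).Message)
}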

View File

@@ -8,7 +8,9 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/tsdb/influxdb/flux"
"github.com/grafana/grafana/pkg/tsdb/influxdb/fsql"
@@ -21,12 +23,14 @@ import (
var logger log.Logger = log.New("tsdb.influxdb")
type Service struct {
im instancemgmt.InstanceManager
im instancemgmt.InstanceManager
features featuremgmt.FeatureToggles
}
func ProvideService(httpClient httpclient.Provider) *Service {
func ProvideService(httpClient httpclient.Provider, features featuremgmt.FeatureToggles) *Service {
return &Service{
im: datasource.NewInstanceManager(newInstanceSettings(httpClient)),
im: datasource.NewInstanceManager(newInstanceSettings(httpClient)),
features: features,
}
}
@@ -90,6 +94,8 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
logger := logger.FromContext(ctx)
logger.Debug("Received a query request", "numQueries", len(req.Queries))
tracer := tracing.DefaultTracer()
dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
if err != nil {
return nil, err
@@ -101,7 +107,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
case influxVersionFlux:
return flux.Query(ctx, dsInfo, *req)
case influxVersionInfluxQL:
return influxql.Query(ctx, dsInfo, req)
return influxql.Query(ctx, tracer, dsInfo, req, s.features)
case influxVersionSQL:
return fsql.Query(ctx, dsInfo, *req)
default:

View File

@@ -1,4 +1,4 @@
package influxql
package buffered
import (
"encoding/json"

View File

@@ -1,4 +1,4 @@
package influxql
package buffered
import (
"encoding/json"
@@ -20,10 +20,13 @@ import (
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
)
const shouldUpdate = false
const (
shouldUpdate = false
testPath = "../testdata"
)
func readJsonFile(filePath string) io.ReadCloser {
bytes, err := os.ReadFile(filepath.Join("testdata", filepath.Clean(filePath)+".json"))
bytes, err := os.ReadFile(filepath.Join(testPath, filepath.Clean(filePath)+".json"))
if err != nil {
panic(fmt.Sprintf("cannot read the file: %s", filePath))
}
@@ -42,10 +45,10 @@ func generateQuery(resFormat string, alias string) *models.Query {
var testFiles = []string{
"all_values_are_null",
"influx_select_all_from_cpu",
"one_measurement_with_two_columns",
"response_with_weird_tag",
"some_values_are_null",
"error_on_top_level_response",
"simple_response",
"multiple_series_with_tags_and_multiple_columns",
"multiple_series_with_tags",
@@ -76,21 +79,16 @@ func TestReadInfluxAsTable(t *testing.T) {
func runScenario(tf string, resultFormat string) func(t *testing.T) {
return func(t *testing.T) {
f, err := os.Open(path.Join("testdata", filepath.Clean(tf+".json")))
f, err := os.Open(path.Join(testPath, filepath.Clean(tf+".json")))
require.NoError(t, err)
query := generateQuery(resultFormat, "")
rsp := ResponseParse(io.NopCloser(f), 200, query)
if strings.Contains(tf, "error") {
require.Error(t, rsp.Error)
return
}
require.NoError(t, rsp.Error)
fname := tf + "." + resultFormat + ".golden"
experimental.CheckGoldenJSONResponse(t, "testdata", fname, rsp, shouldUpdate)
experimental.CheckGoldenJSONResponse(t, testPath, fname, rsp, shouldUpdate)
}
}

View File

@@ -0,0 +1,452 @@
package converter
import (
"fmt"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
jsoniter "github.com/json-iterator/go"
"github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/util"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
"github.com/grafana/grafana/pkg/util/converter/jsonitere"
)
func rspErr(e error) *backend.DataResponse {
return &backend.DataResponse{Error: e}
}
func ReadInfluxQLStyleResult(jIter *jsoniter.Iterator, query *models.Query) *backend.DataResponse {
iter := jsonitere.NewIterator(jIter)
var rsp *backend.DataResponse
l1Fields:
for l1Field, err := iter.ReadObject(); ; l1Field, err = iter.ReadObject() {
if err != nil {
return rspErr(err)
}
switch l1Field {
case "results":
rsp = readResults(iter, query)
if rsp.Error != nil {
return rsp
}
case "":
if err != nil {
return rspErr(err)
}
break l1Fields
default:
v, err := iter.Read()
if err != nil {
return rspErr(err) // rsp may still be nil here; rspErr avoids a nil dereference
}
fmt.Printf("[ROOT] unsupported key: %s / %v\n\n", l1Field, v)
}
}
return rsp
}
func readResults(iter *jsonitere.Iterator, query *models.Query) *backend.DataResponse {
rsp := &backend.DataResponse{Frames: make(data.Frames, 0)}
l1Fields:
for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() {
if err != nil {
return rspErr(err)
}
for l1Field, err := iter.ReadObject(); l1Field != ""; l1Field, err = iter.ReadObject() {
if err != nil {
return rspErr(err)
}
switch l1Field {
case "series":
rsp = readSeries(iter, query)
case "":
break l1Fields
default:
_, err := iter.Read()
if err != nil {
return rspErr(err)
}
}
}
}
return rsp
}
func readSeries(iter *jsonitere.Iterator, query *models.Query) *backend.DataResponse {
var (
measurement string
tags map[string]string
columns []string
valueFields data.Fields
hasTimeColumn bool
)
// frameName is pre-allocated so it can be reused across series, saving allocations.
// It's sized for a reasonably large name but will grow if needed.
frameName := make([]byte, 0, 128)
rsp := &backend.DataResponse{Frames: make(data.Frames, 0)}
for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() {
if err != nil {
return rspErr(err)
}
for l1Field, err := iter.ReadObject(); l1Field != ""; l1Field, err = iter.ReadObject() {
if err != nil {
return rspErr(err)
}
switch l1Field {
case "name":
if measurement, err = iter.ReadString(); err != nil {
return rspErr(err)
}
case "tags":
if tags, err = readTags(iter); err != nil {
return rspErr(err)
}
case "columns":
columns, err = readColumns(iter)
if err != nil {
return rspErr(err)
}
if columns[0] == "time" {
hasTimeColumn = true
}
case "values":
valueFields, err = readValues(iter, hasTimeColumn)
if err != nil {
return rspErr(err)
}
if util.GetVisType(query.ResultFormat) != util.TableVisType {
for i, v := range valueFields {
if v.Type() == data.FieldTypeNullableJSON {
maybeFixValueFieldType(valueFields, data.FieldTypeNullableFloat64, i)
}
}
}
default:
v, err := iter.Read()
if err != nil {
return rspErr(err)
}
fmt.Printf("[Series] unsupported key: %s / %v\n", l1Field, v)
}
}
if util.GetVisType(query.ResultFormat) == util.TableVisType {
handleTableFormatFirstFrame(rsp, measurement, query)
handleTableFormatFirstField(rsp, valueFields, columns)
handleTableFormatTagFields(rsp, valueFields, tags)
handleTableFormatValueFields(rsp, valueFields, tags, columns)
} else {
// time_series response format
if hasTimeColumn {
// Frame with time column
newFrames := handleTimeSeriesFormatWithTimeColumn(valueFields, tags, columns, measurement, frameName, query)
rsp.Frames = append(rsp.Frames, newFrames...)
} else {
// Frame without time column
newFrame := handleTimeSeriesFormatWithoutTimeColumn(valueFields, columns, measurement, query)
rsp.Frames = append(rsp.Frames, newFrame)
}
}
}
// If all values in a field are null, we convert the field type to NullableFloat64.
// This keeps the stream parser consistent with the buffered parser, and the
// frontend would not know how to interpret a nullableJSON value anyway.
for i, f := range rsp.Frames {
for j, v := range f.Fields {
if v.Type() == data.FieldTypeNullableJSON {
newField := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, 0)
newField.Name = v.Name
newField.Config = v.Config
for k := 0; k < v.Len(); k++ {
newField.Append(nil)
}
rsp.Frames[i].Fields[j] = newField
}
}
}
return rsp
}
func readTags(iter *jsonitere.Iterator) (map[string]string, error) {
tags := make(map[string]string)
for l1Field, err := iter.ReadObject(); l1Field != ""; l1Field, err = iter.ReadObject() {
if err != nil {
return nil, err
}
value, err := iter.ReadString()
if err != nil {
return nil, err
}
tags[l1Field] = value
}
return tags, nil
}
func readColumns(iter *jsonitere.Iterator) (columns []string, err error) {
for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() {
if err != nil {
return nil, err
}
l1Field, err := iter.ReadString()
if err != nil {
return nil, err
}
columns = append(columns, l1Field)
}
return columns, nil
}
func readValues(iter *jsonitere.Iterator, hasTimeColumn bool) (valueFields data.Fields, err error) {
if hasTimeColumn {
valueFields = append(valueFields, data.NewField("Time", nil, make([]time.Time, 0)))
}
for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() {
if err != nil {
return nil, err
}
colIdx := 0
for more2, err := iter.ReadArray(); more2; more2, err = iter.ReadArray() {
if err != nil {
return nil, err
}
if hasTimeColumn && colIdx == 0 {
// Read time
var t float64
if t, err = iter.ReadFloat64(); err != nil {
return nil, err
}
valueFields[0].Append(time.UnixMilli(int64(t)).UTC())
colIdx++
continue
}
// Read column values
next, err := iter.WhatIsNext()
if err != nil {
return nil, err
}
switch next {
case jsoniter.StringValue:
s, err := iter.ReadString()
if err != nil {
return nil, err
}
valueFields = maybeCreateValueField(valueFields, data.FieldTypeNullableString, colIdx)
maybeFixValueFieldType(valueFields, data.FieldTypeNullableString, colIdx)
tryToAppendValue(valueFields, &s, colIdx)
case jsoniter.NumberValue:
n, err := iter.ReadFloat64()
if err != nil {
return nil, err
}
valueFields = maybeCreateValueField(valueFields, data.FieldTypeNullableFloat64, colIdx)
maybeFixValueFieldType(valueFields, data.FieldTypeNullableFloat64, colIdx)
tryToAppendValue(valueFields, &n, colIdx)
case jsoniter.BoolValue:
b, err := iter.ReadAny()
if err != nil {
return nil, err
}
valueFields = maybeCreateValueField(valueFields, data.FieldTypeNullableBool, colIdx)
maybeFixValueFieldType(valueFields, data.FieldTypeNullableBool, colIdx)
tryToAppendValue(valueFields, util.ToPtr(b.ToBool()), colIdx)
case jsoniter.NilValue:
_, _ = iter.Read()
if len(valueFields) <= colIdx {
// No value field has been created for this column yet and we don't
// know the type of its values, so we create a FieldTypeNullableJSON
// field to hold the nils; if the actual type turns out to be something
// else, the field is replaced later.
unknownField := data.NewFieldFromFieldType(data.FieldTypeNullableJSON, 0)
unknownField.Name = "Value"
valueFields = append(valueFields, unknownField)
}
valueFields[colIdx].Append(nil)
default:
return nil, fmt.Errorf("unknown value type")
}
colIdx++
}
}
return valueFields, nil
}
// maybeCreateValueField checks whether a value field has been created already.
// If it hasn't, it creates a new one.
func maybeCreateValueField(valueFields data.Fields, expectedType data.FieldType, colIdx int) data.Fields {
if len(valueFields) == colIdx {
newField := data.NewFieldFromFieldType(expectedType, 0)
newField.Name = "Value"
valueFields = append(valueFields, newField)
}
return valueFields
}
// maybeFixValueFieldType checks whether the value field type matches the expected type.
// Nil values may have been stored in a placeholder FieldTypeNullableJSON field.
// If the field at colIdx is such a nullableJSON field and expectedType differs,
// the field is replaced by an empty field of expectedType with its nil values kept.
func maybeFixValueFieldType(valueFields data.Fields, expectedType data.FieldType, colIdx int) {
if valueFields[colIdx].Type() == expectedType || valueFields[colIdx].Type() != data.FieldTypeNullableJSON {
return
}
newField := data.NewFieldFromFieldType(expectedType, 0)
newField.Name = "Value"
for i := 0; i < valueFields[colIdx].Len(); i++ {
newField.Append(nil)
}
valueFields[colIdx] = newField
}
func tryToAppendValue[T *string | *float64 | *bool](valueFields data.Fields, value T, colIdx int) {
if valueFields[colIdx].Type() == typeOf(value) {
valueFields[colIdx].Append(value)
} else {
valueFields[colIdx].Append(nil)
}
}
func typeOf(value interface{}) data.FieldType {
switch v := value.(type) {
case *string:
return data.FieldTypeNullableString
case *float64:
return data.FieldTypeNullableFloat64
case *bool:
return data.FieldTypeNullableBool
default:
fmt.Printf("unknown value type: %v", v)
return data.FieldTypeNullableJSON
}
}
func handleTimeSeriesFormatWithTimeColumn(valueFields data.Fields, tags map[string]string, columns []string, measurement string, frameName []byte, query *models.Query) []*data.Frame {
frames := make([]*data.Frame, 0, len(columns)-1)
for i, v := range columns {
if v == "time" {
continue
}
formattedFrameName := string(util.FormatFrameName(measurement, v, tags, *query, frameName[:]))
valueFields[i].Labels = tags
valueFields[i].Config = &data.FieldConfig{DisplayNameFromDS: formattedFrameName}
frame := data.NewFrame(formattedFrameName, valueFields[0], valueFields[i])
frames = append(frames, frame)
}
return frames
}
func handleTimeSeriesFormatWithoutTimeColumn(valueFields data.Fields, columns []string, measurement string, query *models.Query) *data.Frame {
// Frame without time column
if len(columns) >= 2 && strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW TAG VALUES")) {
return data.NewFrame(measurement, valueFields[1])
}
if len(columns) >= 1 {
return data.NewFrame(measurement, valueFields[0])
}
return nil
}
func handleTableFormatFirstFrame(rsp *backend.DataResponse, measurement string, query *models.Query) {
// Add the first and only frame for table format
if len(rsp.Frames) == 0 {
newFrame := data.NewFrame(measurement)
newFrame.Meta = &data.FrameMeta{
ExecutedQueryString: query.RawQuery,
PreferredVisualization: util.GetVisType(query.ResultFormat),
}
rsp.Frames = append(rsp.Frames, newFrame)
}
}
func handleTableFormatFirstField(rsp *backend.DataResponse, valueFields data.Fields, columns []string) {
if len(rsp.Frames[0].Fields) == 0 {
rsp.Frames[0].Fields = append(rsp.Frames[0].Fields, valueFields[0])
if columns[0] != "time" {
rsp.Frames[0].Fields[0].Name = columns[0]
rsp.Frames[0].Fields[0].Config = &data.FieldConfig{DisplayNameFromDS: columns[0]}
}
} else {
var i int
for i < valueFields[0].Len() {
rsp.Frames[0].Fields[0].Append(valueFields[0].At(i))
i++
}
}
}
func handleTableFormatTagFields(rsp *backend.DataResponse, valueFields data.Fields, tags map[string]string) {
ti := 1
// We have the first field, so we should add tagField if there is any tag
for k, v := range tags {
if len(rsp.Frames[0].Fields) == ti {
tagField := data.NewField(k, nil, []*string{})
tagField.Config = &data.FieldConfig{DisplayNameFromDS: k}
rsp.Frames[0].Fields = append(rsp.Frames[0].Fields, tagField)
}
var i int
for i < valueFields[0].Len() {
val := v[0:]
rsp.Frames[0].Fields[ti].Append(&val)
i++
}
ti++
}
}
func handleTableFormatValueFields(rsp *backend.DataResponse, valueFields data.Fields, tags map[string]string, columns []string) {
// Number of fields we currently have in the first frame:
// the first value field plus the tag fields handled above.
si := len(tags) + 1
for i, v := range valueFields {
// The first value field was already handled, before the tags;
// no need to create it again here.
if i == 0 {
continue
}
if len(rsp.Frames[0].Fields) == si {
rsp.Frames[0].Fields = append(rsp.Frames[0].Fields, v)
} else {
for vi := 0; vi < v.Len(); vi++ {
if v.Type() == data.FieldTypeNullableJSON {
// Append nil explicitly; we don't know whether it is a nil
// *float64, a nil *string, or something else.
rsp.Frames[0].Fields[si].Append(nil)
} else {
if v.Type() != rsp.Frames[0].Fields[si].Type() {
maybeFixValueFieldType(rsp.Frames[0].Fields, v.Type(), si)
}
rsp.Frames[0].Fields[si].Append(v.At(vi))
}
}
}
rsp.Frames[0].Fields[si].Name = columns[i]
rsp.Frames[0].Fields[si].Config = &data.FieldConfig{DisplayNameFromDS: columns[i]}
si++
}
}
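
ReadInfluxQLStyleResult and its helpers never materialize the whole response; they advance a jsoniter iterator one token at a time, descending only into the keys they recognize. A standalone sketch of that object-walking pattern (simplified, not the converter's code):

package main

import (
	"fmt"
	"strings"

	jsoniter "github.com/json-iterator/go"
)

func main() {
	body := `{"results":[{"statement_id":0,"series":[{"name":"cpu"}]}]}`
	iter := jsoniter.Parse(jsoniter.ConfigDefault, strings.NewReader(body), 1024)

	// ReadObject returns the next key, or "" once the object is exhausted,
	// so the response is consumed without building an intermediate tree.
	for field := iter.ReadObject(); field != ""; field = iter.ReadObject() {
		switch field {
		case "results":
			for iter.ReadArray() { // one entry per statement result
				for f := iter.ReadObject(); f != ""; f = iter.ReadObject() {
					fmt.Println("result key:", f)
					iter.Skip() // the real converter descends here instead
				}
			}
		default:
			iter.Skip()
		}
	}
}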

View File

@@ -0,0 +1,82 @@
package converter
import (
"testing"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/stretchr/testify/assert"
)
func TestMaybeFixValueFieldType(t *testing.T) {
tests := []struct {
name string
valueFields data.Fields
inputType data.FieldType
colIdx int
expectedType data.FieldType
}{
{
name: "should do nothing if both are the same type (bool)",
valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableBool, 0)},
inputType: data.FieldTypeNullableBool,
colIdx: 0,
expectedType: data.FieldTypeNullableBool,
},
{
name: "should do nothing if both are the same type (string)",
valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableString, 0)},
inputType: data.FieldTypeNullableString,
colIdx: 0,
expectedType: data.FieldTypeNullableString,
},
{
name: "should do nothing if both are the same type (float64)",
valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, 0)},
inputType: data.FieldTypeNullableFloat64,
colIdx: 0,
expectedType: data.FieldTypeNullableFloat64,
},
{
name: "should return nullableJson if both are nullableJson",
valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableJSON, 0)},
inputType: data.FieldTypeNullableJSON,
colIdx: 0,
expectedType: data.FieldTypeNullableJSON,
},
{
name: "should return nullableString if valueField is nullableJson and input is nullableString",
valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableJSON, 0)},
inputType: data.FieldTypeNullableString,
colIdx: 0,
expectedType: data.FieldTypeNullableString,
},
{
name: "should return nullableBool if valueField is nullableJson and input is nullableBool",
valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableJSON, 0)},
inputType: data.FieldTypeNullableBool,
colIdx: 0,
expectedType: data.FieldTypeNullableBool,
},
{
name: "should return nullableFloat64 if valueField is nullableJson and input is nullableFloat64",
valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableJSON, 0)},
inputType: data.FieldTypeNullableFloat64,
colIdx: 0,
expectedType: data.FieldTypeNullableFloat64,
},
{
name: "should do nothing if valueField is different than nullableJson and input is anything but nullableJson",
valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, 0)},
inputType: data.FieldTypeNullableString,
colIdx: 0,
expectedType: data.FieldTypeNullableFloat64,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
maybeFixValueFieldType(tt.valueFields, tt.inputType, tt.colIdx)
assert.Equal(t, tt.valueFields[tt.colIdx].Type(), tt.expectedType)
})
}
}

View File

@@ -9,10 +9,15 @@ import (
"strings"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/buffered"
"github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/querydata"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
"github.com/grafana/grafana/pkg/tsdb/prometheus/utils"
)
const defaultRetentionPolicy = "default"
@@ -22,7 +27,7 @@ var (
glog = log.New("tsdb.influx_influxql")
)
func Query(ctx context.Context, dsInfo *models.DatasourceInfo, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
func Query(ctx context.Context, tracer trace.Tracer, dsInfo *models.DatasourceInfo, req *backend.QueryDataRequest, features featuremgmt.FeatureToggles) (*backend.QueryDataResponse, error) {
logger := glog.FromContext(ctx)
response := backend.NewQueryDataResponse()
@@ -49,7 +54,7 @@ func Query(ctx context.Context, dsInfo *models.DatasourceInfo, req *backend.Quer
return &backend.QueryDataResponse{}, err
}
resp, err := execute(dsInfo, logger, query, request)
resp, err := execute(ctx, tracer, dsInfo, logger, query, request, features.IsEnabled(ctx, featuremgmt.FlagInfluxqlStreamingParser))
if err != nil {
response.Responses[query.RefID] = backend.DataResponse{Error: err}
@@ -110,7 +115,7 @@ func createRequest(ctx context.Context, logger log.Logger, dsInfo *models.Dataso
return req, nil
}
func execute(dsInfo *models.DatasourceInfo, logger log.Logger, query *models.Query, request *http.Request) (backend.DataResponse, error) {
func execute(ctx context.Context, tracer trace.Tracer, dsInfo *models.DatasourceInfo, logger log.Logger, query *models.Query, request *http.Request, isStreamingParserEnabled bool) (backend.DataResponse, error) {
res, err := dsInfo.HTTPClient.Do(request)
if err != nil {
return backend.DataResponse{}, err
@@ -120,6 +125,16 @@ func execute(dsInfo *models.DatasourceInfo, logger log.Logger, query *models.Que
logger.Warn("Failed to close response body", "err", err)
}
}()
resp := ResponseParse(res.Body, res.StatusCode, query)
_, endSpan := utils.StartTrace(ctx, tracer, "datasource.influxdb.influxql.parseResponse")
defer endSpan()
var resp *backend.DataResponse
if isStreamingParserEnabled {
logger.Info("InfluxDB InfluxQL streaming parser enabled: ", "info")
resp = querydata.ResponseParse(res.Body, res.StatusCode, query)
} else {
resp = buffered.ResponseParse(res.Body, res.StatusCode, query)
}
return *resp, nil
}
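
utils.StartTrace is a small Grafana helper; using the OpenTelemetry API directly, the equivalent span around the parse step would look roughly like this:

package tracingexample

import (
	"context"

	"go.opentelemetry.io/otel/trace"
)

// parseWithSpan shows the equivalent of utils.StartTrace using the
// OpenTelemetry API directly.
func parseWithSpan(ctx context.Context, tracer trace.Tracer) {
	ctx, span := tracer.Start(ctx, "datasource.influxdb.influxql.parseResponse")
	defer span.End()
	_ = ctx // response parsing would run under this span
}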

View File

@@ -0,0 +1,53 @@
package influxql
import (
_ "embed"
"fmt"
"io"
"os"
"strings"
"testing"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/buffered"
"github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/querydata"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
)
// TEST_MODE=buffered go test -benchmem -run=^$ -memprofile buffered_mem.out -count=10 -bench ^BenchmarkParseJson github.com/grafana/grafana/pkg/tsdb/influxdb/influxql | tee buffered.txt
// TEST_MODE=stream go test -benchmem -run=^$ -memprofile stream_mem.out -count=10 -bench ^BenchmarkParseJson github.com/grafana/grafana/pkg/tsdb/influxdb/influxql | tee stream.txt
// go tool pprof -http=localhost:9999 memprofile.out
// benchstat buffered.txt stream.txt
func BenchmarkParseJson(b *testing.B) {
filePath := "testdata/many_columns.json"
bytes, err := os.ReadFile(filePath)
if err != nil {
panic(fmt.Sprintf("cannot read the file: %s", filePath))
}
testMode := os.Getenv("TEST_MODE")
if testMode == "" {
testMode = "stream"
}
query := &models.Query{
RawQuery: "Test raw query",
UseRawQuery: true,
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
buf := io.NopCloser(strings.NewReader(string(bytes)))
var result *backend.DataResponse
switch testMode {
case "buffered":
result = buffered.ResponseParse(buf, 200, query)
case "stream":
result = querydata.ResponseParse(buf, 200, query)
}
require.NotNil(b, result.Frames)
require.NoError(b, result.Error)
}
}
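
The two TEST_MODE runs above write buffered.txt and stream.txt; benchstat then reports the per-metric deltas (sec/op, B/op, allocs/op) between the parsers, which is how tables like the one in the commit message are produced.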

View File

@@ -0,0 +1,38 @@
package querydata
import (
"fmt"
"io"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
jsoniter "github.com/json-iterator/go"
"github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/converter"
"github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/util"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
)
func ResponseParse(buf io.ReadCloser, statusCode int, query *models.Query) *backend.DataResponse {
defer func() {
if err := buf.Close(); err != nil {
fmt.Println("Failed to close response body", "err", err)
}
}()
iter := jsoniter.Parse(jsoniter.ConfigDefault, buf, 1024)
r := converter.ReadInfluxQLStyleResult(iter, query)
if statusCode/100 != 2 {
return &backend.DataResponse{Error: fmt.Errorf("InfluxDB returned error: %s", r.Error)}
}
// The ExecutedQueryString can be viewed in the Query Inspector in the UI.
if len(r.Frames) > 0 {
r.Frames[0].Meta = &data.FrameMeta{ExecutedQueryString: query.RawQuery, PreferredVisualization: util.GetVisType(query.ResultFormat)}
}
return r
}
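
ResponseParse takes ownership of the body and closes it. A minimal caller sketch using this package's types (the file name is illustrative):

f, err := os.Open("testdata/simple_response.json")
if err != nil {
	// handle the error
}
rsp := ResponseParse(f, 200, &models.Query{
	RawQuery:     "SHOW measurements",
	UseRawQuery:  true,
	ResultFormat: "time_series",
})
if rsp.Error != nil {
	// InfluxDB returned an error or the JSON could not be parsed
}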

View File

@@ -0,0 +1,80 @@
package querydata
import (
"os"
"path"
"path/filepath"
"strings"
"testing"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/experimental"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
)
const (
shouldUpdate = false
testPath = "../testdata"
)
var testFiles = []string{
"all_values_are_null",
"influx_select_all_from_cpu",
"one_measurement_with_two_columns",
"response_with_weird_tag",
"some_values_are_null",
"simple_response",
"multiple_series_with_tags_and_multiple_columns",
"multiple_series_with_tags",
"empty_response",
"metric_find_queries",
"show_tag_values_response",
"retention_policy",
"simple_response_with_diverse_data_types",
"multiple_measurements",
"string_column_with_null_value",
"string_column_with_null_value2",
"many_columns",
"response_with_nil_bools_and_nil_strings",
"invalid_value_format",
}
func TestReadInfluxAsTimeSeries(t *testing.T) {
for _, f := range testFiles {
t.Run(f, runScenario(f, "time_series"))
}
}
func TestReadInfluxAsTable(t *testing.T) {
for _, f := range testFiles {
t.Run(f, runScenario(f, "table"))
}
}
func runScenario(tf string, resultFormat string) func(t *testing.T) {
return func(t *testing.T) {
f, err := os.Open(path.Join(testPath, filepath.Clean(tf+".json")))
require.NoError(t, err)
var rsp *backend.DataResponse
query := &models.Query{
RawQuery: "Test raw query",
UseRawQuery: true,
ResultFormat: resultFormat,
}
rsp = ResponseParse(f, 200, query)
if strings.Contains(tf, "error") {
require.Error(t, rsp.Error)
return
}
require.NoError(t, rsp.Error)
fname := tf + "." + resultFormat + ".golden"
experimental.CheckGoldenJSONResponse(t, testPath, fname, rsp, shouldUpdate)
}
}

View File

@ -1,27 +0,0 @@
package influxql
import (
_ "embed"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
//go:embed testdata/many_columns.json
var testResponse string
// go test -benchmem -run=^$ -memprofile memprofile.out -count=10 -bench ^BenchmarkParseJson$ github.com/grafana/grafana/pkg/tsdb/influxdb/influxql
// go tool pprof -http=localhost:9999 memprofile.out
func BenchmarkParseJson(b *testing.B) {
query := generateQuery("time_series", "")
b.ResetTimer()
for n := 0; n < b.N; n++ {
buf := strings.NewReader(testResponse)
result := parse(buf, 200, query)
require.NotNil(b, result.Frames)
require.NoError(b, result.Error)
}
}

View File

@@ -0,0 +1,44 @@
{
"results": [
{
"statement_id": 0,
"series": [
{
"name": "cpu",
"columns": [
"time",
"mean_usage_guest",
"mean_usage_nice",
"mean_usage_idle"
],
"values": [
[
1697984400000,
1111,
1112,
1113
],
[
1697984700000,
2221,
2222,
2223
],
[
1697985000000,
3331,
3332,
3333
],
[
1697985300000,
4441,
4442,
4443
]
]
}
]
}
]
}

View File

@@ -0,0 +1,113 @@
// 🌟 This was machine generated. Do not edit. 🌟
//
// Frame[0] {
// "typeVersion": [
// 0,
// 0
// ],
// "preferredVisualisationType": "table",
// "executedQueryString": "Test raw query"
// }
// Name: cpu
// Dimensions: 4 Fields by 4 Rows
// +-------------------------------+------------------------+-----------------------+-----------------------+
// | Name: Time | Name: mean_usage_guest | Name: mean_usage_nice | Name: mean_usage_idle |
// | Labels: | Labels: | Labels: | Labels: |
// | Type: []time.Time | Type: []*float64 | Type: []*float64 | Type: []*float64 |
// +-------------------------------+------------------------+-----------------------+-----------------------+
// | 2023-10-22 14:20:00 +0000 UTC | 1111 | 1112 | 1113 |
// | 2023-10-22 14:25:00 +0000 UTC | 2221 | 2222 | 2223 |
// | 2023-10-22 14:30:00 +0000 UTC | 3331 | 3332 | 3333 |
// | 2023-10-22 14:35:00 +0000 UTC | 4441 | 4442 | 4443 |
// +-------------------------------+------------------------+-----------------------+-----------------------+
//
//
// 🌟 This was machine generated. Do not edit. 🌟
{
"status": 200,
"frames": [
{
"schema": {
"name": "cpu",
"meta": {
"typeVersion": [
0,
0
],
"preferredVisualisationType": "table",
"executedQueryString": "Test raw query"
},
"fields": [
{
"name": "Time",
"type": "time",
"typeInfo": {
"frame": "time.Time"
}
},
{
"name": "mean_usage_guest",
"type": "number",
"typeInfo": {
"frame": "float64",
"nullable": true
},
"config": {
"displayNameFromDS": "mean_usage_guest"
}
},
{
"name": "mean_usage_nice",
"type": "number",
"typeInfo": {
"frame": "float64",
"nullable": true
},
"config": {
"displayNameFromDS": "mean_usage_nice"
}
},
{
"name": "mean_usage_idle",
"type": "number",
"typeInfo": {
"frame": "float64",
"nullable": true
},
"config": {
"displayNameFromDS": "mean_usage_idle"
}
}
]
},
"data": {
"values": [
[
1697984400000,
1697984700000,
1697985000000,
1697985300000
],
[
1111,
2221,
3331,
4441
],
[
1112,
2222,
3332,
4442
],
[
1113,
2223,
3333,
4443
]
]
}
}
]
}

View File

@@ -0,0 +1,193 @@
// 🌟 This was machine generated. Do not edit. 🌟
//
// Frame[0] {
// "typeVersion": [
// 0,
// 0
// ],
// "preferredVisualisationType": "graph",
// "executedQueryString": "Test raw query"
// }
// Name: cpu.mean_usage_guest
// Dimensions: 2 Fields by 4 Rows
// +-------------------------------+------------------+
// | Name: Time | Name: Value |
// | Labels: | Labels: |
// | Type: []time.Time | Type: []*float64 |
// +-------------------------------+------------------+
// | 2023-10-22 14:20:00 +0000 UTC | 1111 |
// | 2023-10-22 14:25:00 +0000 UTC | 2221 |
// | 2023-10-22 14:30:00 +0000 UTC | 3331 |
// | 2023-10-22 14:35:00 +0000 UTC | 4441 |
// +-------------------------------+------------------+
//
//
//
// Frame[1]
// Name: cpu.mean_usage_nice
// Dimensions: 2 Fields by 4 Rows
// +-------------------------------+------------------+
// | Name: Time | Name: Value |
// | Labels: | Labels: |
// | Type: []time.Time | Type: []*float64 |
// +-------------------------------+------------------+
// | 2023-10-22 14:20:00 +0000 UTC | 1112 |
// | 2023-10-22 14:25:00 +0000 UTC | 2222 |
// | 2023-10-22 14:30:00 +0000 UTC | 3332 |
// | 2023-10-22 14:35:00 +0000 UTC | 4442 |
// +-------------------------------+------------------+
//
//
//
// Frame[2]
// Name: cpu.mean_usage_idle
// Dimensions: 2 Fields by 4 Rows
// +-------------------------------+------------------+
// | Name: Time | Name: Value |
// | Labels: | Labels: |
// | Type: []time.Time | Type: []*float64 |
// +-------------------------------+------------------+
// | 2023-10-22 14:20:00 +0000 UTC | 1113 |
// | 2023-10-22 14:25:00 +0000 UTC | 2223 |
// | 2023-10-22 14:30:00 +0000 UTC | 3333 |
// | 2023-10-22 14:35:00 +0000 UTC | 4443 |
// +-------------------------------+------------------+
//
//
// 🌟 This was machine generated. Do not edit. 🌟
{
"status": 200,
"frames": [
{
"schema": {
"name": "cpu.mean_usage_guest",
"meta": {
"typeVersion": [
0,
0
],
"preferredVisualisationType": "graph",
"executedQueryString": "Test raw query"
},
"fields": [
{
"name": "Time",
"type": "time",
"typeInfo": {
"frame": "time.Time"
}
},
{
"name": "Value",
"type": "number",
"typeInfo": {
"frame": "float64",
"nullable": true
},
"config": {
"displayNameFromDS": "cpu.mean_usage_guest"
}
}
]
},
"data": {
"values": [
[
1697984400000,
1697984700000,
1697985000000,
1697985300000
],
[
1111,
2221,
3331,
4441
]
]
}
},
{
"schema": {
"name": "cpu.mean_usage_nice",
"fields": [
{
"name": "Time",
"type": "time",
"typeInfo": {
"frame": "time.Time"
}
},
{
"name": "Value",
"type": "number",
"typeInfo": {
"frame": "float64",
"nullable": true
},
"config": {
"displayNameFromDS": "cpu.mean_usage_nice"
}
}
]
},
"data": {
"values": [
[
1697984400000,
1697984700000,
1697985000000,
1697985300000
],
[
1112,
2222,
3332,
4442
]
]
}
},
{
"schema": {
"name": "cpu.mean_usage_idle",
"fields": [
{
"name": "Time",
"type": "time",
"typeInfo": {
"frame": "time.Time"
}
},
{
"name": "Value",
"type": "number",
"typeInfo": {
"frame": "float64",
"nullable": true
},
"config": {
"displayNameFromDS": "cpu.mean_usage_idle"
}
}
]
},
"data": {
"values": [
[
1697984400000,
1697984700000,
1697985000000,
1697985300000
],
[
1113,
2223,
3333,
4443
]
]
}
}
]
}

View File

@@ -77,8 +77,7 @@ func BuildFrameNameFromQuery(rowName, column string, tags map[string]string, fra
first := true
for k, v := range tags {
if !first {
frameName = append(frameName, ',')
frameName = append(frameName, ' ')
frameName = append(frameName, ',', ' ')
} else {
first = false
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
)
@@ -117,5 +118,22 @@ func GetMockService(version string, rt RoundTripper) *Service {
version: version,
fakeRoundTripper: rt,
},
features: &fakeFeatureToggles{
flags: map[string]bool{
featuremgmt.FlagInfluxqlStreamingParser: false,
},
},
}
}
type fakeFeatureToggles struct {
flags map[string]bool
}
func (f *fakeFeatureToggles) IsEnabledGlobally(flag string) bool {
return f.flags[flag]
}
func (f *fakeFeatureToggles) IsEnabled(ctx context.Context, flag string) bool {
return f.flags[flag]
}
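
With this fake in place, a test can opt into the streaming parser by flipping the flag. A sketch (the helper name is hypothetical; rt is a RoundTripper value constructed as elsewhere in this package):

func newStreamingMockService(rt RoundTripper) *Service {
	svc := GetMockService("InfluxQL", rt)
	svc.features = &fakeFeatureToggles{
		flags: map[string]bool{featuremgmt.FlagInfluxqlStreamingParser: true},
	}
	return svc
}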