From c088d003f28f6fda715f2a4ccb703c8190b2c895 Mon Sep 17 00:00:00 2001 From: ismail simsek Date: Wed, 6 Dec 2023 12:39:05 +0100 Subject: [PATCH] InfluxDB: Implement InfluxQL json streaming parser (#76934) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Have the first iteration * Prepare bench testing * rename the test files * Remove unnecessary test file * Introduce influxqlStreamingParser feature flag * Apply streaming parser feature flag * Add new tests * More tests * return executedQueryString only in first frame * add frame meta and config * Update golden json files * Support tags/labels * more tests * more tests * Don't change original response_parser.go * provide context * create util package * don't pass the row * update converter with formatted frameName * add executedQueryString info only to first frame * update golden files * rename * update test file * use pointer values * update testdata * update parsing * update converter for null values * prepare converter for table response * clean up * return timeField in fields * handle no time column responses * better nil field handling * refactor the code * add table tests * fix config for table * table response format * fix value * if there is no time column set name * linting * refactoring * handle the status code * add tracing * Update pkg/tsdb/influxdb/influxql/converter/converter_test.go Co-authored-by: İnanç Gümüş * fix import * update test data * sanity * sanity * linting * simplicity * return empty rsp * rename to prevent confusion * nullableJson field type for null values * better handling null values * remove duplicate test file * fix healthcheck * use util for pointer * move bench test to root * provide fake feature manager * add more tests * partial fix for null values in table response format * handle partial null fields * comments for easy testing * move frameName allocation in readSeries * one less append operation * performance improvement by making string conversion once pkg: github.com/grafana/grafana/pkg/tsdb/influxdb/influxql │ stream2.txt │ stream3.txt │ │ sec/op │ sec/op vs base │ ParseJson-10 314.4m ± 1% 303.9m ± 1% -3.34% (p=0.000 n=10) │ stream2.txt │ stream3.txt │ │ B/op │ B/op vs base │ ParseJson-10 425.2Mi ± 0% 382.7Mi ± 0% -10.00% (p=0.000 n=10) │ stream2.txt │ stream3.txt │ │ allocs/op │ allocs/op vs base │ ParseJson-10 7.224M ± 0% 6.689M ± 0% -7.41% (p=0.000 n=10) * add comment lines --------- Co-authored-by: İnanç Gümüş --- .../plugins_integration_test.go | 2 +- pkg/tsdb/influxdb/healthcheck.go | 12 +- pkg/tsdb/influxdb/influxdb.go | 14 +- .../{ => buffered}/response_parser.go | 2 +- .../{ => buffered}/response_parser_test.go | 20 +- .../influxdb/influxql/converter/converter.go | 452 ++++++++++++++++++ .../influxql/converter/converter_test.go | 82 ++++ pkg/tsdb/influxdb/influxql/influxql.go | 23 +- .../influxdb/influxql/parser_bench_test.go | 53 ++ .../influxql/querydata/stream_parser.go | 38 ++ .../influxql/querydata/stream_parser_test.go | 80 ++++ .../influxql/response_parser_bench_test.go | 27 -- .../testdata/influx_select_all_from_cpu.json | 44 ++ ...lux_select_all_from_cpu.table.golden.jsonc | 113 +++++ ...lect_all_from_cpu.time_series.golden.jsonc | 193 ++++++++ pkg/tsdb/influxdb/influxql/util/util.go | 3 +- pkg/tsdb/influxdb/mocks_test.go | 18 + 17 files changed, 1121 insertions(+), 55 deletions(-) rename pkg/tsdb/influxdb/influxql/{ => buffered}/response_parser.go (99%) rename pkg/tsdb/influxdb/influxql/{ => buffered}/response_parser_test.go (97%) create mode 100644 pkg/tsdb/influxdb/influxql/converter/converter.go create mode 100644 pkg/tsdb/influxdb/influxql/converter/converter_test.go create mode 100644 pkg/tsdb/influxdb/influxql/parser_bench_test.go create mode 100644 pkg/tsdb/influxdb/influxql/querydata/stream_parser.go create mode 100644 pkg/tsdb/influxdb/influxql/querydata/stream_parser_test.go delete mode 100644 pkg/tsdb/influxdb/influxql/response_parser_bench_test.go create mode 100644 pkg/tsdb/influxdb/influxql/testdata/influx_select_all_from_cpu.json create mode 100644 pkg/tsdb/influxdb/influxql/testdata/influx_select_all_from_cpu.table.golden.jsonc create mode 100644 pkg/tsdb/influxdb/influxql/testdata/influx_select_all_from_cpu.time_series.golden.jsonc diff --git a/pkg/services/pluginsintegration/plugins_integration_test.go b/pkg/services/pluginsintegration/plugins_integration_test.go index c90dd86a2a6..7cff8e07a06 100644 --- a/pkg/services/pluginsintegration/plugins_integration_test.go +++ b/pkg/services/pluginsintegration/plugins_integration_test.go @@ -86,7 +86,7 @@ func TestIntegrationPluginManager(t *testing.T) { cm := cloudmonitoring.ProvideService(hcp, tracer) es := elasticsearch.ProvideService(hcp, tracer) grap := graphite.ProvideService(hcp, tracer) - idb := influxdb.ProvideService(hcp) + idb := influxdb.ProvideService(hcp, features) lk := loki.ProvideService(hcp, features, tracer) otsdb := opentsdb.ProvideService(hcp) pr := prometheus.ProvideService(hcp, cfg, features) diff --git a/pkg/tsdb/influxdb/healthcheck.go b/pkg/tsdb/influxdb/healthcheck.go index 6cbe7f3694e..39a08232420 100644 --- a/pkg/tsdb/influxdb/healthcheck.go +++ b/pkg/tsdb/influxdb/healthcheck.go @@ -7,8 +7,10 @@ import ( "time" "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/tracing" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/tsdb/influxdb/flux" "github.com/grafana/grafana/pkg/tsdb/influxdb/fsql" "github.com/grafana/grafana/pkg/tsdb/influxdb/influxql" @@ -35,7 +37,7 @@ func (s *Service) CheckHealth(ctx context.Context, req *backend.CheckHealthReque case influxVersionFlux: return CheckFluxHealth(ctx, dsInfo, req) case influxVersionInfluxQL: - return CheckInfluxQLHealth(ctx, dsInfo) + return CheckInfluxQLHealth(ctx, dsInfo, s.features) case influxVersionSQL: return CheckSQLHealth(ctx, dsInfo, req) default: @@ -78,10 +80,10 @@ func CheckFluxHealth(ctx context.Context, dsInfo *models.DatasourceInfo, return getHealthCheckMessage(logger, "", errors.New("error getting flux query buckets")) } -func CheckInfluxQLHealth(ctx context.Context, dsInfo *models.DatasourceInfo) (*backend.CheckHealthResult, error) { +func CheckInfluxQLHealth(ctx context.Context, dsInfo *models.DatasourceInfo, features featuremgmt.FeatureToggles) (*backend.CheckHealthResult, error) { logger := logger.FromContext(ctx) - - resp, err := influxql.Query(ctx, dsInfo, &backend.QueryDataRequest{ + tracer := tracing.DefaultTracer() + resp, err := influxql.Query(ctx, tracer, dsInfo, &backend.QueryDataRequest{ Queries: []backend.DataQuery{ { RefID: refID, @@ -89,7 +91,7 @@ func CheckInfluxQLHealth(ctx context.Context, dsInfo *models.DatasourceInfo) (*b JSON: []byte(`{"query": "SHOW measurements", "rawQuery": true}`), }, }, - }) + }, features) if err != nil { return getHealthCheckMessage(logger, "error performing influxQL query", err) } diff --git a/pkg/tsdb/influxdb/influxdb.go b/pkg/tsdb/influxdb/influxdb.go index 9a03d5ddc8a..b8abe3c9ca9 100644 --- a/pkg/tsdb/influxdb/influxdb.go +++ b/pkg/tsdb/influxdb/influxdb.go @@ -8,7 +8,9 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" + "github.com/grafana/grafana-plugin-sdk-go/backend/tracing" + "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/tsdb/influxdb/flux" "github.com/grafana/grafana/pkg/tsdb/influxdb/fsql" @@ -21,12 +23,14 @@ import ( var logger log.Logger = log.New("tsdb.influxdb") type Service struct { - im instancemgmt.InstanceManager + im instancemgmt.InstanceManager + features featuremgmt.FeatureToggles } -func ProvideService(httpClient httpclient.Provider) *Service { +func ProvideService(httpClient httpclient.Provider, features featuremgmt.FeatureToggles) *Service { return &Service{ - im: datasource.NewInstanceManager(newInstanceSettings(httpClient)), + im: datasource.NewInstanceManager(newInstanceSettings(httpClient)), + features: features, } } @@ -90,6 +94,8 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) logger := logger.FromContext(ctx) logger.Debug("Received a query request", "numQueries", len(req.Queries)) + tracer := tracing.DefaultTracer() + dsInfo, err := s.getDSInfo(ctx, req.PluginContext) if err != nil { return nil, err @@ -101,7 +107,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) case influxVersionFlux: return flux.Query(ctx, dsInfo, *req) case influxVersionInfluxQL: - return influxql.Query(ctx, dsInfo, req) + return influxql.Query(ctx, tracer, dsInfo, req, s.features) case influxVersionSQL: return fsql.Query(ctx, dsInfo, *req) default: diff --git a/pkg/tsdb/influxdb/influxql/response_parser.go b/pkg/tsdb/influxdb/influxql/buffered/response_parser.go similarity index 99% rename from pkg/tsdb/influxdb/influxql/response_parser.go rename to pkg/tsdb/influxdb/influxql/buffered/response_parser.go index c259dbb507d..41646c19720 100644 --- a/pkg/tsdb/influxdb/influxql/response_parser.go +++ b/pkg/tsdb/influxdb/influxql/buffered/response_parser.go @@ -1,4 +1,4 @@ -package influxql +package buffered import ( "encoding/json" diff --git a/pkg/tsdb/influxdb/influxql/response_parser_test.go b/pkg/tsdb/influxdb/influxql/buffered/response_parser_test.go similarity index 97% rename from pkg/tsdb/influxdb/influxql/response_parser_test.go rename to pkg/tsdb/influxdb/influxql/buffered/response_parser_test.go index 62e94d45650..f3a76b56723 100644 --- a/pkg/tsdb/influxdb/influxql/response_parser_test.go +++ b/pkg/tsdb/influxdb/influxql/buffered/response_parser_test.go @@ -1,4 +1,4 @@ -package influxql +package buffered import ( "encoding/json" @@ -20,10 +20,13 @@ import ( "github.com/grafana/grafana/pkg/tsdb/influxdb/models" ) -const shouldUpdate = false +const ( + shouldUpdate = false + testPath = "../testdata" +) func readJsonFile(filePath string) io.ReadCloser { - bytes, err := os.ReadFile(filepath.Join("testdata", filepath.Clean(filePath)+".json")) + bytes, err := os.ReadFile(filepath.Join(testPath, filepath.Clean(filePath)+".json")) if err != nil { panic(fmt.Sprintf("cannot read the file: %s", filePath)) } @@ -42,10 +45,10 @@ func generateQuery(resFormat string, alias string) *models.Query { var testFiles = []string{ "all_values_are_null", + "influx_select_all_from_cpu", "one_measurement_with_two_columns", "response_with_weird_tag", "some_values_are_null", - "error_on_top_level_response", "simple_response", "multiple_series_with_tags_and_multiple_columns", "multiple_series_with_tags", @@ -76,21 +79,16 @@ func TestReadInfluxAsTable(t *testing.T) { func runScenario(tf string, resultFormat string) func(t *testing.T) { return func(t *testing.T) { - f, err := os.Open(path.Join("testdata", filepath.Clean(tf+".json"))) + f, err := os.Open(path.Join(testPath, filepath.Clean(tf+".json"))) require.NoError(t, err) query := generateQuery(resultFormat, "") rsp := ResponseParse(io.NopCloser(f), 200, query) - - if strings.Contains(tf, "error") { - require.Error(t, rsp.Error) - return - } require.NoError(t, rsp.Error) fname := tf + "." + resultFormat + ".golden" - experimental.CheckGoldenJSONResponse(t, "testdata", fname, rsp, shouldUpdate) + experimental.CheckGoldenJSONResponse(t, testPath, fname, rsp, shouldUpdate) } } diff --git a/pkg/tsdb/influxdb/influxql/converter/converter.go b/pkg/tsdb/influxdb/influxql/converter/converter.go new file mode 100644 index 00000000000..d81d2ffc9b6 --- /dev/null +++ b/pkg/tsdb/influxdb/influxql/converter/converter.go @@ -0,0 +1,452 @@ +package converter + +import ( + "fmt" + "strings" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" + jsoniter "github.com/json-iterator/go" + + "github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/util" + "github.com/grafana/grafana/pkg/tsdb/influxdb/models" + "github.com/grafana/grafana/pkg/util/converter/jsonitere" +) + +func rspErr(e error) *backend.DataResponse { + return &backend.DataResponse{Error: e} +} + +func ReadInfluxQLStyleResult(jIter *jsoniter.Iterator, query *models.Query) *backend.DataResponse { + iter := jsonitere.NewIterator(jIter) + var rsp *backend.DataResponse + +l1Fields: + for l1Field, err := iter.ReadObject(); ; l1Field, err = iter.ReadObject() { + if err != nil { + return rspErr(err) + } + switch l1Field { + case "results": + rsp = readResults(iter, query) + if rsp.Error != nil { + return rsp + } + case "": + if err != nil { + return rspErr(err) + } + break l1Fields + default: + v, err := iter.Read() + if err != nil { + rsp.Error = err + return rsp + } + fmt.Printf("[ROOT] unsupported key: %s / %v\n\n", l1Field, v) + } + } + + return rsp +} + +func readResults(iter *jsonitere.Iterator, query *models.Query) *backend.DataResponse { + rsp := &backend.DataResponse{Frames: make(data.Frames, 0)} +l1Fields: + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { + if err != nil { + return rspErr(err) + } + for l1Field, err := iter.ReadObject(); l1Field != ""; l1Field, err = iter.ReadObject() { + if err != nil { + return rspErr(err) + } + switch l1Field { + case "series": + rsp = readSeries(iter, query) + case "": + break l1Fields + default: + _, err := iter.Read() + if err != nil { + return rspErr(err) + } + } + } + } + + return rsp +} + +func readSeries(iter *jsonitere.Iterator, query *models.Query) *backend.DataResponse { + var ( + measurement string + tags map[string]string + columns []string + valueFields data.Fields + hasTimeColumn bool + ) + + // frameName is pre-allocated. So we can reuse it, saving memory. + // It's sized for a reasonably-large name, but will grow if needed. + frameName := make([]byte, 0, 128) + + rsp := &backend.DataResponse{Frames: make(data.Frames, 0)} + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { + if err != nil { + return rspErr(err) + } + + for l1Field, err := iter.ReadObject(); l1Field != ""; l1Field, err = iter.ReadObject() { + if err != nil { + return rspErr(err) + } + switch l1Field { + case "name": + if measurement, err = iter.ReadString(); err != nil { + return rspErr(err) + } + case "tags": + if tags, err = readTags(iter); err != nil { + return rspErr(err) + } + case "columns": + columns, err = readColumns(iter) + if err != nil { + return rspErr(err) + } + if columns[0] == "time" { + hasTimeColumn = true + } + case "values": + valueFields, err = readValues(iter, hasTimeColumn) + if err != nil { + return rspErr(err) + } + if util.GetVisType(query.ResultFormat) != util.TableVisType { + for i, v := range valueFields { + if v.Type() == data.FieldTypeNullableJSON { + maybeFixValueFieldType(valueFields, data.FieldTypeNullableFloat64, i) + } + } + } + default: + v, err := iter.Read() + if err != nil { + return rspErr(err) + } + fmt.Printf("[Series] unsupported key: %s / %v\n", l1Field, v) + } + } + + if util.GetVisType(query.ResultFormat) == util.TableVisType { + handleTableFormatFirstFrame(rsp, measurement, query) + handleTableFormatFirstField(rsp, valueFields, columns) + handleTableFormatTagFields(rsp, valueFields, tags) + handleTableFormatValueFields(rsp, valueFields, tags, columns) + } else { + // time_series response format + if hasTimeColumn { + // Frame with time column + newFrames := handleTimeSeriesFormatWithTimeColumn(valueFields, tags, columns, measurement, frameName, query) + rsp.Frames = append(rsp.Frames, newFrames...) + } else { + // Frame without time column + newFrame := handleTimeSeriesFormatWithoutTimeColumn(valueFields, columns, measurement, query) + rsp.Frames = append(rsp.Frames, newFrame) + } + } + } + + // if all values are null in a field, we convert the field type to NullableFloat64 + // it is because of the consistency between buffer and stream parser + // also frontend probably will not interpret the nullableJson value + for i, f := range rsp.Frames { + for j, v := range f.Fields { + if v.Type() == data.FieldTypeNullableJSON { + newField := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, 0) + newField.Name = v.Name + newField.Config = v.Config + for k := 0; k < v.Len(); k++ { + newField.Append(nil) + } + rsp.Frames[i].Fields[j] = newField + } + } + } + + return rsp +} + +func readTags(iter *jsonitere.Iterator) (map[string]string, error) { + tags := make(map[string]string) + for l1Field, err := iter.ReadObject(); l1Field != ""; l1Field, err = iter.ReadObject() { + if err != nil { + return nil, err + } + value, err := iter.ReadString() + if err != nil { + return nil, err + } + tags[l1Field] = value + } + return tags, nil +} + +func readColumns(iter *jsonitere.Iterator) (columns []string, err error) { + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { + if err != nil { + return nil, err + } + + l1Field, err := iter.ReadString() + if err != nil { + return nil, err + } + columns = append(columns, l1Field) + } + return columns, nil +} + +func readValues(iter *jsonitere.Iterator, hasTimeColumn bool) (valueFields data.Fields, err error) { + if hasTimeColumn { + valueFields = append(valueFields, data.NewField("Time", nil, make([]time.Time, 0))) + } + + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { + if err != nil { + return nil, err + } + + colIdx := 0 + + for more2, err := iter.ReadArray(); more2; more2, err = iter.ReadArray() { + if err != nil { + return nil, err + } + + if hasTimeColumn && colIdx == 0 { + // Read time + var t float64 + if t, err = iter.ReadFloat64(); err != nil { + return nil, err + } + valueFields[0].Append(time.UnixMilli(int64(t)).UTC()) + + colIdx++ + continue + } + + // Read column values + next, err := iter.WhatIsNext() + if err != nil { + return nil, err + } + + switch next { + case jsoniter.StringValue: + s, err := iter.ReadString() + if err != nil { + return nil, err + } + valueFields = maybeCreateValueField(valueFields, data.FieldTypeNullableString, colIdx) + maybeFixValueFieldType(valueFields, data.FieldTypeNullableString, colIdx) + tryToAppendValue(valueFields, &s, colIdx) + case jsoniter.NumberValue: + n, err := iter.ReadFloat64() + if err != nil { + return nil, err + } + valueFields = maybeCreateValueField(valueFields, data.FieldTypeNullableFloat64, colIdx) + maybeFixValueFieldType(valueFields, data.FieldTypeNullableFloat64, colIdx) + tryToAppendValue(valueFields, &n, colIdx) + case jsoniter.BoolValue: + b, err := iter.ReadAny() + if err != nil { + return nil, err + } + valueFields = maybeCreateValueField(valueFields, data.FieldTypeNullableBool, colIdx) + maybeFixValueFieldType(valueFields, data.FieldTypeNullableBool, colIdx) + tryToAppendValue(valueFields, util.ToPtr(b.ToBool()), colIdx) + case jsoniter.NilValue: + _, _ = iter.Read() + if len(valueFields) <= colIdx { + // no value field created before + // we don't know the type of the values for this field, yet + // so we create a FieldTypeNullableJSON to hold nil values + // if that is something else it will be replaced later + unknownField := data.NewFieldFromFieldType(data.FieldTypeNullableJSON, 0) + unknownField.Name = "Value" + valueFields = append(valueFields, unknownField) + } + valueFields[colIdx].Append(nil) + default: + return nil, fmt.Errorf("unknown value type") + } + + colIdx++ + } + } + + return valueFields, nil +} + +// maybeCreateValueField checks whether a value field has created already. +// if it hasn't, creates a new one +func maybeCreateValueField(valueFields data.Fields, expectedType data.FieldType, colIdx int) data.Fields { + if len(valueFields) == colIdx { + newField := data.NewFieldFromFieldType(expectedType, 0) + newField.Name = "Value" + valueFields = append(valueFields, newField) + } + + return valueFields +} + +// maybeFixValueFieldType checks if the value field type is matching +// For nil values we might have added FieldTypeNullableJSON value field +// if the type of the field in valueFields is not matching the expected type +// or the type of the field in valueFields is nullableJSON +// we change the type of the field as expectedType +func maybeFixValueFieldType(valueFields data.Fields, expectedType data.FieldType, colIdx int) { + if valueFields[colIdx].Type() == expectedType || valueFields[colIdx].Type() != data.FieldTypeNullableJSON { + return + } + stringField := data.NewFieldFromFieldType(expectedType, 0) + stringField.Name = "Value" + for i := 0; i < valueFields[colIdx].Len(); i++ { + stringField.Append(nil) + } + valueFields[colIdx] = stringField +} + +func tryToAppendValue[T *string | *float64 | *bool](valueFields data.Fields, value T, colIdx int) { + if valueFields[colIdx].Type() == typeOf(value) { + valueFields[colIdx].Append(value) + } else { + valueFields[colIdx].Append(nil) + } +} + +func typeOf(value interface{}) data.FieldType { + switch v := value.(type) { + case *string: + return data.FieldTypeNullableString + case *float64: + return data.FieldTypeNullableFloat64 + case *bool: + return data.FieldTypeNullableBool + default: + fmt.Printf("unknown value type: %v", v) + return data.FieldTypeNullableJSON + } +} + +func handleTimeSeriesFormatWithTimeColumn(valueFields data.Fields, tags map[string]string, columns []string, measurement string, frameName []byte, query *models.Query) []*data.Frame { + frames := make([]*data.Frame, 0, len(columns)-1) + for i, v := range columns { + if v == "time" { + continue + } + formattedFrameName := string(util.FormatFrameName(measurement, v, tags, *query, frameName[:])) + valueFields[i].Labels = tags + valueFields[i].Config = &data.FieldConfig{DisplayNameFromDS: formattedFrameName} + + frame := data.NewFrame(formattedFrameName, valueFields[0], valueFields[i]) + frames = append(frames, frame) + } + return frames +} + +func handleTimeSeriesFormatWithoutTimeColumn(valueFields data.Fields, columns []string, measurement string, query *models.Query) *data.Frame { + // Frame without time column + if len(columns) >= 2 && strings.Contains(strings.ToLower(query.RawQuery), strings.ToLower("SHOW TAG VALUES")) { + return data.NewFrame(measurement, valueFields[1]) + } + if len(columns) >= 1 { + return data.NewFrame(measurement, valueFields[0]) + } + return nil +} + +func handleTableFormatFirstFrame(rsp *backend.DataResponse, measurement string, query *models.Query) { + // Add the first and only frame for table format + if len(rsp.Frames) == 0 { + newFrame := data.NewFrame(measurement) + newFrame.Meta = &data.FrameMeta{ + ExecutedQueryString: query.RawQuery, + PreferredVisualization: util.GetVisType(query.ResultFormat), + } + rsp.Frames = append(rsp.Frames, newFrame) + } +} + +func handleTableFormatFirstField(rsp *backend.DataResponse, valueFields data.Fields, columns []string) { + if len(rsp.Frames[0].Fields) == 0 { + rsp.Frames[0].Fields = append(rsp.Frames[0].Fields, valueFields[0]) + if columns[0] != "time" { + rsp.Frames[0].Fields[0].Name = columns[0] + rsp.Frames[0].Fields[0].Config = &data.FieldConfig{DisplayNameFromDS: columns[0]} + } + } else { + var i int + for i < valueFields[0].Len() { + rsp.Frames[0].Fields[0].Append(valueFields[0].At(i)) + i++ + } + } +} + +func handleTableFormatTagFields(rsp *backend.DataResponse, valueFields data.Fields, tags map[string]string) { + ti := 1 + // We have the first field, so we should add tagField if there is any tag + for k, v := range tags { + if len(rsp.Frames[0].Fields) == ti { + tagField := data.NewField(k, nil, []*string{}) + tagField.Config = &data.FieldConfig{DisplayNameFromDS: k} + rsp.Frames[0].Fields = append(rsp.Frames[0].Fields, tagField) + } + var i int + for i < valueFields[0].Len() { + val := v[0:] + rsp.Frames[0].Fields[ti].Append(&val) + i++ + } + ti++ + } +} + +func handleTableFormatValueFields(rsp *backend.DataResponse, valueFields data.Fields, tags map[string]string, columns []string) { + // number of fields we currently have in the first frame + // we handled first value field and then tags. + si := len(tags) + 1 + for i, v := range valueFields { + // first value field is always handled first, before tags. + // no need to create another one again here + if i == 0 { + continue + } + + if len(rsp.Frames[0].Fields) == si { + rsp.Frames[0].Fields = append(rsp.Frames[0].Fields, v) + } else { + for vi := 0; vi < v.Len(); vi++ { + if v.Type() == data.FieldTypeNullableJSON { + // add nil explicitly. + // we don't know if it is a float pointer nil or string pointer nil or etc + rsp.Frames[0].Fields[si].Append(nil) + } else { + if v.Type() != rsp.Frames[0].Fields[si].Type() { + maybeFixValueFieldType(rsp.Frames[0].Fields, v.Type(), si) + } + rsp.Frames[0].Fields[si].Append(v.At(vi)) + } + } + } + + rsp.Frames[0].Fields[si].Name = columns[i] + rsp.Frames[0].Fields[si].Config = &data.FieldConfig{DisplayNameFromDS: columns[i]} + si++ + } +} diff --git a/pkg/tsdb/influxdb/influxql/converter/converter_test.go b/pkg/tsdb/influxdb/influxql/converter/converter_test.go new file mode 100644 index 00000000000..2c5e1deecd5 --- /dev/null +++ b/pkg/tsdb/influxdb/influxql/converter/converter_test.go @@ -0,0 +1,82 @@ +package converter + +import ( + "testing" + + "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/stretchr/testify/assert" +) + +func TestMaybeFixValueFieldType(t *testing.T) { + tests := []struct { + name string + valueFields data.Fields + inputType data.FieldType + colIdx int + expectedType data.FieldType + }{ + { + name: "should do nothing if both are the same type (bool)", + valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableBool, 0)}, + inputType: data.FieldTypeNullableBool, + colIdx: 0, + expectedType: data.FieldTypeNullableBool, + }, + { + name: "should do nothing if both are the same type (string)", + valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableString, 0)}, + inputType: data.FieldTypeNullableString, + colIdx: 0, + expectedType: data.FieldTypeNullableString, + }, + { + name: "should do nothing if both are the same type (float64)", + valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, 0)}, + inputType: data.FieldTypeNullableFloat64, + colIdx: 0, + expectedType: data.FieldTypeNullableFloat64, + }, + { + name: "should return nullableJson if both are nullableJson", + valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableJSON, 0)}, + inputType: data.FieldTypeNullableJSON, + colIdx: 0, + expectedType: data.FieldTypeNullableJSON, + }, + { + name: "should return nullableString if valueField is nullableJson and input is nullableString", + valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableJSON, 0)}, + inputType: data.FieldTypeNullableString, + colIdx: 0, + expectedType: data.FieldTypeNullableString, + }, + { + name: "should return nullableBool if valueField is nullableJson and input is nullableBool", + valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableJSON, 0)}, + inputType: data.FieldTypeNullableBool, + colIdx: 0, + expectedType: data.FieldTypeNullableBool, + }, + { + name: "should return nullableFloat64 if valueField is nullableJson and input is nullableFloat64", + valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableJSON, 0)}, + inputType: data.FieldTypeNullableFloat64, + colIdx: 0, + expectedType: data.FieldTypeNullableFloat64, + }, + { + name: "should do nothing if valueField is different than nullableJson and input is anything but nullableJson", + valueFields: data.Fields{data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, 0)}, + inputType: data.FieldTypeNullableString, + colIdx: 0, + expectedType: data.FieldTypeNullableFloat64, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + maybeFixValueFieldType(tt.valueFields, tt.inputType, tt.colIdx) + assert.Equal(t, tt.valueFields[tt.colIdx].Type(), tt.expectedType) + }) + } +} diff --git a/pkg/tsdb/influxdb/influxql/influxql.go b/pkg/tsdb/influxdb/influxql/influxql.go index d84144aa415..f6cdf46aef5 100644 --- a/pkg/tsdb/influxdb/influxql/influxql.go +++ b/pkg/tsdb/influxdb/influxql/influxql.go @@ -9,10 +9,15 @@ import ( "strings" "github.com/grafana/grafana-plugin-sdk-go/backend" + "go.opentelemetry.io/otel/trace" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/buffered" + "github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/querydata" "github.com/grafana/grafana/pkg/tsdb/influxdb/models" + "github.com/grafana/grafana/pkg/tsdb/prometheus/utils" ) const defaultRetentionPolicy = "default" @@ -22,7 +27,7 @@ var ( glog = log.New("tsdb.influx_influxql") ) -func Query(ctx context.Context, dsInfo *models.DatasourceInfo, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { +func Query(ctx context.Context, tracer trace.Tracer, dsInfo *models.DatasourceInfo, req *backend.QueryDataRequest, features featuremgmt.FeatureToggles) (*backend.QueryDataResponse, error) { logger := glog.FromContext(ctx) response := backend.NewQueryDataResponse() @@ -49,7 +54,7 @@ func Query(ctx context.Context, dsInfo *models.DatasourceInfo, req *backend.Quer return &backend.QueryDataResponse{}, err } - resp, err := execute(dsInfo, logger, query, request) + resp, err := execute(ctx, tracer, dsInfo, logger, query, request, features.IsEnabled(ctx, featuremgmt.FlagInfluxqlStreamingParser)) if err != nil { response.Responses[query.RefID] = backend.DataResponse{Error: err} @@ -110,7 +115,7 @@ func createRequest(ctx context.Context, logger log.Logger, dsInfo *models.Dataso return req, nil } -func execute(dsInfo *models.DatasourceInfo, logger log.Logger, query *models.Query, request *http.Request) (backend.DataResponse, error) { +func execute(ctx context.Context, tracer trace.Tracer, dsInfo *models.DatasourceInfo, logger log.Logger, query *models.Query, request *http.Request, isStreamingParserEnabled bool) (backend.DataResponse, error) { res, err := dsInfo.HTTPClient.Do(request) if err != nil { return backend.DataResponse{}, err @@ -120,6 +125,16 @@ func execute(dsInfo *models.DatasourceInfo, logger log.Logger, query *models.Que logger.Warn("Failed to close response body", "err", err) } }() - resp := ResponseParse(res.Body, res.StatusCode, query) + + _, endSpan := utils.StartTrace(ctx, tracer, "datasource.influxdb.influxql.parseResponse") + defer endSpan() + + var resp *backend.DataResponse + if isStreamingParserEnabled { + logger.Info("InfluxDB InfluxQL streaming parser enabled: ", "info") + resp = querydata.ResponseParse(res.Body, res.StatusCode, query) + } else { + resp = buffered.ResponseParse(res.Body, res.StatusCode, query) + } return *resp, nil } diff --git a/pkg/tsdb/influxdb/influxql/parser_bench_test.go b/pkg/tsdb/influxdb/influxql/parser_bench_test.go new file mode 100644 index 00000000000..68e27076138 --- /dev/null +++ b/pkg/tsdb/influxdb/influxql/parser_bench_test.go @@ -0,0 +1,53 @@ +package influxql + +import ( + _ "embed" + "fmt" + "io" + "os" + "strings" + "testing" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/stretchr/testify/require" + + "github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/buffered" + "github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/querydata" + "github.com/grafana/grafana/pkg/tsdb/influxdb/models" +) + +// TEST_MODE=buffered go test -benchmem -run=^$ -memprofile buffered_mem.out -count=10 -bench ^BenchmarkParseJson github.com/grafana/grafana/pkg/tsdb/influxdb/influxql | tee buffered.txt +// TEST_MODE=stream go test -benchmem -run=^$ -memprofile stream_mem.out -count=10 -bench ^BenchmarkParseJson github.com/grafana/grafana/pkg/tsdb/influxdb/influxql | tee stream.txt +// go tool pprof -http=localhost:9999 memprofile.out +// benchstat buffered.txt stream.txt +func BenchmarkParseJson(b *testing.B) { + filePath := "testdata/many_columns.json" + bytes, err := os.ReadFile(filePath) + if err != nil { + panic(fmt.Sprintf("cannot read the file: %s", filePath)) + } + + testMode := os.Getenv("TEST_MODE") + if testMode == "" { + testMode = "stream" + } + + query := &models.Query{ + RawQuery: "Test raw query", + UseRawQuery: true, + } + b.ResetTimer() + + for n := 0; n < b.N; n++ { + buf := io.NopCloser(strings.NewReader(string(bytes))) + var result *backend.DataResponse + switch testMode { + case "buffered": + result = buffered.ResponseParse(buf, 200, query) + case "stream": + result = querydata.ResponseParse(buf, 200, query) + } + require.NotNil(b, result.Frames) + require.NoError(b, result.Error) + } +} diff --git a/pkg/tsdb/influxdb/influxql/querydata/stream_parser.go b/pkg/tsdb/influxdb/influxql/querydata/stream_parser.go new file mode 100644 index 00000000000..dde413c8fa4 --- /dev/null +++ b/pkg/tsdb/influxdb/influxql/querydata/stream_parser.go @@ -0,0 +1,38 @@ +package querydata + +import ( + "fmt" + "io" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/data" + jsoniter "github.com/json-iterator/go" + + "github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/converter" + "github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/util" + "github.com/grafana/grafana/pkg/tsdb/influxdb/models" +) + +func ResponseParse(buf io.ReadCloser, statusCode int, query *models.Query) *backend.DataResponse { + defer func() { + if err := buf.Close(); err != nil { + fmt.Println("Failed to close response body", "err", err) + } + }() + + iter := jsoniter.Parse(jsoniter.ConfigDefault, buf, 1024) + r := converter.ReadInfluxQLStyleResult(iter, query) + + if statusCode/100 != 2 { + return &backend.DataResponse{Error: fmt.Errorf("InfluxDB returned error: %s", r.Error)} + } + + // The ExecutedQueryString can be viewed in QueryInspector in UI + for i, frame := range r.Frames { + if i == 0 { + frame.Meta = &data.FrameMeta{ExecutedQueryString: query.RawQuery, PreferredVisualization: util.GetVisType(query.ResultFormat)} + } + } + + return r +} diff --git a/pkg/tsdb/influxdb/influxql/querydata/stream_parser_test.go b/pkg/tsdb/influxdb/influxql/querydata/stream_parser_test.go new file mode 100644 index 00000000000..dfd739453d3 --- /dev/null +++ b/pkg/tsdb/influxdb/influxql/querydata/stream_parser_test.go @@ -0,0 +1,80 @@ +package querydata + +import ( + "os" + "path" + "path/filepath" + "strings" + "testing" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/experimental" + "github.com/stretchr/testify/require" + + "github.com/grafana/grafana/pkg/tsdb/influxdb/models" +) + +const ( + shouldUpdate = false + testPath = "../testdata" +) + +var testFiles = []string{ + "all_values_are_null", + "influx_select_all_from_cpu", + "one_measurement_with_two_columns", + "response_with_weird_tag", + "some_values_are_null", + "simple_response", + "multiple_series_with_tags_and_multiple_columns", + "multiple_series_with_tags", + "empty_response", + "metric_find_queries", + "show_tag_values_response", + "retention_policy", + "simple_response_with_diverse_data_types", + "multiple_measurements", + "string_column_with_null_value", + "string_column_with_null_value2", + "many_columns", + "response_with_nil_bools_and_nil_strings", + "invalid_value_format", +} + +func TestReadInfluxAsTimeSeries(t *testing.T) { + for _, f := range testFiles { + t.Run(f, runScenario(f, "time_series")) + } +} + +func TestReadInfluxAsTable(t *testing.T) { + for _, f := range testFiles { + t.Run(f, runScenario(f, "table")) + } +} + +func runScenario(tf string, resultFormat string) func(t *testing.T) { + return func(t *testing.T) { + f, err := os.Open(path.Join(testPath, filepath.Clean(tf+".json"))) + require.NoError(t, err) + + var rsp *backend.DataResponse + + query := &models.Query{ + RawQuery: "Test raw query", + UseRawQuery: true, + ResultFormat: resultFormat, + } + + rsp = ResponseParse(f, 200, query) + + if strings.Contains(tf, "error") { + require.Error(t, rsp.Error) + return + } + require.NoError(t, rsp.Error) + + fname := tf + "." + resultFormat + ".golden" + experimental.CheckGoldenJSONResponse(t, testPath, fname, rsp, shouldUpdate) + } +} diff --git a/pkg/tsdb/influxdb/influxql/response_parser_bench_test.go b/pkg/tsdb/influxdb/influxql/response_parser_bench_test.go deleted file mode 100644 index 3fe5b206e9b..00000000000 --- a/pkg/tsdb/influxdb/influxql/response_parser_bench_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package influxql - -import ( - _ "embed" - "strings" - "testing" - - "github.com/stretchr/testify/require" -) - -//go:embed testdata/many_columns.json -var testResponse string - -// go test -benchmem -run=^$ -memprofile memprofile.out -count=10 -bench ^BenchmarkParseJson$ github.com/grafana/grafana/pkg/tsdb/influxdb/influxql -// go tool pprof -http=localhost:9999 memprofile.out -func BenchmarkParseJson(b *testing.B) { - query := generateQuery("time_series", "") - - b.ResetTimer() - - for n := 0; n < b.N; n++ { - buf := strings.NewReader(testResponse) - result := parse(buf, 200, query) - require.NotNil(b, result.Frames) - require.NoError(b, result.Error) - } -} diff --git a/pkg/tsdb/influxdb/influxql/testdata/influx_select_all_from_cpu.json b/pkg/tsdb/influxdb/influxql/testdata/influx_select_all_from_cpu.json new file mode 100644 index 00000000000..f1712b5e334 --- /dev/null +++ b/pkg/tsdb/influxdb/influxql/testdata/influx_select_all_from_cpu.json @@ -0,0 +1,44 @@ +{ + "results": [ + { + "statement_id": 0, + "series": [ + { + "name": "cpu", + "columns": [ + "time", + "mean_usage_guest", + "mean_usage_nice", + "mean_usage_idle" + ], + "values": [ + [ + 1697984400000, + 1111, + 1112, + 1113 + ], + [ + 1697984700000, + 2221, + 2222, + 2223 + ], + [ + 1697985000000, + 3331, + 3332, + 3333 + ], + [ + 1697985300000, + 4441, + 4442, + 4443 + ] + ] + } + ] + } + ] +} diff --git a/pkg/tsdb/influxdb/influxql/testdata/influx_select_all_from_cpu.table.golden.jsonc b/pkg/tsdb/influxdb/influxql/testdata/influx_select_all_from_cpu.table.golden.jsonc new file mode 100644 index 00000000000..34c4ee82f62 --- /dev/null +++ b/pkg/tsdb/influxdb/influxql/testdata/influx_select_all_from_cpu.table.golden.jsonc @@ -0,0 +1,113 @@ +// 🌟 This was machine generated. Do not edit. 🌟 +// +// Frame[0] { +// "typeVersion": [ +// 0, +// 0 +// ], +// "preferredVisualisationType": "table", +// "executedQueryString": "Test raw query" +// } +// Name: cpu +// Dimensions: 4 Fields by 4 Rows +// +-------------------------------+------------------------+-----------------------+-----------------------+ +// | Name: Time | Name: mean_usage_guest | Name: mean_usage_nice | Name: mean_usage_idle | +// | Labels: | Labels: | Labels: | Labels: | +// | Type: []time.Time | Type: []*float64 | Type: []*float64 | Type: []*float64 | +// +-------------------------------+------------------------+-----------------------+-----------------------+ +// | 2023-10-22 14:20:00 +0000 UTC | 1111 | 1112 | 1113 | +// | 2023-10-22 14:25:00 +0000 UTC | 2221 | 2222 | 2223 | +// | 2023-10-22 14:30:00 +0000 UTC | 3331 | 3332 | 3333 | +// | 2023-10-22 14:35:00 +0000 UTC | 4441 | 4442 | 4443 | +// +-------------------------------+------------------------+-----------------------+-----------------------+ +// +// +// 🌟 This was machine generated. Do not edit. 🌟 +{ + "status": 200, + "frames": [ + { + "schema": { + "name": "cpu", + "meta": { + "typeVersion": [ + 0, + 0 + ], + "preferredVisualisationType": "table", + "executedQueryString": "Test raw query" + }, + "fields": [ + { + "name": "Time", + "type": "time", + "typeInfo": { + "frame": "time.Time" + } + }, + { + "name": "mean_usage_guest", + "type": "number", + "typeInfo": { + "frame": "float64", + "nullable": true + }, + "config": { + "displayNameFromDS": "mean_usage_guest" + } + }, + { + "name": "mean_usage_nice", + "type": "number", + "typeInfo": { + "frame": "float64", + "nullable": true + }, + "config": { + "displayNameFromDS": "mean_usage_nice" + } + }, + { + "name": "mean_usage_idle", + "type": "number", + "typeInfo": { + "frame": "float64", + "nullable": true + }, + "config": { + "displayNameFromDS": "mean_usage_idle" + } + } + ] + }, + "data": { + "values": [ + [ + 1697984400000, + 1697984700000, + 1697985000000, + 1697985300000 + ], + [ + 1111, + 2221, + 3331, + 4441 + ], + [ + 1112, + 2222, + 3332, + 4442 + ], + [ + 1113, + 2223, + 3333, + 4443 + ] + ] + } + } + ] +} \ No newline at end of file diff --git a/pkg/tsdb/influxdb/influxql/testdata/influx_select_all_from_cpu.time_series.golden.jsonc b/pkg/tsdb/influxdb/influxql/testdata/influx_select_all_from_cpu.time_series.golden.jsonc new file mode 100644 index 00000000000..87705aac1ec --- /dev/null +++ b/pkg/tsdb/influxdb/influxql/testdata/influx_select_all_from_cpu.time_series.golden.jsonc @@ -0,0 +1,193 @@ +// 🌟 This was machine generated. Do not edit. 🌟 +// +// Frame[0] { +// "typeVersion": [ +// 0, +// 0 +// ], +// "preferredVisualisationType": "graph", +// "executedQueryString": "Test raw query" +// } +// Name: cpu.mean_usage_guest +// Dimensions: 2 Fields by 4 Rows +// +-------------------------------+------------------+ +// | Name: Time | Name: Value | +// | Labels: | Labels: | +// | Type: []time.Time | Type: []*float64 | +// +-------------------------------+------------------+ +// | 2023-10-22 14:20:00 +0000 UTC | 1111 | +// | 2023-10-22 14:25:00 +0000 UTC | 2221 | +// | 2023-10-22 14:30:00 +0000 UTC | 3331 | +// | 2023-10-22 14:35:00 +0000 UTC | 4441 | +// +-------------------------------+------------------+ +// +// +// +// Frame[1] +// Name: cpu.mean_usage_nice +// Dimensions: 2 Fields by 4 Rows +// +-------------------------------+------------------+ +// | Name: Time | Name: Value | +// | Labels: | Labels: | +// | Type: []time.Time | Type: []*float64 | +// +-------------------------------+------------------+ +// | 2023-10-22 14:20:00 +0000 UTC | 1112 | +// | 2023-10-22 14:25:00 +0000 UTC | 2222 | +// | 2023-10-22 14:30:00 +0000 UTC | 3332 | +// | 2023-10-22 14:35:00 +0000 UTC | 4442 | +// +-------------------------------+------------------+ +// +// +// +// Frame[2] +// Name: cpu.mean_usage_idle +// Dimensions: 2 Fields by 4 Rows +// +-------------------------------+------------------+ +// | Name: Time | Name: Value | +// | Labels: | Labels: | +// | Type: []time.Time | Type: []*float64 | +// +-------------------------------+------------------+ +// | 2023-10-22 14:20:00 +0000 UTC | 1113 | +// | 2023-10-22 14:25:00 +0000 UTC | 2223 | +// | 2023-10-22 14:30:00 +0000 UTC | 3333 | +// | 2023-10-22 14:35:00 +0000 UTC | 4443 | +// +-------------------------------+------------------+ +// +// +// 🌟 This was machine generated. Do not edit. 🌟 +{ + "status": 200, + "frames": [ + { + "schema": { + "name": "cpu.mean_usage_guest", + "meta": { + "typeVersion": [ + 0, + 0 + ], + "preferredVisualisationType": "graph", + "executedQueryString": "Test raw query" + }, + "fields": [ + { + "name": "Time", + "type": "time", + "typeInfo": { + "frame": "time.Time" + } + }, + { + "name": "Value", + "type": "number", + "typeInfo": { + "frame": "float64", + "nullable": true + }, + "config": { + "displayNameFromDS": "cpu.mean_usage_guest" + } + } + ] + }, + "data": { + "values": [ + [ + 1697984400000, + 1697984700000, + 1697985000000, + 1697985300000 + ], + [ + 1111, + 2221, + 3331, + 4441 + ] + ] + } + }, + { + "schema": { + "name": "cpu.mean_usage_nice", + "fields": [ + { + "name": "Time", + "type": "time", + "typeInfo": { + "frame": "time.Time" + } + }, + { + "name": "Value", + "type": "number", + "typeInfo": { + "frame": "float64", + "nullable": true + }, + "config": { + "displayNameFromDS": "cpu.mean_usage_nice" + } + } + ] + }, + "data": { + "values": [ + [ + 1697984400000, + 1697984700000, + 1697985000000, + 1697985300000 + ], + [ + 1112, + 2222, + 3332, + 4442 + ] + ] + } + }, + { + "schema": { + "name": "cpu.mean_usage_idle", + "fields": [ + { + "name": "Time", + "type": "time", + "typeInfo": { + "frame": "time.Time" + } + }, + { + "name": "Value", + "type": "number", + "typeInfo": { + "frame": "float64", + "nullable": true + }, + "config": { + "displayNameFromDS": "cpu.mean_usage_idle" + } + } + ] + }, + "data": { + "values": [ + [ + 1697984400000, + 1697984700000, + 1697985000000, + 1697985300000 + ], + [ + 1113, + 2223, + 3333, + 4443 + ] + ] + } + } + ] +} \ No newline at end of file diff --git a/pkg/tsdb/influxdb/influxql/util/util.go b/pkg/tsdb/influxdb/influxql/util/util.go index d7c4b67cc2d..5d03fcb1104 100644 --- a/pkg/tsdb/influxdb/influxql/util/util.go +++ b/pkg/tsdb/influxdb/influxql/util/util.go @@ -77,8 +77,7 @@ func BuildFrameNameFromQuery(rowName, column string, tags map[string]string, fra first := true for k, v := range tags { if !first { - frameName = append(frameName, ',') - frameName = append(frameName, ' ') + frameName = append(frameName, ',', ' ') } else { first = false } diff --git a/pkg/tsdb/influxdb/mocks_test.go b/pkg/tsdb/influxdb/mocks_test.go index 4dc690c4ccb..8b99d8ef842 100644 --- a/pkg/tsdb/influxdb/mocks_test.go +++ b/pkg/tsdb/influxdb/mocks_test.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/grafana/grafana/pkg/infra/httpclient" + "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/tsdb/influxdb/models" ) @@ -117,5 +118,22 @@ func GetMockService(version string, rt RoundTripper) *Service { version: version, fakeRoundTripper: rt, }, + features: &fakeFeatureToggles{ + flags: map[string]bool{ + featuremgmt.FlagInfluxqlStreamingParser: false, + }, + }, } } + +type fakeFeatureToggles struct { + flags map[string]bool +} + +func (f *fakeFeatureToggles) IsEnabledGlobally(flag string) bool { + return f.flags[flag] +} + +func (f *fakeFeatureToggles) IsEnabled(ctx context.Context, flag string) bool { + return f.flags[flag] +}