From 0e1e85656b112daa6a64b5afd1a62f305cf40509 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 19 Aug 2020 09:23:11 +0200 Subject: [PATCH] InfluxDB: Upgrade InfluxDB in devenv (#26983) * InfluxDB: Upgrade InfluxDB in devenv * InfluxDB: De-export symbols * InfluxDB: Remove unused code * devenv: Make InfluxDB version configurable Signed-off-by: Arve Knudsen --- devenv/docker/blocks/influxdb/.env | 1 + .../blocks/influxdb/docker-compose.yaml | 2 +- pkg/tsdb/influxdb/flux/builder.go | 26 ++-- pkg/tsdb/influxdb/flux/converters.go | 112 ++---------------- pkg/tsdb/influxdb/flux/executor.go | 8 +- pkg/tsdb/influxdb/flux/executor_test.go | 2 +- pkg/tsdb/influxdb/flux/flux.go | 20 ++-- pkg/tsdb/influxdb/flux/macros.go | 4 +- pkg/tsdb/influxdb/flux/macros_test.go | 17 ++- pkg/tsdb/influxdb/flux/query_models.go | 20 ++-- 10 files changed, 62 insertions(+), 150 deletions(-) create mode 100644 devenv/docker/blocks/influxdb/.env diff --git a/devenv/docker/blocks/influxdb/.env b/devenv/docker/blocks/influxdb/.env new file mode 100644 index 00000000000..ce54e7020a8 --- /dev/null +++ b/devenv/docker/blocks/influxdb/.env @@ -0,0 +1 @@ +influxdb_version=1.8.1-alpine diff --git a/devenv/docker/blocks/influxdb/docker-compose.yaml b/devenv/docker/blocks/influxdb/docker-compose.yaml index 39582ff5f6c..b0f49805b1b 100644 --- a/devenv/docker/blocks/influxdb/docker-compose.yaml +++ b/devenv/docker/blocks/influxdb/docker-compose.yaml @@ -1,5 +1,5 @@ influxdb: - image: influxdb:1.7.6 + image: influxdb:${influxdb_version} container_name: influxdb ports: - '2004:2004' diff --git a/pkg/tsdb/influxdb/flux/builder.go b/pkg/tsdb/influxdb/flux/builder.go index a49c9ffd5e4..2285e52408b 100644 --- a/pkg/tsdb/influxdb/flux/builder.go +++ b/pkg/tsdb/influxdb/flux/builder.go @@ -27,8 +27,8 @@ type columnInfo struct { converter *data.FieldConverter } -// FrameBuilder This is an interface to help testing -type FrameBuilder struct { +// frameBuilder is an interface to help testing. +type frameBuilder struct { tableID int64 active *data.Frame frames []*data.Frame @@ -50,23 +50,23 @@ func isTag(schk string) bool { func getConverter(t string) (*data.FieldConverter, error) { switch t { case stringDatatype: - return &AnyToOptionalString, nil + return &anyToOptionalString, nil case timeDatatypeRFC: - return &TimeToOptionalTime, nil + return &timeToOptionalTime, nil case timeDatatypeRFCNano: - return &TimeToOptionalTime, nil + return &timeToOptionalTime, nil case durationDatatype: - return &Int64ToOptionalInt64, nil + return &int64ToOptionalInt64, nil case doubleDatatype: - return &Float64ToOptionalFloat64, nil + return &float64ToOptionalFloat64, nil case boolDatatype: - return &BoolToOptionalBool, nil + return &boolToOptionalBool, nil case longDatatype: - return &Int64ToOptionalInt64, nil + return &int64ToOptionalInt64, nil case uLongDatatype: - return &UInt64ToOptionalUInt64, nil + return &uint64ToOptionalUInt64, nil case base64BinaryDataType: - return &AnyToOptionalString, nil + return &anyToOptionalString, nil } return nil, fmt.Errorf("no matching converter found for [%v]", t) @@ -75,7 +75,7 @@ func getConverter(t string) (*data.FieldConverter, error) { // Init initializes the frame to be returned // fields points at entries in the frame, and provides easier access // names indexes the columns encountered -func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error { +func (fb *frameBuilder) Init(metadata *query.FluxTableMetadata) error { columns := metadata.Columns() fb.frames = make([]*data.Frame, 0) fb.tableID = -1 @@ -153,7 +153,7 @@ func getTimeSeriesTimeColumn(columns []*query.FluxColumn) *query.FluxColumn { // Tags are appended as labels // _measurement holds the dataframe name // _field holds the field name. -func (fb *FrameBuilder) Append(record *query.FluxRecord) error { +func (fb *frameBuilder) Append(record *query.FluxRecord) error { table, ok := record.ValueByKey("table").(int64) if ok && table != fb.tableID { fb.totalSeries++ diff --git a/pkg/tsdb/influxdb/flux/converters.go b/pkg/tsdb/influxdb/flux/converters.go index 9c9766191ed..74e4fda9eee 100644 --- a/pkg/tsdb/influxdb/flux/converters.go +++ b/pkg/tsdb/influxdb/flux/converters.go @@ -2,34 +2,13 @@ package flux import ( "fmt" - "strconv" "time" "github.com/grafana/grafana-plugin-sdk-go/data" ) -// Int64NOOP ..... -var Int64NOOP = data.FieldConverter{ - OutputFieldType: data.FieldTypeInt64, -} - -// BoolNOOP ..... -var BoolNOOP = data.FieldConverter{ - OutputFieldType: data.FieldTypeBool, -} - -// Float64NOOP ..... -var Float64NOOP = data.FieldConverter{ - OutputFieldType: data.FieldTypeFloat64, -} - -// StringNOOP value is already in the proper format -var StringNOOP = data.FieldConverter{ - OutputFieldType: data.FieldTypeString, -} - -// AnyToOptionalString any value as a string -var AnyToOptionalString = data.FieldConverter{ +// anyToOptionalString any value as a string. +var anyToOptionalString = data.FieldConverter{ OutputFieldType: data.FieldTypeNullableString, Converter: func(v interface{}) (interface{}, error) { if v == nil { @@ -40,8 +19,8 @@ var AnyToOptionalString = data.FieldConverter{ }, } -// Float64ToOptionalFloat64 optional float value -var Float64ToOptionalFloat64 = data.FieldConverter{ +// float64ToOptionalFloat64 optional float value +var float64ToOptionalFloat64 = data.FieldConverter{ OutputFieldType: data.FieldTypeNullableFloat64, Converter: func(v interface{}) (interface{}, error) { if v == nil { @@ -55,8 +34,8 @@ var Float64ToOptionalFloat64 = data.FieldConverter{ }, } -// Int64ToOptionalInt64 optional int value -var Int64ToOptionalInt64 = data.FieldConverter{ +// int64ToOptionalInt64 optional int value +var int64ToOptionalInt64 = data.FieldConverter{ OutputFieldType: data.FieldTypeNullableInt64, Converter: func(v interface{}) (interface{}, error) { if v == nil { @@ -70,8 +49,8 @@ var Int64ToOptionalInt64 = data.FieldConverter{ }, } -// UInt64ToOptionalUInt64 optional int value -var UInt64ToOptionalUInt64 = data.FieldConverter{ +// uint64ToOptionalUInt64 optional int value +var uint64ToOptionalUInt64 = data.FieldConverter{ OutputFieldType: data.FieldTypeNullableUint64, Converter: func(v interface{}) (interface{}, error) { if v == nil { @@ -85,8 +64,8 @@ var UInt64ToOptionalUInt64 = data.FieldConverter{ }, } -// BoolToOptionalBool optional int value -var BoolToOptionalBool = data.FieldConverter{ +// boolToOptionalBool optional int value +var boolToOptionalBool = data.FieldConverter{ OutputFieldType: data.FieldTypeNullableBool, Converter: func(v interface{}) (interface{}, error) { if v == nil { @@ -100,8 +79,8 @@ var BoolToOptionalBool = data.FieldConverter{ }, } -// TimeToOptionalTime optional int value -var TimeToOptionalTime = data.FieldConverter{ +// timeToOptionalTime optional int value +var timeToOptionalTime = data.FieldConverter{ OutputFieldType: data.FieldTypeNullableTime, Converter: func(v interface{}) (interface{}, error) { if v == nil { @@ -114,70 +93,3 @@ var TimeToOptionalTime = data.FieldConverter{ return &val, nil }, } - -// RFC3339StringToNullableTime ..... -func RFC3339StringToNullableTime(s string) (*time.Time, error) { - if s == "" { - return nil, nil - } - - rv, err := time.Parse(time.RFC3339, s) - if err != nil { - return nil, err - } - - u := rv.UTC() - return &u, nil -} - -// StringToOptionalFloat64 string to float -var StringToOptionalFloat64 = data.FieldConverter{ - OutputFieldType: data.FieldTypeNullableFloat64, - Converter: func(v interface{}) (interface{}, error) { - if v == nil { - return nil, nil - } - val, ok := v.(string) - if !ok { // or return some default value instead of erroring - return nil, fmt.Errorf("[floatz] expected string input but got type %T", v) - } - fV, err := strconv.ParseFloat(val, 64) - return &fV, err - }, -} - -// Float64EpochSecondsToTime numeric seconds to time -var Float64EpochSecondsToTime = data.FieldConverter{ - OutputFieldType: data.FieldTypeTime, - Converter: func(v interface{}) (interface{}, error) { - fV, ok := v.(float64) - if !ok { // or return some default value instead of erroring - return nil, fmt.Errorf("[seconds] expected float64 input but got type %T", v) - } - return time.Unix(int64(fV), 0).UTC(), nil - }, -} - -// Float64EpochMillisToTime convert to time -var Float64EpochMillisToTime = data.FieldConverter{ - OutputFieldType: data.FieldTypeTime, - Converter: func(v interface{}) (interface{}, error) { - fV, ok := v.(float64) - if !ok { // or return some default value instead of erroring - return nil, fmt.Errorf("[ms] expected float64 input but got type %T", v) - } - return time.Unix(0, int64(fV)*int64(time.Millisecond)).UTC(), nil - }, -} - -// Boolean ... -var Boolean = data.FieldConverter{ - OutputFieldType: data.FieldTypeBool, - Converter: func(v interface{}) (interface{}, error) { - fV, ok := v.(bool) - if !ok { // or return some default value instead of erroring - return nil, fmt.Errorf("[ms] expected bool input but got type %T", v) - } - return fV, nil - }, -} diff --git a/pkg/tsdb/influxdb/flux/executor.go b/pkg/tsdb/influxdb/flux/executor.go index 581ea0ddfb7..a655d795065 100644 --- a/pkg/tsdb/influxdb/flux/executor.go +++ b/pkg/tsdb/influxdb/flux/executor.go @@ -9,12 +9,12 @@ import ( "github.com/influxdata/influxdb-client-go/v2/api" ) -// executeQuery runs a flux query using the QueryModel to interpolate the query and the runner to execute it. +// executeQuery runs a flux query using the queryModel to interpolate the query and the runner to execute it. // maxSeries somehow limits the response. -func executeQuery(ctx context.Context, query QueryModel, runner queryRunner, maxSeries int) (dr backend.DataResponse) { +func executeQuery(ctx context.Context, query queryModel, runner queryRunner, maxSeries int) (dr backend.DataResponse) { dr = backend.DataResponse{} - flux, err := Interpolate(query) + flux, err := interpolate(query) if err != nil { dr.Error = err return @@ -50,7 +50,7 @@ func readDataFrames(result *api.QueryTableResult, maxPoints int, maxSeries int) glog.Debug("Reading data frames from query result", "maxPoints", maxPoints, "maxSeries", maxSeries) dr = backend.DataResponse{} - builder := &FrameBuilder{ + builder := &frameBuilder{ maxPoints: maxPoints, maxSeries: maxSeries, } diff --git a/pkg/tsdb/influxdb/flux/executor_test.go b/pkg/tsdb/influxdb/flux/executor_test.go index ff2b20fd18d..826b6ae08ed 100644 --- a/pkg/tsdb/influxdb/flux/executor_test.go +++ b/pkg/tsdb/influxdb/flux/executor_test.go @@ -55,7 +55,7 @@ func verifyGoldenResponse(name string) (*backend.DataResponse, error) { testDataPath: name + ".csv", } - dr := executeQuery(context.Background(), QueryModel{MaxDataPoints: 100}, runner, 50) + dr := executeQuery(context.Background(), queryModel{MaxDataPoints: 100}, runner, 50) err := experimental.CheckGoldenDataResponse("./testdata/"+name+".golden.txt", &dr, true) return &dr, err } diff --git a/pkg/tsdb/influxdb/flux/flux.go b/pkg/tsdb/influxdb/flux/flux.go index bd9d353c90b..3cb1c4985a1 100644 --- a/pkg/tsdb/influxdb/flux/flux.go +++ b/pkg/tsdb/influxdb/flux/flux.go @@ -26,29 +26,29 @@ func Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQ tRes := &tsdb.Response{ Results: make(map[string]*tsdb.QueryResult), } - runner, err := RunnerFromDataSource(dsInfo) + r, err := runnerFromDataSource(dsInfo) if err != nil { return nil, err } - defer runner.client.Close() + defer r.client.Close() for _, query := range tsdbQuery.Queries { - qm, err := GetQueryModelTSDB(query, tsdbQuery.TimeRange, dsInfo) + qm, err := getQueryModelTSDB(query, tsdbQuery.TimeRange, dsInfo) if err != nil { tRes.Results[query.RefId] = &tsdb.QueryResult{Error: err} continue } - res := executeQuery(context.Background(), *qm, runner, 50) + res := executeQuery(context.Background(), *qm, r, 50) tRes.Results[query.RefId] = backendDataResponseToTSDBResponse(&res, query.RefId) } return tRes, nil } -// Runner is an influxdb2 Client with an attached org property and is used +// runner is an influxdb2 Client with an attached org property and is used // for running flux queries. -type Runner struct { +type runner struct { client influxdb2.Client org string } @@ -59,13 +59,13 @@ type queryRunner interface { } // runQuery executes fluxQuery against the Runner's organization and returns a Flux typed result. -func (r *Runner) runQuery(ctx context.Context, fluxQuery string) (*api.QueryTableResult, error) { +func (r *runner) runQuery(ctx context.Context, fluxQuery string) (*api.QueryTableResult, error) { qa := r.client.QueryAPI(r.org) return qa.Query(ctx, fluxQuery) } -// RunnerFromDataSource creates a runner from the datasource model (the datasource instance's configuration). -func RunnerFromDataSource(dsInfo *models.DataSource) (*Runner, error) { +// runnerFromDataSource creates a runner from the datasource model (the datasource instance's configuration). +func runnerFromDataSource(dsInfo *models.DataSource) (*runner, error) { org := dsInfo.JsonData.Get("organization").MustString("") if org == "" { return nil, fmt.Errorf("missing organization in datasource configuration") @@ -86,7 +86,7 @@ func RunnerFromDataSource(dsInfo *models.DataSource) (*Runner, error) { return nil, err } opts.HTTPOptions().SetHTTPClient(hc) - return &Runner{ + return &runner{ client: influxdb2.NewClientWithOptions(url, token, opts), org: org, }, nil diff --git a/pkg/tsdb/influxdb/flux/macros.go b/pkg/tsdb/influxdb/flux/macros.go index b119dc67fac..dd224fe7729 100644 --- a/pkg/tsdb/influxdb/flux/macros.go +++ b/pkg/tsdb/influxdb/flux/macros.go @@ -11,8 +11,8 @@ import ( const variableFilter = `(?m)([a-zA-Z]+)\.([a-zA-Z]+)` -// Interpolate processes macros -func Interpolate(query QueryModel) (string, error) { +// interpolate processes macros +func interpolate(query queryModel) (string, error) { flux := query.RawQuery variableFilterExp, err := regexp.Compile(variableFilter) diff --git a/pkg/tsdb/influxdb/flux/macros_test.go b/pkg/tsdb/influxdb/flux/macros_test.go index a2610a6db7b..363c2cc41ed 100644 --- a/pkg/tsdb/influxdb/flux/macros_test.go +++ b/pkg/tsdb/influxdb/flux/macros_test.go @@ -6,6 +6,8 @@ import ( "github.com/google/go-cmp/cmp" "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestInterpolate(t *testing.T) { @@ -17,7 +19,7 @@ func TestInterpolate(t *testing.T) { To: time.Unix(0, 0), } - options := QueryOptions{ + options := queryOptions{ Organization: "grafana1", Bucket: "grafana2", DefaultBucket: "grafana3", @@ -36,20 +38,17 @@ func TestInterpolate(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - query := QueryModel{ + query := queryModel{ RawQuery: tt.before, Options: options, TimeRange: timeRange, MaxDataPoints: 1, Interval: 1000 * 1000 * 1000, } - interpolatedQuery, err := Interpolate(query) - if err != nil { - t.Fatal(err) - } - if diff := cmp.Diff(tt.after, interpolatedQuery); diff != "" { - t.Fatalf("Result mismatch (-want +got):\n%s", diff) - } + interpolatedQuery, err := interpolate(query) + require.NoError(t, err) + diff := cmp.Diff(tt.after, interpolatedQuery) + assert.Equal(t, "", diff) }) } } diff --git a/pkg/tsdb/influxdb/flux/query_models.go b/pkg/tsdb/influxdb/flux/query_models.go index c36666316ec..2e588bae6bc 100644 --- a/pkg/tsdb/influxdb/flux/query_models.go +++ b/pkg/tsdb/influxdb/flux/query_models.go @@ -10,17 +10,17 @@ import ( "github.com/grafana/grafana/pkg/tsdb" ) -// QueryOptions represents datasource configuration options -type QueryOptions struct { +// queryOptions represents datasource configuration options +type queryOptions struct { Bucket string `json:"bucket"` DefaultBucket string `json:"defaultBucket"` Organization string `json:"organization"` } -// QueryModel represents a spreadsheet query. -type QueryModel struct { +// queryModel represents a query. +type queryModel struct { RawQuery string `json:"query"` - Options QueryOptions `json:"options"` + Options queryOptions `json:"options"` // Not from JSON TimeRange backend.TimeRange `json:"-"` @@ -31,8 +31,8 @@ type QueryModel struct { // The following is commented out but kept as it should be useful when // restoring this code to be closer to the SDK's models. -// func GetQueryModel(query backend.DataQuery) (*QueryModel, error) { -// model := &QueryModel{} +// func GetQueryModel(query backend.DataQuery) (*queryModel, error) { +// model := &queryModel{} // err := json.Unmarshal(query.JSON, &model) // if err != nil { @@ -46,9 +46,9 @@ type QueryModel struct { // return model, nil // } -// GetQueryModelTSDB builds a QueryModel from tsdb.Query information and datasource configuration (dsInfo). -func GetQueryModelTSDB(query *tsdb.Query, timeRange *tsdb.TimeRange, dsInfo *models.DataSource) (*QueryModel, error) { - model := &QueryModel{} +// getQueryModelTSDB builds a queryModel from tsdb.Query information and datasource configuration (dsInfo). +func getQueryModelTSDB(query *tsdb.Query, timeRange *tsdb.TimeRange, dsInfo *models.DataSource) (*queryModel, error) { + model := &queryModel{} queryBytes, err := query.Model.Encode() if err != nil { return nil, fmt.Errorf("failed to re-encode the flux query into JSON: %w", err)