diff --git a/pkg/tsdb/influxdb/flux/builder.go b/pkg/tsdb/influxdb/flux/builder.go index d986eb50bd5..ee81db09388 100644 --- a/pkg/tsdb/influxdb/flux/builder.go +++ b/pkg/tsdb/influxdb/flux/builder.go @@ -2,6 +2,7 @@ package flux import ( "fmt" + "time" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/influxdata/influxdb-client-go/api/query" @@ -38,6 +39,8 @@ type FrameBuilder struct { maxSeries int // max number of series totalSeries int isTimeSeries bool + timeColumn string // sometimes it is not `_time` + timeDisplay string } func isTag(schk string) bool { @@ -79,6 +82,7 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error { fb.value = nil fb.columns = make([]columnInfo, 0) fb.isTimeSeries = false + fb.timeColumn = "" for _, col := range columns { switch { @@ -97,7 +101,17 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error { } } - if !fb.isTimeSeries { + if fb.isTimeSeries { + col := getTimeSeriesTimeColumn(columns) + if col == nil { + return fmt.Errorf("no time column in timeSeries") + } + fb.timeColumn = col.Name() + fb.timeDisplay = "Time" + if "_time" != fb.timeColumn { + fb.timeDisplay = col.Name() + } + } else { fb.labels = make([]string, 0) for _, col := range columns { converter, err := getConverter(col.DataType()) @@ -114,6 +128,23 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error { return nil } +func getTimeSeriesTimeColumn(columns []*query.FluxColumn) *query.FluxColumn { + // First look for '_time' column + for _, col := range columns { + if col.Name() == "_time" && col.DataType() == timeDatatypeRFC || col.DataType() == timeDatatypeRFCNano { + return col + } + } + + // Then any time column + for _, col := range columns { + if col.DataType() == timeDatatypeRFC || col.DataType() == timeDatatypeRFCNano { + return col + } + } + return nil +} + // Append appends a single entry from an influxdb2 record to a data frame // Values are appended to _value // Tags are appended as labels @@ -139,7 +170,7 @@ func (fb *FrameBuilder) Append(record *query.FluxRecord) error { data.NewFieldFromFieldType(fb.value.OutputFieldType, 0), ) - fb.active.Fields[0].Name = "Time" + fb.active.Fields[0].Name = fb.timeDisplay name, ok := record.ValueByKey("_field").(string) if ok { fb.active.Fields[1].Name = name @@ -168,11 +199,16 @@ func (fb *FrameBuilder) Append(record *query.FluxRecord) error { } if fb.isTimeSeries { + time, ok := record.ValueByKey(fb.timeColumn).(time.Time) + if !ok { + return fmt.Errorf("unable to get time colum: %s", fb.timeColumn) + } + val, err := fb.value.Converter(record.Value()) if err != nil { return err } - fb.active.Fields[0].Append(record.Time()) + fb.active.Fields[0].Append(time) fb.active.Fields[1].Append(val) } else { // Table view diff --git a/pkg/tsdb/influxdb/flux/executor_test.go b/pkg/tsdb/influxdb/flux/executor_test.go index 3d655f72ba9..ce480ea15d0 100644 --- a/pkg/tsdb/influxdb/flux/executor_test.go +++ b/pkg/tsdb/influxdb/flux/executor_test.go @@ -209,6 +209,51 @@ func TestAggregateGrouping(t *testing.T) { }) } +func TestNonStandardTimeColumn(t *testing.T) { + ctx := context.Background() + + t.Run("Time Column", func(t *testing.T) { + runner := &MockRunner{ + testDataPath: "non_standard_time_column.csv", + } + + dr := ExecuteQuery(ctx, QueryModel{MaxDataPoints: 100}, runner, 50) + if dr.Error != nil { + t.Fatal(dr.Error) + } + + if len(dr.Frames) != 1 { + t.Fatal("Expected one frame") + } + + str, _ := dr.Frames[0].StringTable(-1, -1) + fmt.Println(str) + + // Dimensions: 2 Fields by 1 Rows + // +-----------------------------------------+------------------+ + // | Name: _start_water | Name: | + // | Labels: | Labels: st=1 | + // | Type: []time.Time | Type: []*float64 | + // +-----------------------------------------+------------------+ + // | 2020-06-28 17:50:13.012584046 +0000 UTC | 156.304 | + // +-----------------------------------------+------------------+ + + expectedFrame := data.NewFrame("", + data.NewField("_start_water", nil, []time.Time{ + time.Date(2020, 6, 28, 17, 50, 13, 12584046, time.UTC), + }), + data.NewField("", map[string]string{"st": "1"}, []*float64{ + pointer.Float64(156.304), + }), + ) + expectedFrame.Meta = &data.FrameMeta{} + + if diff := cmp.Diff(expectedFrame, dr.Frames[0], data.FrameTestCompareOptions()...); diff != "" { + t.Errorf("Result mismatch (-want +got):\n%s", diff) + } + }) +} + func TestBuckets(t *testing.T) { ctx := context.Background() diff --git a/pkg/tsdb/influxdb/flux/testdata/non_standard_time_column.csv b/pkg/tsdb/influxdb/flux/testdata/non_standard_time_column.csv new file mode 100644 index 00000000000..9b6d522644d --- /dev/null +++ b/pkg/tsdb/influxdb/flux/testdata/non_standard_time_column.csv @@ -0,0 +1,5 @@ +#group,false,false,true,true,false,true +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,double,string +#default,_result,,,,, +,result,table,_start_water,_stop_water,_value,st +,,0,2020-06-28T17:50:13.012584046Z,2020-06-29T17:50:13.012584046Z,156.304,1