mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
InfluxDB: support aggregate results (#25726)
This commit is contained in:
parent
8d1ed33e20
commit
740a9ad5f9
@ -91,7 +91,6 @@ func (fb *FrameBuilder) Init(metadata *query.FluxTableMetadata) error {
|
||||
return err
|
||||
}
|
||||
fb.value = converter
|
||||
case col.Name() == "_measurement":
|
||||
fb.isTimeSeries = true
|
||||
case isTag(col.Name()):
|
||||
fb.labels = append(fb.labels, col.Name())
|
||||
@ -129,19 +128,31 @@ func (fb *FrameBuilder) Append(record *query.FluxRecord) error {
|
||||
}
|
||||
|
||||
if fb.isTimeSeries {
|
||||
// Series Data
|
||||
labels := make(map[string]string)
|
||||
for _, name := range fb.labels {
|
||||
labels[name] = record.ValueByKey(name).(string)
|
||||
frameName, ok := record.ValueByKey("_measurement").(string)
|
||||
if !ok {
|
||||
frameName = "" // empty frame name
|
||||
}
|
||||
|
||||
fb.active = data.NewFrame(
|
||||
record.Measurement(),
|
||||
frameName,
|
||||
data.NewFieldFromFieldType(data.FieldTypeTime, 0),
|
||||
data.NewFieldFromFieldType(fb.value.OutputFieldType, 0),
|
||||
)
|
||||
|
||||
fb.active.Fields[0].Name = "Time"
|
||||
fb.active.Fields[1].Name = record.Field()
|
||||
name, ok := record.ValueByKey("_field").(string)
|
||||
if ok {
|
||||
fb.active.Fields[1].Name = name
|
||||
}
|
||||
|
||||
// set the labels
|
||||
labels := make(map[string]string)
|
||||
for _, name := range fb.labels {
|
||||
val, ok := record.ValueByKey(name).(string)
|
||||
if ok {
|
||||
labels[name] = val
|
||||
}
|
||||
}
|
||||
fb.active.Fields[1].Labels = labels
|
||||
} else {
|
||||
fields := make([]*data.Field, len(fb.columns))
|
||||
|
@ -1,30 +1,10 @@
|
||||
package flux
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var isField = regexp.MustCompile(`^_(time|value|measurement|field|start|stop)$`)
|
||||
|
||||
func TestColumnIdentification(t *testing.T) {
|
||||
t.Run("Test Field Identification", func(t *testing.T) {
|
||||
fieldNames := []string{"_time", "_value", "_measurement", "_field", "_start", "_stop"}
|
||||
for _, item := range fieldNames {
|
||||
if !isField.MatchString(item) {
|
||||
t.Fatal("Field", item, "Expected field, but got false")
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Test Not Field Identification", func(t *testing.T) {
|
||||
fieldNames := []string{"_header", "_cpu", "_hello", "_doctor"}
|
||||
for _, item := range fieldNames {
|
||||
if isField.MatchString(item) {
|
||||
t.Fatal("Field", item, "Expected NOT a field, but got true")
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Test Tag Identification", func(t *testing.T) {
|
||||
tagNames := []string{"header", "value", "tag"}
|
||||
|
@ -10,6 +10,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/xorcare/pointer"
|
||||
|
||||
influxdb2 "github.com/influxdata/influxdb-client-go"
|
||||
"github.com/influxdata/influxdb-client-go/api"
|
||||
)
|
||||
@ -152,6 +156,59 @@ func TestExecuteGrouping(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestAggregateGrouping(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("Grouping Test", func(t *testing.T) {
|
||||
runner := &MockRunner{
|
||||
testDataPath: "aggregate.csv",
|
||||
}
|
||||
|
||||
dr := ExecuteQuery(ctx, QueryModel{MaxDataPoints: 100}, runner, 50)
|
||||
if dr.Error != nil {
|
||||
t.Fatal(dr.Error)
|
||||
}
|
||||
|
||||
if len(dr.Frames) != 1 {
|
||||
t.Fatal("Expected one frame")
|
||||
}
|
||||
|
||||
str, _ := dr.Frames[0].StringTable(-1, -1)
|
||||
fmt.Println(str)
|
||||
|
||||
// `Name:
|
||||
// Dimensions: 2 Fields by 3 Rows
|
||||
// +-------------------------------+--------------------------+
|
||||
// | Name: Time | Name: |
|
||||
// | Labels: | Labels: host=hostname.ru |
|
||||
// | Type: []time.Time | Type: []*float64 |
|
||||
// +-------------------------------+--------------------------+
|
||||
// | 2020-06-05 12:06:00 +0000 UTC | 8.291 |
|
||||
// | 2020-06-05 12:07:00 +0000 UTC | 0.534 |
|
||||
// | 2020-06-05 12:08:00 +0000 UTC | 0.667 |
|
||||
// +-------------------------------+--------------------------+
|
||||
// `
|
||||
|
||||
expectedFrame := data.NewFrame("",
|
||||
data.NewField("Time", nil, []time.Time{
|
||||
time.Date(2020, 6, 5, 12, 6, 0, 0, time.UTC),
|
||||
time.Date(2020, 6, 5, 12, 7, 0, 0, time.UTC),
|
||||
time.Date(2020, 6, 5, 12, 8, 0, 0, time.UTC),
|
||||
}),
|
||||
data.NewField("", map[string]string{"host": "hostname.ru"}, []*float64{
|
||||
pointer.Float64(8.291),
|
||||
pointer.Float64(0.534),
|
||||
pointer.Float64(0.667),
|
||||
}),
|
||||
)
|
||||
expectedFrame.Meta = &data.FrameMeta{}
|
||||
|
||||
if diff := cmp.Diff(expectedFrame, dr.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
|
||||
t.Errorf("Result mismatch (-want +got):\n%s", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestBuckets(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
|
7
pkg/tsdb/influxdb/flux/testdata/aggregate.csv
vendored
Normal file
7
pkg/tsdb/influxdb/flux/testdata/aggregate.csv
vendored
Normal file
@ -0,0 +1,7 @@
|
||||
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,double
|
||||
#group,false,false,true,true,false,true,false
|
||||
#default,_result,,,,,,
|
||||
,result,table,_start,_stop,_time,host,_value
|
||||
,,0,2020-06-05T12:03:27.444072266Z,2020-06-05T12:08:27.444072266Z,2020-06-05T12:06:00Z,hostname.ru,8.291
|
||||
,,0,2020-06-05T12:03:27.444072266Z,2020-06-05T12:08:27.444072266Z,2020-06-05T12:07:00Z,hostname.ru,0.534
|
||||
,,0,2020-06-05T12:03:27.444072266Z,2020-06-05T12:08:27.444072266Z,2020-06-05T12:08:00Z,hostname.ru,0.667
|
|
Loading…
Reference in New Issue
Block a user