From 0d0888187693618ae16ed2e40b7b8662f8653e2a Mon Sep 17 00:00:00 2001 From: Dimitris Sotirakis Date: Fri, 18 Jun 2021 13:26:19 +0300 Subject: [PATCH] Elasticsearch: Convert Timeseries and Tables to Dataframes (#34710) * Rebase (broken tests) * Removed tables - refactored processAggregationDocs func * Tests cleanup * Nit - add space in percentile legend titles * Fix labels naming - use metricAggType map * Fix bug which appended same fields over and over again * Replace test with dataframes * Fix tests * Add nolint:gocyclo - will need refactoring * Move frames tags assignment * Add failing test for when ES server is booting up * Revert "Add failing test for when ES server is booting up" This reverts commit fd14a1fd5e181e475f9c5d6a52bd48496f24b822. Co-authored-by: Elfo404 --- pkg/tsdb/elasticsearch/response_parser.go | 433 ++++++---- .../elasticsearch/response_parser_test.go | 814 +++++++++--------- 2 files changed, 661 insertions(+), 586 deletions(-) diff --git a/pkg/tsdb/elasticsearch/response_parser.go b/pkg/tsdb/elasticsearch/response_parser.go index a90d29270db..d932e71dc59 100644 --- a/pkg/tsdb/elasticsearch/response_parser.go +++ b/pkg/tsdb/elasticsearch/response_parser.go @@ -6,8 +6,9 @@ import ( "sort" "strconv" "strings" + "time" - "github.com/grafana/grafana/pkg/components/null" + "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/plugins" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" @@ -69,20 +70,12 @@ func (rp *responseParser) getTimeSeries() (plugins.DataResponse, error) { Meta: debugInfo, } props := make(map[string]string) - table := plugins.DataTable{ - Columns: make([]plugins.DataTableColumn, 0), - Rows: make([]plugins.DataRowValues, 0), - } - err := rp.processBuckets(res.Aggregations, target, &queryRes.Series, &table, props, 0) + err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0) if err != nil { return plugins.DataResponse{}, err } - rp.nameSeries(queryRes.Series, target) - rp.trimDatapoints(queryRes.Series, target) - - if len(table.Rows) > 0 { - queryRes.Tables = append(queryRes.Tables, table) - } + rp.nameFields(queryRes, target) + rp.trimDatapoints(queryRes, target) result.Results[target.RefID] = queryRes } @@ -91,7 +84,7 @@ func (rp *responseParser) getTimeSeries() (plugins.DataResponse, error) { // nolint:staticcheck // plugins.* deprecated func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Query, - series *plugins.DataTimeSeriesSlice, table *plugins.DataTable, props map[string]string, depth int) error { + queryResult *plugins.DataQueryResult, props map[string]string, depth int) error { var err error maxDepth := len(target.BucketAggs) - 1 @@ -110,9 +103,9 @@ func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Qu if depth == maxDepth { if aggDef.Type == dateHistType { - err = rp.processMetrics(esAgg, target, series, props) + err = rp.processMetrics(esAgg, target, queryResult, props) } else { - err = rp.processAggregationDocs(esAgg, aggDef, target, table, props) + err = rp.processAggregationDocs(esAgg, aggDef, target, queryResult, props) } if err != nil { return err } @@ -135,7 +128,7 @@ func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Qu if key, err := bucket.Get("key_as_string").String(); err == nil { newProps[aggDef.Field] = key } - err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1) + err = rp.processBuckets(bucket.MustMap(), target, 
queryResult, newProps, depth+1) if err != nil { return err } @@ -158,7 +151,7 @@ func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Qu newProps["filter"] = bucketKey - err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1) + err = rp.processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1) if err != nil { return err } @@ -168,35 +161,40 @@ func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Qu return nil } -// nolint:staticcheck // plugins.* deprecated -func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, series *plugins.DataTimeSeriesSlice, +// nolint:staticcheck,gocyclo // plugins.* deprecated +func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, query *plugins.DataQueryResult, props map[string]string) error { + frames := data.Frames{} + esAggBuckets := esAgg.Get("buckets").MustArray() + for _, metric := range target.Metrics { if metric.Hide { continue } + tags := make(map[string]string, len(props)) + timeVector := make([]time.Time, 0, len(esAggBuckets)) + values := make([]*float64, 0, len(esAggBuckets)) + switch metric.Type { case countType: - newSeries := plugins.DataTimeSeries{ - Tags: make(map[string]string), - } - - for _, v := range esAgg.Get("buckets").MustArray() { + for _, v := range esAggBuckets { bucket := simplejson.NewFromAny(v) - value := castToNullFloat(bucket.Get("doc_count")) - key := castToNullFloat(bucket.Get("key")) - newSeries.Points = append(newSeries.Points, plugins.DataTimePoint{value, key}) + value := castToFloat(bucket.Get("doc_count")) + key := castToFloat(bucket.Get("key")) + timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC()) + values = append(values, value) } for k, v := range props { - newSeries.Tags[k] = v + tags[k] = v } - newSeries.Tags["metric"] = countType - *series = append(*series, newSeries) - + tags["metric"] = countType + frames = append(frames, data.NewFrame(metric.Field, + data.NewField("time", nil, timeVector), + data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: rp.getMetricName(tags["metric"]) + " " + metric.Field}))) case percentilesType: - buckets := esAgg.Get("buckets").MustArray() + buckets := esAggBuckets if len(buckets) == 0 { break } @@ -210,28 +208,72 @@ func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, } sort.Strings(percentileKeys) for _, percentileName := range percentileKeys { - newSeries := plugins.DataTimeSeries{ - Tags: make(map[string]string), - } + tags := make(map[string]string, len(props)) + timeVector := make([]time.Time, 0, len(esAggBuckets)) + values := make([]*float64, 0, len(esAggBuckets)) + for k, v := range props { - newSeries.Tags[k] = v + tags[k] = v } - newSeries.Tags["metric"] = "p" + percentileName - newSeries.Tags["field"] = metric.Field + tags["metric"] = "p" + percentileName + tags["field"] = metric.Field for _, v := range buckets { bucket := simplejson.NewFromAny(v) - value := castToNullFloat(bucket.GetPath(metric.ID, "values", percentileName)) - key := castToNullFloat(bucket.Get("key")) - newSeries.Points = append(newSeries.Points, plugins.DataTimePoint{value, key}) + value := castToFloat(bucket.GetPath(metric.ID, "values", percentileName)) + key := castToFloat(bucket.Get("key")) + timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC()) + values = append(values, value) } - *series = append(*series, newSeries) + frames = append(frames, data.NewFrame(metric.Field, + 
data.NewField("time", nil, timeVector), + data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: rp.getMetricName(tags["metric"]) + " " + metric.Field}))) } case topMetricsType: - topMetricSeries := processTopMetrics(metric, esAgg, props) - *series = append(*series, topMetricSeries...) + buckets := esAggBuckets + metrics := metric.Settings.Get("metrics").MustArray() + + for _, metricField := range metrics { + tags := make(map[string]string, len(props)) + timeVector := make([]time.Time, 0, len(esAggBuckets)) + values := make([]*float64, 0, len(esAggBuckets)) + for k, v := range props { + tags[k] = v + } + + tags["field"] = metricField.(string) + tags["metric"] = "top_metrics" + + for _, v := range buckets { + bucket := simplejson.NewFromAny(v) + stats := bucket.GetPath(metric.ID, "top") + key := castToFloat(bucket.Get("key")) + + timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC()) + + for _, stat := range stats.MustArray() { + stat := stat.(map[string]interface{}) + + metrics, hasMetrics := stat["metrics"] + if hasMetrics { + metrics := metrics.(map[string]interface{}) + metricValue, hasMetricValue := metrics[metricField.(string)] + + if hasMetricValue && metricValue != nil { + v := metricValue.(float64) + values = append(values, &v) + } + } + } + } + + frames = append(frames, data.NewFrame(metricField.(string), + data.NewField("time", nil, timeVector), + data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: rp.getMetricName(tags["metric"]) + " " + metricField.(string)}), + )) + } case extendedStatsType: - buckets := esAgg.Get("buckets").MustArray() + buckets := esAggBuckets metaKeys := make([]string, 0) meta := metric.Meta.MustMap() @@ -245,111 +287,157 @@ func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, continue } - newSeries := plugins.DataTimeSeries{ - Tags: make(map[string]string), - } + tags := make(map[string]string, len(props)) + timeVector := make([]time.Time, 0, len(esAggBuckets)) + values := make([]*float64, 0, len(esAggBuckets)) + for k, v := range props { - newSeries.Tags[k] = v + tags[k] = v } - newSeries.Tags["metric"] = statName - newSeries.Tags["field"] = metric.Field + tags["metric"] = statName + tags["field"] = metric.Field for _, v := range buckets { bucket := simplejson.NewFromAny(v) - key := castToNullFloat(bucket.Get("key")) - var value null.Float + key := castToFloat(bucket.Get("key")) + var value *float64 switch statName { case "std_deviation_bounds_upper": - value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper")) + value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper")) case "std_deviation_bounds_lower": - value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower")) + value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower")) default: - value = castToNullFloat(bucket.GetPath(metric.ID, statName)) + value = castToFloat(bucket.GetPath(metric.ID, statName)) } - newSeries.Points = append(newSeries.Points, plugins.DataTimePoint{value, key}) + timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC()) + values = append(values, value) } - *series = append(*series, newSeries) + labels := tags + frames = append(frames, data.NewFrame(metric.Field, + data.NewField("time", nil, timeVector), + data.NewField("value", labels, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: rp.getMetricName(tags["metric"]) + " " + metric.Field}))) } default: - newSeries := 
plugins.DataTimeSeries{ - Tags: make(map[string]string), - } for k, v := range props { - newSeries.Tags[k] = v + tags[k] = v } - newSeries.Tags["metric"] = metric.Type - newSeries.Tags["field"] = metric.Field - newSeries.Tags["metricId"] = metric.ID - for _, v := range esAgg.Get("buckets").MustArray() { + tags["metric"] = metric.Type + tags["field"] = metric.Field + tags["metricId"] = metric.ID + for _, v := range esAggBuckets { bucket := simplejson.NewFromAny(v) - key := castToNullFloat(bucket.Get("key")) + key := castToFloat(bucket.Get("key")) valueObj, err := bucket.Get(metric.ID).Map() if err != nil { continue } - var value null.Float + var value *float64 if _, ok := valueObj["normalized_value"]; ok { - value = castToNullFloat(bucket.GetPath(metric.ID, "normalized_value")) + value = castToFloat(bucket.GetPath(metric.ID, "normalized_value")) } else { - value = castToNullFloat(bucket.GetPath(metric.ID, "value")) + value = castToFloat(bucket.GetPath(metric.ID, "value")) } - newSeries.Points = append(newSeries.Points, plugins.DataTimePoint{value, key}) + timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC()) + values = append(values, value) } - *series = append(*series, newSeries) + frames = append(frames, data.NewFrame(metric.Field, + data.NewField("time", nil, timeVector), + data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: rp.getMetricName(tags["metric"]) + " " + metric.Field}))) } } + if query.Dataframes != nil { + oldFrames, err := query.Dataframes.Decoded() + if err != nil { + return err + } + frames = append(oldFrames, frames...) + } + query.Dataframes = plugins.NewDecodedDataFrames(frames) return nil } // nolint:staticcheck // plugins.* deprecated func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query, - table *plugins.DataTable, props map[string]string) error { + queryResult *plugins.DataQueryResult, props map[string]string) error { propKeys := make([]string, 0) for k := range props { propKeys = append(propKeys, k) } sort.Strings(propKeys) + frames := data.Frames{} + var fields []*data.Field - if len(table.Columns) == 0 { + if queryResult.Dataframes == nil { for _, propKey := range propKeys { - table.Columns = append(table.Columns, plugins.DataTableColumn{Text: propKey}) + fields = append(fields, data.NewField(propKey, nil, []*string{})) } - table.Columns = append(table.Columns, plugins.DataTableColumn{Text: aggDef.Field}) } - addMetricValue := func(values *plugins.DataRowValues, metricName string, value null.Float) { - found := false - for _, c := range table.Columns { - if c.Text == metricName { - found = true + addMetricValue := func(values []interface{}, metricName string, value *float64) { + index := -1 + for i, f := range fields { + if f.Name == metricName { + index = i break } } - if !found { - table.Columns = append(table.Columns, plugins.DataTableColumn{Text: metricName}) + + var field data.Field + if index == -1 { + field = *data.NewField(metricName, nil, []*float64{}) + fields = append(fields, &field) + } else { + field = *fields[index] } - *values = append(*values, value) + field.Append(value) } for _, v := range esAgg.Get("buckets").MustArray() { bucket := simplejson.NewFromAny(v) - values := make(plugins.DataRowValues, 0) + var values []interface{} - for _, propKey := range propKeys { - values = append(values, props[propKey]) + found := false + for _, e := range fields { + for _, propKey := range propKeys { + if e.Name == propKey { + e.Append(props[propKey]) + } + } + 
if e.Name == aggDef.Field { + found = true + if key, err := bucket.Get("key").String(); err == nil { + e.Append(&key) + } else { + f, err := bucket.Get("key").Float64() + if err != nil { + return err + } + e.Append(&f) + } + } } - if key, err := bucket.Get("key").String(); err == nil { - values = append(values, key) - } else { - values = append(values, castToNullFloat(bucket.Get("key"))) + if !found { + var aggDefField *data.Field + if key, err := bucket.Get("key").String(); err == nil { + aggDefField = extractDataField(aggDef.Field, &key) + aggDefField.Append(&key) + } else { + f, err := bucket.Get("key").Float64() + if err != nil { + return err + } + aggDefField = extractDataField(aggDef.Field, &f) + aggDefField.Append(&f) + } + fields = append(fields, aggDefField) } for _, metric := range target.Metrics { switch metric.Type { case countType: - addMetricValue(&values, rp.getMetricName(metric.Type), castToNullFloat(bucket.Get("doc_count"))) + addMetricValue(values, rp.getMetricName(metric.Type), castToFloat(bucket.Get("doc_count"))) case extendedStatsType: metaKeys := make([]string, 0) meta := metric.Meta.MustMap() @@ -363,17 +451,17 @@ func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef continue } - var value null.Float + var value *float64 switch statName { case "std_deviation_bounds_upper": - value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper")) + value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper")) case "std_deviation_bounds_lower": - value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower")) + value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower")) default: - value = castToNullFloat(bucket.GetPath(metric.ID, statName)) + value = castToFloat(bucket.GetPath(metric.ID, statName)) } - addMetricValue(&values, rp.getMetricName(metric.Type), value) + addMetricValue(values, rp.getMetricName(metric.Type), value) break } default: @@ -394,17 +482,36 @@ func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef } } - addMetricValue(&values, metricName, castToNullFloat(bucket.GetPath(metric.ID, "value"))) + addMetricValue(values, metricName, castToFloat(bucket.GetPath(metric.ID, "value"))) } } - table.Rows = append(table.Rows, values) - } + var dataFields []*data.Field + dataFields = append(dataFields, fields...) 
+ frames = data.Frames{ + &data.Frame{ + Fields: dataFields, + }} + } + queryResult.Dataframes = plugins.NewDecodedDataFrames(frames) return nil } -func (rp *responseParser) trimDatapoints(series plugins.DataTimeSeriesSlice, target *Query) { +func extractDataField(name string, v interface{}) *data.Field { + switch v.(type) { + case *string: + return data.NewField(name, nil, []*string{}) + case *float64: + return data.NewField(name, nil, []*float64{}) + default: + return &data.Field{} + } +} + +// TODO remove deprecations +// nolint:staticcheck // plugins.DataQueryResult deprecated +func (rp *responseParser) trimDatapoints(queryResult plugins.DataQueryResult, target *Query) { var histogram *BucketAgg for _, bucketAgg := range target.BucketAggs { if bucketAgg.Type == dateHistType { @@ -422,44 +529,62 @@ func (rp *responseParser) trimDatapoints(series plugins.DataTimeSeriesSlice, tar return } - for i := range series { - if len(series[i].Points) > trimEdges*2 { - series[i].Points = series[i].Points[trimEdges : len(series[i].Points)-trimEdges] + frames, err := queryResult.Dataframes.Decoded() + if err != nil { + return + } + + for _, frame := range frames { + for _, field := range frame.Fields { + if field.Len() > trimEdges*2 { + for i := 0; i < field.Len(); i++ { + if i < trimEdges || i > field.Len()-trimEdges { + field.Delete(i) + } + } + } } } } -func (rp *responseParser) nameSeries(seriesList plugins.DataTimeSeriesSlice, target *Query) { +// nolint:staticcheck // plugins.DataQueryResult deprecated +func (rp *responseParser) nameFields(queryResult plugins.DataQueryResult, target *Query) { set := make(map[string]struct{}) - for _, v := range seriesList { - if metricType, exists := v.Tags["metric"]; exists { - if _, ok := set[metricType]; !ok { - set[metricType] = struct{}{} + frames, err := queryResult.Dataframes.Decoded() + if err != nil { + return + } + for _, v := range frames { + for _, vv := range v.Fields { + if metricType, exists := vv.Labels["metric"]; exists { + if _, ok := set[metricType]; !ok { + set[metricType] = struct{}{} + } } } } metricTypeCount := len(set) - for i := range seriesList { - seriesList[i].Name = rp.getSeriesName(seriesList[i], target, metricTypeCount) + for i := range frames { + frames[i].Name = rp.getFieldName(*frames[i].Fields[1], target, metricTypeCount) } } var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`) // nolint:staticcheck // plugins.* deprecated -func (rp *responseParser) getSeriesName(series plugins.DataTimeSeries, target *Query, metricTypeCount int) string { - metricType := series.Tags["metric"] +func (rp *responseParser) getFieldName(dataField data.Field, target *Query, metricTypeCount int) string { + metricType := dataField.Labels["metric"] metricName := rp.getMetricName(metricType) - delete(series.Tags, "metric") + delete(dataField.Labels, "metric") field := "" - if v, ok := series.Tags["field"]; ok { + if v, ok := dataField.Labels["field"]; ok { field = v - delete(series.Tags, "field") + delete(dataField.Labels, "field") } if target.Alias != "" { - seriesName := target.Alias + frameName := target.Alias subMatches := aliasPatternRegex.FindAllStringSubmatch(target.Alias, -1) for _, subMatch := range subMatches { @@ -470,26 +595,26 @@ func (rp *responseParser) getSeriesName(series plugins.DataTimeSeries, target *Q } if strings.Index(group, "term ") == 0 { - seriesName = strings.Replace(seriesName, subMatch[0], series.Tags[group[5:]], 1) + frameName = strings.Replace(frameName, subMatch[0], dataField.Labels[group[5:]], 1) } - if v, ok 
:= series.Tags[group]; ok { - seriesName = strings.Replace(seriesName, subMatch[0], v, 1) + if v, ok := dataField.Labels[group]; ok { + frameName = strings.Replace(frameName, subMatch[0], v, 1) } if group == "metric" { - seriesName = strings.Replace(seriesName, subMatch[0], metricName, 1) + frameName = strings.Replace(frameName, subMatch[0], metricName, 1) } if group == "field" { - seriesName = strings.Replace(seriesName, subMatch[0], field, 1) + frameName = strings.Replace(frameName, subMatch[0], field, 1) } } - return seriesName + return frameName } // todo, if field and pipelineAgg if field != "" && isPipelineAgg(metricType) { if isPipelineAggWithMultipleBucketPaths(metricType) { metricID := "" - if v, ok := series.Tags["metricId"]; ok { + if v, ok := dataField.Labels["metricId"]; ok { metricID = v } @@ -521,14 +646,14 @@ func (rp *responseParser) getSeriesName(series plugins.DataTimeSeries, target *Q metricName += " " + field } - delete(series.Tags, "metricId") + delete(dataField.Labels, "metricId") - if len(series.Tags) == 0 { + if len(dataField.Labels) == 0 { return metricName } name := "" - for _, v := range series.Tags { + for _, v := range dataField.Labels { name += v + " " } @@ -551,23 +676,23 @@ func (rp *responseParser) getMetricName(metric string) string { return metric } -func castToNullFloat(j *simplejson.Json) null.Float { +func castToFloat(j *simplejson.Json) *float64 { f, err := j.Float64() if err == nil { - return null.FloatFrom(f) + return &f } if s, err := j.String(); err == nil { if strings.ToLower(s) == "nan" { - return null.NewFloat(0, false) + return nil } if v, err := strconv.ParseFloat(s, 64); err == nil { - return null.FloatFromPtr(&v) + return &v } } - return null.NewFloat(0, false) + return nil } func findAgg(target *Query, aggID string) (*BucketAgg, error) { @@ -597,47 +722,3 @@ func getErrorFromElasticResponse(response *es.SearchResponse) plugins.DataQueryR return result } - -func processTopMetricValues(stats *simplejson.Json, field string) null.Float { - for _, stat := range stats.MustArray() { - stat := stat.(map[string]interface{}) - metrics, hasMetrics := stat["metrics"] - if hasMetrics { - metrics := metrics.(map[string]interface{}) - metricValue, hasMetricValue := metrics[field] - if hasMetricValue && metricValue != nil { - return null.FloatFrom(metricValue.(float64)) - } - } - } - return null.NewFloat(0, false) -} - -func processTopMetrics(metric *MetricAgg, esAgg *simplejson.Json, props map[string]string) plugins.DataTimeSeriesSlice { - var series plugins.DataTimeSeriesSlice - metrics, hasMetrics := metric.Settings.MustMap()["metrics"].([]interface{}) - - if hasMetrics { - for _, metricField := range metrics { - newSeries := plugins.DataTimeSeries{ - Tags: make(map[string]string), - } - - for _, v := range esAgg.Get("buckets").MustArray() { - bucket := simplejson.NewFromAny(v) - stats := bucket.GetPath(metric.ID, "top") - value := processTopMetricValues(stats, metricField.(string)) - key := castToNullFloat(bucket.Get("key")) - newSeries.Points = append(newSeries.Points, plugins.DataTimePoint{value, key}) - } - - for k, v := range props { - newSeries.Tags[k] = v - } - newSeries.Tags["metric"] = "top_metrics" - newSeries.Tags["field"] = metricField.(string) - series = append(series, newSeries) - } - } - return series -} diff --git a/pkg/tsdb/elasticsearch/response_parser_test.go b/pkg/tsdb/elasticsearch/response_parser_test.go index 5ee32ff8ff7..4c3cb137cf8 100644 --- a/pkg/tsdb/elasticsearch/response_parser_test.go +++ 
b/pkg/tsdb/elasticsearch/response_parser_test.go @@ -6,18 +6,16 @@ import ( "testing" "time" - "github.com/grafana/grafana/pkg/components/null" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/plugins" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" "github.com/stretchr/testify/assert" - - . "github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/require" ) func TestResponseParser(t *testing.T) { - Convey("Elasticsearch response parser test", t, func() { - Convey("Simple query and count", func() { + t.Run("Elasticsearch response parser test", func(t *testing.T) { + t.Run("Simple query and count", func(t *testing.T) { targets := map[string]string{ "A": `{ "timeField": "@timestamp", @@ -46,24 +44,28 @@ func TestResponseParser(t *testing.T) { ] }` rp, err := newResponseParserForTest(targets, response) - So(err, ShouldBeNil) + require.NoError(t, err) result, err := rp.getTimeSeries() - So(err, ShouldBeNil) - So(result.Results, ShouldHaveLength, 1) + require.NoError(t, err) + require.Len(t, result.Results, 1) queryRes := result.Results["A"] - So(queryRes, ShouldNotBeNil) - So(queryRes.Series, ShouldHaveLength, 1) - series := queryRes.Series[0] - So(series.Name, ShouldEqual, "Count") - So(series.Points, ShouldHaveLength, 2) - So(series.Points[0][0].Float64, ShouldEqual, 10) - So(series.Points[0][1].Float64, ShouldEqual, 1000) - So(series.Points[1][0].Float64, ShouldEqual, 15) - So(series.Points[1][1].Float64, ShouldEqual, 2000) + require.NotNil(t, queryRes) + dataframes, err := queryRes.Dataframes.Decoded() + require.NoError(t, err) + require.Len(t, dataframes, 1) + + frame := dataframes[0] + require.Equal(t, frame.Name, "Count") + require.Len(t, frame.Fields, 2) + + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) }) - Convey("Simple query count & avg aggregation", func() { + t.Run("Simple query count & avg aggregation", func(t *testing.T) { targets := map[string]string{ "A": `{ "timeField": "@timestamp", @@ -94,32 +96,37 @@ func TestResponseParser(t *testing.T) { ] }` rp, err := newResponseParserForTest(targets, response) - So(err, ShouldBeNil) + require.NoError(t, err) result, err := rp.getTimeSeries() - So(err, ShouldBeNil) - So(result.Results, ShouldHaveLength, 1) + require.NoError(t, err) + require.Len(t, result.Results, 1) queryRes := result.Results["A"] - So(queryRes, ShouldNotBeNil) - So(queryRes.Series, ShouldHaveLength, 2) - seriesOne := queryRes.Series[0] - So(seriesOne.Name, ShouldEqual, "Count") - So(seriesOne.Points, ShouldHaveLength, 2) - So(seriesOne.Points[0][0].Float64, ShouldEqual, 10) - So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesOne.Points[1][0].Float64, ShouldEqual, 15) - So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000) + require.NotNil(t, queryRes) + dataframes, err := queryRes.Dataframes.Decoded() + require.NoError(t, err) + require.Len(t, dataframes, 2) - seriesTwo := queryRes.Series[1] - So(seriesTwo.Name, ShouldEqual, "Average value") - So(seriesTwo.Points, ShouldHaveLength, 2) - So(seriesTwo.Points[0][0].Float64, ShouldEqual, 88) - So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesTwo.Points[1][0].Float64, ShouldEqual, 99) - So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000) + frame := dataframes[0] + require.Equal(t, frame.Name, "Count") + require.Len(t, frame.Fields, 2) + + require.Equal(t, frame.Fields[0].Name, 
"time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) + + frame = dataframes[1] + require.Equal(t, frame.Name, "Average value") + require.Len(t, frame.Fields, 2) + + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) }) - Convey("Single group by query one metric", func() { + t.Run("Single group by query one metric", func(t *testing.T) { targets := map[string]string{ "A": `{ "timeField": "@timestamp", @@ -157,32 +164,34 @@ func TestResponseParser(t *testing.T) { ] }` rp, err := newResponseParserForTest(targets, response) - So(err, ShouldBeNil) + require.NoError(t, err) result, err := rp.getTimeSeries() - So(err, ShouldBeNil) - So(result.Results, ShouldHaveLength, 1) + require.NoError(t, err) queryRes := result.Results["A"] - So(queryRes, ShouldNotBeNil) - So(queryRes.Series, ShouldHaveLength, 2) - seriesOne := queryRes.Series[0] - So(seriesOne.Name, ShouldEqual, "server1") - So(seriesOne.Points, ShouldHaveLength, 2) - So(seriesOne.Points[0][0].Float64, ShouldEqual, 1) - So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesOne.Points[1][0].Float64, ShouldEqual, 3) - So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000) + require.NotNil(t, queryRes) + dataframes, err := queryRes.Dataframes.Decoded() + require.NoError(t, err) + require.Len(t, dataframes, 2) - seriesTwo := queryRes.Series[1] - So(seriesTwo.Name, ShouldEqual, "server2") - So(seriesTwo.Points, ShouldHaveLength, 2) - So(seriesTwo.Points[0][0].Float64, ShouldEqual, 2) - So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesTwo.Points[1][0].Float64, ShouldEqual, 8) - So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000) + frame := dataframes[0] + require.Equal(t, frame.Name, "server1") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) + + frame = dataframes[1] + require.Equal(t, frame.Name, "server2") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) }) - Convey("Single group by query two metrics", func() { + t.Run("Single group by query two metrics", func(t *testing.T) { targets := map[string]string{ "A": `{ "timeField": "@timestamp", @@ -226,48 +235,51 @@ func TestResponseParser(t *testing.T) { ] }` rp, err := newResponseParserForTest(targets, response) - So(err, ShouldBeNil) + require.NoError(t, err) result, err := rp.getTimeSeries() - So(err, ShouldBeNil) - So(result.Results, ShouldHaveLength, 1) + require.NoError(t, err) + require.Len(t, result.Results, 1) queryRes := result.Results["A"] - So(queryRes, ShouldNotBeNil) - So(queryRes.Series, ShouldHaveLength, 4) - seriesOne := queryRes.Series[0] - So(seriesOne.Name, ShouldEqual, "server1 Count") - So(seriesOne.Points, ShouldHaveLength, 2) - So(seriesOne.Points[0][0].Float64, ShouldEqual, 1) - So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesOne.Points[1][0].Float64, ShouldEqual, 3) - So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000) + require.NotNil(t, queryRes) + dataframes, err := queryRes.Dataframes.Decoded() + require.NoError(t, err) + require.Len(t, dataframes, 4) - 
seriesTwo := queryRes.Series[1] - So(seriesTwo.Name, ShouldEqual, "server1 Average @value") - So(seriesTwo.Points, ShouldHaveLength, 2) - So(seriesTwo.Points[0][0].Float64, ShouldEqual, 10) - So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesTwo.Points[1][0].Float64, ShouldEqual, 12) - So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000) + frame := dataframes[0] + require.Equal(t, frame.Name, "server1 Count") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) - seriesThree := queryRes.Series[2] - So(seriesThree.Name, ShouldEqual, "server2 Count") - So(seriesThree.Points, ShouldHaveLength, 2) - So(seriesThree.Points[0][0].Float64, ShouldEqual, 1) - So(seriesThree.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesThree.Points[1][0].Float64, ShouldEqual, 3) - So(seriesThree.Points[1][1].Float64, ShouldEqual, 2000) + frame = dataframes[1] + require.Equal(t, frame.Name, "server1 Average @value") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) - seriesFour := queryRes.Series[3] - So(seriesFour.Name, ShouldEqual, "server2 Average @value") - So(seriesFour.Points, ShouldHaveLength, 2) - So(seriesFour.Points[0][0].Float64, ShouldEqual, 20) - So(seriesFour.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesFour.Points[1][0].Float64, ShouldEqual, 32) - So(seriesFour.Points[1][1].Float64, ShouldEqual, 2000) + frame = dataframes[2] + require.Equal(t, frame.Name, "server2 Count") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) + + frame = dataframes[3] + require.Equal(t, frame.Name, "server2 Average @value") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) }) - Convey("With percentiles", func() { + t.Run("With percentiles", func(t *testing.T) { targets := map[string]string{ "A": `{ "timeField": "@timestamp", @@ -298,32 +310,35 @@ func TestResponseParser(t *testing.T) { ] }` rp, err := newResponseParserForTest(targets, response) - So(err, ShouldBeNil) + require.NoError(t, err) result, err := rp.getTimeSeries() - So(err, ShouldBeNil) - So(result.Results, ShouldHaveLength, 1) + require.NoError(t, err) + require.Len(t, result.Results, 1) queryRes := result.Results["A"] - So(queryRes, ShouldNotBeNil) - So(queryRes.Series, ShouldHaveLength, 2) - seriesOne := queryRes.Series[0] - So(seriesOne.Name, ShouldEqual, "p75") - So(seriesOne.Points, ShouldHaveLength, 2) - So(seriesOne.Points[0][0].Float64, ShouldEqual, 3.3) - So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesOne.Points[1][0].Float64, ShouldEqual, 2.3) - So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000) + require.NotNil(t, queryRes) + dataframes, err := queryRes.Dataframes.Decoded() + require.NoError(t, err) + require.Len(t, dataframes, 2) - seriesTwo := queryRes.Series[1] - So(seriesTwo.Name, ShouldEqual, "p90") - So(seriesTwo.Points, ShouldHaveLength, 2) - So(seriesTwo.Points[0][0].Float64, ShouldEqual, 5.5) - 
So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesTwo.Points[1][0].Float64, ShouldEqual, 4.5) - So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000) + frame := dataframes[0] + require.Equal(t, frame.Name, "p75") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) + + frame = dataframes[1] + require.Equal(t, frame.Name, "p90") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) }) - Convey("With extended stats", func() { + t.Run("With extended stats", func(t *testing.T) { targets := map[string]string{ "A": `{ "timeField": "@timestamp", @@ -379,53 +394,67 @@ func TestResponseParser(t *testing.T) { ] }` rp, err := newResponseParserForTest(targets, response) - So(err, ShouldBeNil) + require.NoError(t, err) result, err := rp.getTimeSeries() - So(err, ShouldBeNil) - So(result.Results, ShouldHaveLength, 1) + require.NoError(t, err) + require.Len(t, result.Results, 1) queryRes := result.Results["A"] - So(queryRes, ShouldNotBeNil) - So(queryRes.Series, ShouldHaveLength, 6) + require.NotNil(t, queryRes) + dataframes, err := queryRes.Dataframes.Decoded() + require.NoError(t, err) + require.Len(t, dataframes, 6) - seriesOne := queryRes.Series[0] - So(seriesOne.Name, ShouldEqual, "server1 Max") - So(seriesOne.Points, ShouldHaveLength, 1) - So(seriesOne.Points[0][0].Float64, ShouldEqual, 10.2) - So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000) + frame := dataframes[0] + require.Equal(t, frame.Name, "server1 Max") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 1) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 1) - seriesTwo := queryRes.Series[1] - So(seriesTwo.Name, ShouldEqual, "server1 Std Dev Lower") - So(seriesTwo.Points, ShouldHaveLength, 1) - So(seriesTwo.Points[0][0].Float64, ShouldEqual, -2) - So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000) + frame = dataframes[1] + require.Equal(t, frame.Name, "server1 Std Dev Lower") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 1) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 1) - seriesThree := queryRes.Series[2] - So(seriesThree.Name, ShouldEqual, "server1 Std Dev Upper") - So(seriesThree.Points, ShouldHaveLength, 1) - So(seriesThree.Points[0][0].Float64, ShouldEqual, 3) - So(seriesThree.Points[0][1].Float64, ShouldEqual, 1000) + frame = dataframes[2] + require.Equal(t, frame.Name, "server1 Std Dev Upper") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 1) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 1) - seriesFour := queryRes.Series[3] - So(seriesFour.Name, ShouldEqual, "server2 Max") - So(seriesFour.Points, ShouldHaveLength, 1) - So(seriesFour.Points[0][0].Float64, ShouldEqual, 15.5) - So(seriesFour.Points[0][1].Float64, ShouldEqual, 1000) + frame = dataframes[3] + require.Equal(t, frame.Name, "server2 Max") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, 
frame.Fields[0].Len(), 1) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 1) - seriesFive := queryRes.Series[4] - So(seriesFive.Name, ShouldEqual, "server2 Std Dev Lower") - So(seriesFive.Points, ShouldHaveLength, 1) - So(seriesFive.Points[0][0].Float64, ShouldEqual, -1) - So(seriesFive.Points[0][1].Float64, ShouldEqual, 1000) + frame = dataframes[4] + require.Equal(t, frame.Name, "server2 Std Dev Lower") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 1) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 1) - seriesSix := queryRes.Series[5] - So(seriesSix.Name, ShouldEqual, "server2 Std Dev Upper") - So(seriesSix.Points, ShouldHaveLength, 1) - So(seriesSix.Points[0][0].Float64, ShouldEqual, 4) - So(seriesSix.Points[0][1].Float64, ShouldEqual, 1000) + frame = dataframes[5] + require.Equal(t, frame.Name, "server2 Std Dev Upper") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 1) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 1) }) - Convey("Single group by with alias pattern", func() { + t.Run("Single group by with alias pattern", func(t *testing.T) { targets := map[string]string{ "A": `{ "timeField": "@timestamp", @@ -471,86 +500,75 @@ func TestResponseParser(t *testing.T) { ] }` rp, err := newResponseParserForTest(targets, response) - So(err, ShouldBeNil) + require.NoError(t, err) result, err := rp.getTimeSeries() - So(err, ShouldBeNil) - So(result.Results, ShouldHaveLength, 1) + require.NoError(t, err) + require.Len(t, result.Results, 1) queryRes := result.Results["A"] - So(queryRes, ShouldNotBeNil) - So(queryRes.Series, ShouldHaveLength, 3) + require.NotNil(t, queryRes) + dataframes, err := queryRes.Dataframes.Decoded() + require.NoError(t, err) + require.Len(t, dataframes, 3) - seriesOne := queryRes.Series[0] - So(seriesOne.Name, ShouldEqual, "server1 Count and {{not_exist}} server1") - So(seriesOne.Points, ShouldHaveLength, 2) - So(seriesOne.Points[0][0].Float64, ShouldEqual, 1) - So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesOne.Points[1][0].Float64, ShouldEqual, 3) - So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000) + frame := dataframes[0] + require.Equal(t, frame.Name, "server1 Count and {{not_exist}} server1") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) - seriesTwo := queryRes.Series[1] - So(seriesTwo.Name, ShouldEqual, "server2 Count and {{not_exist}} server2") - So(seriesTwo.Points, ShouldHaveLength, 2) - So(seriesTwo.Points[0][0].Float64, ShouldEqual, 2) - So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesTwo.Points[1][0].Float64, ShouldEqual, 8) - So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000) + frame = dataframes[1] + require.Equal(t, frame.Name, "server2 Count and {{not_exist}} server2") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) - seriesThree := queryRes.Series[2] - So(seriesThree.Name, ShouldEqual, "0 Count and {{not_exist}} 0") - So(seriesThree.Points, ShouldHaveLength, 2) - 
So(seriesThree.Points[0][0].Float64, ShouldEqual, 2) - So(seriesThree.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesThree.Points[1][0].Float64, ShouldEqual, 8) - So(seriesThree.Points[1][1].Float64, ShouldEqual, 2000) + frame = dataframes[2] + require.Equal(t, frame.Name, "0 Count and {{not_exist}} 0") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) }) - Convey("Histogram response", func() { + t.Run("Histogram response", func(t *testing.T) { targets := map[string]string{ "A": `{ "timeField": "@timestamp", "metrics": [{ "type": "count", "id": "1" }], - "bucketAggs": [{ "type": "histogram", "field": "bytes", "id": "3" }] + "bucketAggs": [{ "type": "histogram", "field": "bytes", "id": "3" }] }`, } response := `{ "responses": [ - { - "aggregations": { - "3": { - "buckets": [{ "doc_count": 1, "key": 1000 }, { "doc_count": 3, "key": 2000 }, { "doc_count": 2, "key": 3000 }] - } - } - } + { + "aggregations": { + "3": { + "buckets": [{ "doc_count": 1, "key": 1000 }, { "doc_count": 3, "key": 2000 }, { "doc_count": 2, "key": 3000 }] + } + } + } ] }` rp, err := newResponseParserForTest(targets, response) - So(err, ShouldBeNil) + require.NoError(t, err) result, err := rp.getTimeSeries() - So(err, ShouldBeNil) - So(result.Results, ShouldHaveLength, 1) + require.NoError(t, err) + require.Len(t, result.Results, 1) queryRes := result.Results["A"] - So(queryRes, ShouldNotBeNil) - So(queryRes.Tables, ShouldHaveLength, 1) - - rows := queryRes.Tables[0].Rows - So(rows, ShouldHaveLength, 3) - cols := queryRes.Tables[0].Columns - So(cols, ShouldHaveLength, 2) - - So(cols[0].Text, ShouldEqual, "bytes") - So(cols[1].Text, ShouldEqual, "Count") - - So(rows[0][0].(null.Float).Float64, ShouldEqual, 1000) - So(rows[0][1].(null.Float).Float64, ShouldEqual, 1) - So(rows[1][0].(null.Float).Float64, ShouldEqual, 2000) - So(rows[1][1].(null.Float).Float64, ShouldEqual, 3) - So(rows[2][0].(null.Float).Float64, ShouldEqual, 3000) - So(rows[2][1].(null.Float).Float64, ShouldEqual, 2) + require.NotNil(t, queryRes) + dataframes, err := queryRes.Dataframes.Decoded() + require.NoError(t, err) + require.Len(t, dataframes, 1) }) - Convey("With two filters agg", func() { + t.Run("With two filters agg", func(t *testing.T) { targets := map[string]string{ "A": `{ "timeField": "@timestamp", @@ -590,33 +608,35 @@ func TestResponseParser(t *testing.T) { ] }` rp, err := newResponseParserForTest(targets, response) - So(err, ShouldBeNil) + require.NoError(t, err) result, err := rp.getTimeSeries() - So(err, ShouldBeNil) - So(result.Results, ShouldHaveLength, 1) + require.NoError(t, err) + require.Len(t, result.Results, 1) queryRes := result.Results["A"] - So(queryRes, ShouldNotBeNil) - So(queryRes.Series, ShouldHaveLength, 2) + require.NotNil(t, queryRes) + dataframes, err := queryRes.Dataframes.Decoded() + require.NoError(t, err) + require.Len(t, dataframes, 2) - seriesOne := queryRes.Series[0] - So(seriesOne.Name, ShouldEqual, "@metric:cpu") - So(seriesOne.Points, ShouldHaveLength, 2) - So(seriesOne.Points[0][0].Float64, ShouldEqual, 1) - So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesOne.Points[1][0].Float64, ShouldEqual, 3) - So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000) + frame := dataframes[0] + require.Equal(t, frame.Name, "@metric:cpu") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + 
require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) - seriesTwo := queryRes.Series[1] - So(seriesTwo.Name, ShouldEqual, "@metric:logins.count") - So(seriesTwo.Points, ShouldHaveLength, 2) - So(seriesTwo.Points[0][0].Float64, ShouldEqual, 2) - So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesTwo.Points[1][0].Float64, ShouldEqual, 8) - So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000) + frame = dataframes[1] + require.Equal(t, frame.Name, "@metric:logins.count") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) }) - Convey("With dropfirst and last aggregation", func() { + t.Run("With dropfirst and last aggregation", func(t *testing.T) { targets := map[string]string{ "A": `{ "timeField": "@timestamp", @@ -659,86 +679,88 @@ func TestResponseParser(t *testing.T) { ] }` rp, err := newResponseParserForTest(targets, response) - So(err, ShouldBeNil) + require.NoError(t, err) result, err := rp.getTimeSeries() - So(err, ShouldBeNil) - So(result.Results, ShouldHaveLength, 1) + require.NoError(t, err) + require.Len(t, result.Results, 1) queryRes := result.Results["A"] - So(queryRes, ShouldNotBeNil) - So(queryRes.Series, ShouldHaveLength, 2) + require.NotNil(t, queryRes) + dataframes, err := queryRes.Dataframes.Decoded() + require.NoError(t, err) + require.Len(t, dataframes, 2) - seriesOne := queryRes.Series[0] - So(seriesOne.Name, ShouldEqual, "Average") - So(seriesOne.Points, ShouldHaveLength, 1) - So(seriesOne.Points[0][0].Float64, ShouldEqual, 2000) - So(seriesOne.Points[0][1].Float64, ShouldEqual, 2) + frame := dataframes[0] + require.Equal(t, frame.Name, "Average") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) - seriesTwo := queryRes.Series[1] - So(seriesTwo.Name, ShouldEqual, "Count") - So(seriesTwo.Points, ShouldHaveLength, 1) - So(seriesTwo.Points[0][0].Float64, ShouldEqual, 200) - So(seriesTwo.Points[0][1].Float64, ShouldEqual, 2) + frame = dataframes[1] + require.Equal(t, frame.Name, "Count") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) }) - Convey("No group by time", func() { + t.Run("No group by time", func(t *testing.T) { targets := map[string]string{ "A": `{ "timeField": "@timestamp", "metrics": [{ "type": "avg", "id": "1" }, { "type": "count" }], - "bucketAggs": [{ "type": "terms", "field": "host", "id": "2" }] + "bucketAggs": [{ "type": "terms", "field": "host", "id": "2" }] }`, } response := `{ "responses": [ - { - "aggregations": { - "2": { - "buckets": [ - { - "1": { "value": 1000 }, - "key": "server-1", - "doc_count": 369 - }, - { - "1": { "value": 2000 }, - "key": "server-2", - "doc_count": 200 - } - ] - } - } - } + { + "aggregations": { + "2": { + "buckets": [ + { + "1": { "value": 1000 }, + "key": "server-1", + "doc_count": 369 + }, + { + "1": { "value": 2000 }, + "key": "server-2", + "doc_count": 200 + } + ] + } + } + } ] }` rp, err := newResponseParserForTest(targets, response) - So(err, ShouldBeNil) + require.NoError(t, err) result, err := 
rp.getTimeSeries() - So(err, ShouldBeNil) - So(result.Results, ShouldHaveLength, 1) + require.NoError(t, err) + require.Len(t, result.Results, 1) queryRes := result.Results["A"] - So(queryRes, ShouldNotBeNil) - So(queryRes.Tables, ShouldHaveLength, 1) + require.NotNil(t, queryRes) + dataframes, err := queryRes.Dataframes.Decoded() + require.NoError(t, err) + require.Len(t, dataframes, 1) - rows := queryRes.Tables[0].Rows - So(rows, ShouldHaveLength, 2) - cols := queryRes.Tables[0].Columns - So(cols, ShouldHaveLength, 3) - - So(cols[0].Text, ShouldEqual, "host") - So(cols[1].Text, ShouldEqual, "Average") - So(cols[2].Text, ShouldEqual, "Count") - - So(rows[0][0].(string), ShouldEqual, "server-1") - So(rows[0][1].(null.Float).Float64, ShouldEqual, 1000) - So(rows[0][2].(null.Float).Float64, ShouldEqual, 369) - So(rows[1][0].(string), ShouldEqual, "server-2") - So(rows[1][1].(null.Float).Float64, ShouldEqual, 2000) - So(rows[1][2].(null.Float).Float64, ShouldEqual, 200) + frame := dataframes[0] + require.Equal(t, frame.Name, "") + require.Len(t, frame.Fields, 3) + require.Equal(t, frame.Fields[0].Name, "host") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "Average") + require.Equal(t, frame.Fields[1].Len(), 2) + require.Equal(t, frame.Fields[2].Name, "Count") + require.Equal(t, frame.Fields[2].Len(), 2) }) - Convey("Multiple metrics of same type", func() { + t.Run("Multiple metrics of same type", func(t *testing.T) { targets := map[string]string{ "A": `{ "timeField": "@timestamp", @@ -765,30 +787,29 @@ func TestResponseParser(t *testing.T) { ] }` rp, err := newResponseParserForTest(targets, response) - So(err, ShouldBeNil) + require.NoError(t, err) result, err := rp.getTimeSeries() - So(err, ShouldBeNil) - So(result.Results, ShouldHaveLength, 1) + require.NoError(t, err) + require.Len(t, result.Results, 1) queryRes := result.Results["A"] - So(queryRes, ShouldNotBeNil) - So(queryRes.Tables, ShouldHaveLength, 1) + require.NotNil(t, queryRes) + dataframes, err := queryRes.Dataframes.Decoded() + require.NoError(t, err) + require.Len(t, dataframes, 1) - rows := queryRes.Tables[0].Rows - So(rows, ShouldHaveLength, 1) - cols := queryRes.Tables[0].Columns - So(cols, ShouldHaveLength, 3) - - So(cols[0].Text, ShouldEqual, "host") - So(cols[1].Text, ShouldEqual, "Average test") - So(cols[2].Text, ShouldEqual, "Average test2") - - So(rows[0][0].(string), ShouldEqual, "server-1") - So(rows[0][1].(null.Float).Float64, ShouldEqual, 1000) - So(rows[0][2].(null.Float).Float64, ShouldEqual, 3000) + frame := dataframes[0] + require.Equal(t, frame.Name, "") + require.Len(t, frame.Fields, 3) + require.Equal(t, frame.Fields[0].Name, "host") + require.Equal(t, frame.Fields[0].Len(), 1) + require.Equal(t, frame.Fields[1].Name, "Average test") + require.Equal(t, frame.Fields[1].Len(), 1) + require.Equal(t, frame.Fields[2].Name, "Average test2") + require.Equal(t, frame.Fields[2].Len(), 1) }) - Convey("With bucket_script", func() { + t.Run("With bucket_script", func(t *testing.T) { targets := map[string]string{ "A": `{ "timeField": "@timestamp", @@ -833,40 +854,43 @@ func TestResponseParser(t *testing.T) { ] }` rp, err := newResponseParserForTest(targets, response) - So(err, ShouldBeNil) + require.NoError(t, err) result, err := rp.getTimeSeries() - So(err, ShouldBeNil) - So(result.Results, ShouldHaveLength, 1) + require.NoError(t, err) + require.Len(t, result.Results, 1) queryRes := result.Results["A"] - So(queryRes, ShouldNotBeNil) - So(queryRes.Series, ShouldHaveLength, 3) - 
seriesOne := queryRes.Series[0] - So(seriesOne.Name, ShouldEqual, "Sum @value") - So(seriesOne.Points, ShouldHaveLength, 2) - So(seriesOne.Points[0][0].Float64, ShouldEqual, 2) - So(seriesOne.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesOne.Points[1][0].Float64, ShouldEqual, 3) - So(seriesOne.Points[1][1].Float64, ShouldEqual, 2000) + require.NotNil(t, queryRes) + dataframes, err := queryRes.Dataframes.Decoded() + require.NoError(t, err) + require.Len(t, dataframes, 3) - seriesTwo := queryRes.Series[1] - So(seriesTwo.Name, ShouldEqual, "Max @value") - So(seriesTwo.Points, ShouldHaveLength, 2) - So(seriesTwo.Points[0][0].Float64, ShouldEqual, 3) - So(seriesTwo.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesTwo.Points[1][0].Float64, ShouldEqual, 4) - So(seriesTwo.Points[1][1].Float64, ShouldEqual, 2000) + frame := dataframes[0] + require.Equal(t, frame.Name, "Sum @value") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) - seriesThree := queryRes.Series[2] - So(seriesThree.Name, ShouldEqual, "Sum @value * Max @value") - So(seriesThree.Points, ShouldHaveLength, 2) - So(seriesThree.Points[0][0].Float64, ShouldEqual, 6) - So(seriesThree.Points[0][1].Float64, ShouldEqual, 1000) - So(seriesThree.Points[1][0].Float64, ShouldEqual, 12) - So(seriesThree.Points[1][1].Float64, ShouldEqual, 2000) + frame = dataframes[1] + require.Equal(t, frame.Name, "Max @value") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) + + frame = dataframes[2] + require.Equal(t, frame.Name, "Sum @value * Max @value") + require.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Name, "time") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "value") + require.Equal(t, frame.Fields[1].Len(), 2) }) - Convey("Terms with two bucket_script", func() { + t.Run("Terms with two bucket_script", func(t *testing.T) { targets := map[string]string{ "A": `{ "timeField": "@timestamp", @@ -920,78 +944,31 @@ func TestResponseParser(t *testing.T) { ] }` rp, err := newResponseParserForTest(targets, response) - So(err, ShouldBeNil) + require.NoError(t, err) result, err := rp.getTimeSeries() - So(err, ShouldBeNil) - So(result.Results, ShouldHaveLength, 1) + require.NoError(t, err) + require.Len(t, result.Results, 1) + queryRes := result.Results["A"] - So(queryRes, ShouldNotBeNil) - So(queryRes.Tables[0].Rows, ShouldHaveLength, 2) - So(queryRes.Tables[0].Columns[1].Text, ShouldEqual, "Sum") - So(queryRes.Tables[0].Columns[2].Text, ShouldEqual, "Max") - So(queryRes.Tables[0].Columns[3].Text, ShouldEqual, "params.var1 * params.var2") - So(queryRes.Tables[0].Columns[4].Text, ShouldEqual, "params.var1 * params.var2 * 2") - So(queryRes.Tables[0].Rows[0][1].(null.Float).Float64, ShouldEqual, 2) - So(queryRes.Tables[0].Rows[0][2].(null.Float).Float64, ShouldEqual, 3) - So(queryRes.Tables[0].Rows[0][3].(null.Float).Float64, ShouldEqual, 6) - So(queryRes.Tables[0].Rows[0][4].(null.Float).Float64, ShouldEqual, 24) - So(queryRes.Tables[0].Rows[1][1].(null.Float).Float64, ShouldEqual, 3) - So(queryRes.Tables[0].Rows[1][2].(null.Float).Float64, ShouldEqual, 4) - So(queryRes.Tables[0].Rows[1][3].(null.Float).Float64, ShouldEqual, 12) - 
So(queryRes.Tables[0].Rows[1][4].(null.Float).Float64, ShouldEqual, 48) + require.NotNil(t, queryRes) + dataframes, err := queryRes.Dataframes.Decoded() + require.NoError(t, err) + require.Len(t, dataframes, 1) + + frame := dataframes[0] + require.Equal(t, frame.Name, "") + require.Len(t, frame.Fields, 5) + require.Equal(t, frame.Fields[0].Name, "@timestamp") + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Name, "Sum") + require.Equal(t, frame.Fields[1].Len(), 2) + require.Equal(t, frame.Fields[2].Name, "Max") + require.Equal(t, frame.Fields[2].Len(), 2) + require.Equal(t, frame.Fields[3].Name, "params.var1 * params.var2") + require.Equal(t, frame.Fields[3].Len(), 2) + require.Equal(t, frame.Fields[4].Name, "params.var1 * params.var2 * 2") + require.Equal(t, frame.Fields[4].Len(), 2) }) - // Convey("Raw documents query", func() { - // targets := map[string]string{ - // "A": `{ - // "timeField": "@timestamp", - // "metrics": [{ "type": "raw_document", "id": "1" }] - // }`, - // } - // response := `{ - // "responses": [ - // { - // "hits": { - // "total": 100, - // "hits": [ - // { - // "_id": "1", - // "_type": "type", - // "_index": "index", - // "_source": { "sourceProp": "asd" }, - // "fields": { "fieldProp": "field" } - // }, - // { - // "_source": { "sourceProp": "asd2" }, - // "fields": { "fieldProp": "field2" } - // } - // ] - // } - // } - // ] - // }` - // rp, err := newResponseParserForTest(targets, response) - // So(err, ShouldBeNil) - // result, err := rp.getTimeSeries() - // So(err, ShouldBeNil) - // So(result.Results, ShouldHaveLength, 1) - - // queryRes := result.Results["A"] - // So(queryRes, ShouldNotBeNil) - // So(queryRes.Tables, ShouldHaveLength, 1) - - // rows := queryRes.Tables[0].Rows - // So(rows, ShouldHaveLength, 1) - // cols := queryRes.Tables[0].Columns - // So(cols, ShouldHaveLength, 3) - - // So(cols[0].Text, ShouldEqual, "host") - // So(cols[1].Text, ShouldEqual, "Average test") - // So(cols[2].Text, ShouldEqual, "Average test2") - - // So(rows[0][0].(string), ShouldEqual, "server-1") - // So(rows[0][1].(null.Float).Float64, ShouldEqual, 1000) - // So(rows[0][2].(null.Float).Float64, ShouldEqual, 3000) - // }) }) t.Run("With top_metrics", func(t *testing.T) { @@ -1035,7 +1012,7 @@ func TestResponseParser(t *testing.T) { ] } } - ] + ] } } }] @@ -1048,24 +1025,41 @@ func TestResponseParser(t *testing.T) { queryRes := result.Results["A"] assert.NotNil(t, queryRes) - assert.Len(t, queryRes.Series, 2) + dataframes, err := queryRes.Dataframes.Decoded() + assert.NoError(t, err) + assert.Len(t, dataframes, 2) - seriesOne := queryRes.Series[0] - assert.Equal(t, seriesOne.Name, "Top Metrics @value") - assert.Len(t, seriesOne.Points, 2) - assert.Equal(t, seriesOne.Points[0][0].Float64, 1.) - assert.Equal(t, seriesOne.Points[0][1].Float64, 1609459200000.) - assert.Equal(t, seriesOne.Points[1][0].Float64, 1.) - assert.Equal(t, seriesOne.Points[1][1].Float64, 1609459210000.) 
+ frame := dataframes[0] + assert.Equal(t, frame.Name, "Top Metrics @value") + assert.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Len(), 2) + v, _ := frame.FloatAt(0, 0) + assert.Equal(t, 1609459200000., v) + v, _ = frame.FloatAt(1, 0) + assert.Equal(t, 1., v) - seriesTwo := queryRes.Series[1] - assert.Equal(t, seriesTwo.Name, "Top Metrics @anotherValue") - assert.Len(t, seriesTwo.Points, 2) + v, _ = frame.FloatAt(0, 1) + assert.Equal(t, 1609459210000., v) + v, _ = frame.FloatAt(1, 1) + assert.Equal(t, 1., v) - assert.Equal(t, seriesTwo.Points[0][0].Float64, 2.) - assert.Equal(t, seriesTwo.Points[0][1].Float64, 1609459200000.) - assert.Equal(t, seriesTwo.Points[1][0].Float64, 2.) - assert.Equal(t, seriesTwo.Points[1][1].Float64, 1609459210000.) + frame = dataframes[1] + assert.Equal(t, frame.Name, "Top Metrics @anotherValue") + l, _ := frame.MarshalJSON() + fmt.Println(string(l)) + assert.Len(t, frame.Fields, 2) + require.Equal(t, frame.Fields[0].Len(), 2) + require.Equal(t, frame.Fields[1].Len(), 2) + v, _ = frame.FloatAt(0, 0) + assert.Equal(t, 1609459200000., v) + v, _ = frame.FloatAt(1, 0) + assert.Equal(t, 2., v) + + v, _ = frame.FloatAt(0, 1) + assert.Equal(t, 1609459210000., v) + v, _ = frame.FloatAt(1, 1) + assert.Equal(t, 2., v) }) }
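
Reviewer note: for the date-histogram path, this patch emits one SDK frame per series, a "time" field plus a labeled "value" field whose DisplayNameFromDS carries what used to be the series name. Below is a minimal, self-contained sketch of that shape, mirroring the count branch of processMetrics in the diff above; buildCountFrame, the sample buckets, and the hard-coded "Count" display name are illustrative assumptions, not code from this patch.

package main

import (
	"fmt"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/data"
)

// buildCountFrame mimics the count branch of processMetrics: one row per
// date-histogram bucket, bucket keys in epoch milliseconds, doc counts as
// nullable *float64 values, and the bucket props carried as field labels.
func buildCountFrame(keys []int64, docCounts []float64, props map[string]string) *data.Frame {
	timeVector := make([]time.Time, 0, len(keys))
	values := make([]*float64, 0, len(keys))
	for i, key := range keys {
		timeVector = append(timeVector, time.Unix(key/1000, 0).UTC())
		v := docCounts[i] // copy, so every row gets its own pointer
		values = append(values, &v)
	}

	tags := make(map[string]string, len(props)+1)
	for k, v := range props {
		tags[k] = v
	}
	tags["metric"] = "count"

	return data.NewFrame("",
		data.NewField("time", nil, timeVector),
		data.NewField("value", tags, values).SetConfig(
			&data.FieldConfig{DisplayNameFromDS: "Count"}))
}

func main() {
	frame := buildCountFrame([]int64{1000, 2000}, []float64{10, 15}, map[string]string{"host": "server1"})
	fmt.Println(frame.Rows(), len(frame.Fields)) // 2 rows, 2 fields: time, value
}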
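The table-producing path changes shape too: where processAggregationDocs used to build plugins.DataTable columns and rows, it now appends one field per column into a single frame. Here is a sketch of the frame the "No group by time" test above expects; buildTermsFrame is a hypothetical helper, and the field names simply follow that test.

package main

import (
	"fmt"

	"github.com/grafana/grafana-plugin-sdk-go/data"
)

// buildTermsFrame mirrors the terms-aggregation output: the old table columns
// become fields, i.e. the term key plus one nullable column per metric.
func buildTermsFrame(hosts []string, avgs, counts []float64) *data.Frame {
	hostField := data.NewField("host", nil, []*string{})
	avgField := data.NewField("Average", nil, []*float64{})
	countField := data.NewField("Count", nil, []*float64{})
	for i := range hosts {
		h, a, c := hosts[i], avgs[i], counts[i]
		hostField.Append(&h)
		avgField.Append(&a)
		countField.Append(&c)
	}
	return data.NewFrame("", hostField, avgField, countField)
}

func main() {
	frame := buildTermsFrame([]string{"server-1", "server-2"}, []float64{1000, 2000}, []float64{369, 200})
	fmt.Println(frame.Rows(), len(frame.Fields)) // 2 rows, 3 fields: host, Average, Count
}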
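Finally, null.Float gives way to *float64 throughout: castToFloat (shown in full in the diff above) returns nil for Elasticsearch's "NaN" strings and for unparsable values, so every consumer has to stay pointer-safe. A small illustrative test of that contract, assuming it lives next to response_parser.go in the elasticsearch package; it is not part of the patch.

package elasticsearch

import (
	"testing"

	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/stretchr/testify/require"
)

func TestCastToFloat(t *testing.T) {
	// Plain numbers come back as non-nil pointers.
	v := castToFloat(simplejson.NewFromAny(3.14))
	require.NotNil(t, v)
	require.Equal(t, 3.14, *v)

	// Numeric strings are parsed, matching the old null.Float behavior.
	v = castToFloat(simplejson.NewFromAny("2.5"))
	require.NotNil(t, v)
	require.Equal(t, 2.5, *v)

	// "NaN" strings collapse to nil, the *float64 analogue of a null value.
	require.Nil(t, castToFloat(simplejson.NewFromAny("NaN")))
}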