Elasticsearch: Convert Timeseries and Tables to Dataframes (#34710)

* Rebase (broken tests)

* Removed tables - refactored processAggregationDocs func

* Tests cleanup

* Nit - add space in percentile legend titles

* Fix labels naming - use metricAggType map

* Fix bug which appended same fields over and over again

* Replace test with dataframes

* Fix tests

* Add nolint:gocyclo - will need refactoring

* Move frames tags assignment

* Add failing test for when ES server is booting up

* Revert "Add failing test fo when ES server is booting up"

This reverts commit fd14a1fd5e.

Co-authored-by: Elfo404 <me@giordanoricci.com>
Author: Dimitris Sotirakis, 2021-06-18 13:26:19 +03:00 (committed by GitHub)
parent 7aaf813751
commit 0d08881876
2 changed files with 661 additions and 586 deletions
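
For context on the data model this change moves to: below is a minimal, self-contained sketch (not part of this commit; names and values are illustrative) of how a single time series is expressed as a grafana-plugin-sdk-go data.Frame, with a time field plus a nullable value field that carries labels and a display name.

    package main

    import (
        "fmt"
        "time"

        "github.com/grafana/grafana-plugin-sdk-go/data"
    )

    func main() {
        // A dataframe-shaped time series: one time vector plus one nullable value vector.
        // A nil *float64 entry plays the role the old null.Float "invalid" value did.
        timeVector := []time.Time{
            time.Unix(1609459200, 0).UTC(),
            time.Unix(1609459260, 0).UTC(),
        }
        v := 42.0
        values := []*float64{&v, nil} // second bucket has no value

        labels := data.Labels{"metric": "count", "field": "@value"}
        frame := data.NewFrame("@value",
            data.NewField("time", nil, timeVector),
            data.NewField("value", labels, values).SetConfig(
                &data.FieldConfig{DisplayNameFromDS: "Count @value"}),
        )
        fmt.Println(frame.Rows(), "rows") // 2 rows
    }
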


@@ -6,8 +6,9 @@ import (
     "sort"
     "strconv"
     "strings"
+    "time"
 
-    "github.com/grafana/grafana/pkg/components/null"
+    "github.com/grafana/grafana-plugin-sdk-go/data"
     "github.com/grafana/grafana/pkg/components/simplejson"
     "github.com/grafana/grafana/pkg/plugins"
     es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
@@ -69,20 +70,12 @@ func (rp *responseParser) getTimeSeries() (plugins.DataResponse, error) {
             Meta: debugInfo,
         }
         props := make(map[string]string)
-        table := plugins.DataTable{
-            Columns: make([]plugins.DataTableColumn, 0),
-            Rows:    make([]plugins.DataRowValues, 0),
-        }
-        err := rp.processBuckets(res.Aggregations, target, &queryRes.Series, &table, props, 0)
+        err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0)
         if err != nil {
             return plugins.DataResponse{}, err
         }
-        rp.nameSeries(queryRes.Series, target)
-        rp.trimDatapoints(queryRes.Series, target)
-
-        if len(table.Rows) > 0 {
-            queryRes.Tables = append(queryRes.Tables, table)
-        }
+        rp.nameFields(queryRes, target)
+        rp.trimDatapoints(queryRes, target)
 
         result.Results[target.RefID] = queryRes
     }
@@ -91,7 +84,7 @@ func (rp *responseParser) getTimeSeries() (plugins.DataResponse, error) {
 
 // nolint:staticcheck // plugins.* deprecated
 func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Query,
-    series *plugins.DataTimeSeriesSlice, table *plugins.DataTable, props map[string]string, depth int) error {
+    queryResult *plugins.DataQueryResult, props map[string]string, depth int) error {
     var err error
     maxDepth := len(target.BucketAggs) - 1
@@ -110,9 +103,9 @@ func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Qu
 
         if depth == maxDepth {
             if aggDef.Type == dateHistType {
-                err = rp.processMetrics(esAgg, target, series, props)
+                err = rp.processMetrics(esAgg, target, queryResult, props)
             } else {
-                err = rp.processAggregationDocs(esAgg, aggDef, target, table, props)
+                err = rp.processAggregationDocs(esAgg, aggDef, target, queryResult, props)
             }
             if err != nil {
                 return err
@@ -135,7 +128,7 @@ func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Qu
                 if key, err := bucket.Get("key_as_string").String(); err == nil {
                     newProps[aggDef.Field] = key
                 }
-                err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
+                err = rp.processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1)
                 if err != nil {
                     return err
                 }
@@ -158,7 +151,7 @@ func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Qu
                 newProps["filter"] = bucketKey
 
-                err = rp.processBuckets(bucket.MustMap(), target, series, table, newProps, depth+1)
+                err = rp.processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1)
                 if err != nil {
                     return err
                 }
@@ -168,35 +161,40 @@ func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Qu
     return nil
 }
 
-// nolint:staticcheck // plugins.* deprecated
-func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, series *plugins.DataTimeSeriesSlice,
+// nolint:staticcheck,gocyclo // plugins.* deprecated
+func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, query *plugins.DataQueryResult,
     props map[string]string) error {
+    frames := data.Frames{}
+    esAggBuckets := esAgg.Get("buckets").MustArray()
     for _, metric := range target.Metrics {
         if metric.Hide {
             continue
         }
 
+        tags := make(map[string]string, len(props))
+        timeVector := make([]time.Time, 0, len(esAggBuckets))
+        values := make([]*float64, 0, len(esAggBuckets))
+
         switch metric.Type {
         case countType:
-            newSeries := plugins.DataTimeSeries{
-                Tags: make(map[string]string),
-            }
-            for _, v := range esAgg.Get("buckets").MustArray() {
+            for _, v := range esAggBuckets {
                 bucket := simplejson.NewFromAny(v)
-                value := castToNullFloat(bucket.Get("doc_count"))
-                key := castToNullFloat(bucket.Get("key"))
-                newSeries.Points = append(newSeries.Points, plugins.DataTimePoint{value, key})
+                value := castToFloat(bucket.Get("doc_count"))
+                key := castToFloat(bucket.Get("key"))
+                timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC())
+                values = append(values, value)
             }
 
             for k, v := range props {
-                newSeries.Tags[k] = v
+                tags[k] = v
             }
-            newSeries.Tags["metric"] = countType
-            *series = append(*series, newSeries)
+            tags["metric"] = countType
+            frames = append(frames, data.NewFrame(metric.Field,
+                data.NewField("time", nil, timeVector),
+                data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: rp.getMetricName(tags["metric"]) + " " + metric.Field})))
         case percentilesType:
-            buckets := esAgg.Get("buckets").MustArray()
+            buckets := esAggBuckets
             if len(buckets) == 0 {
                 break
             }
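
The new code derives the time vector from the bucket "key", which Elasticsearch returns as a Unix timestamp in milliseconds. A standalone sketch of that conversion (values illustrative, not from the commit):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // date_histogram bucket keys arrive as epoch milliseconds, decoded from JSON as float64.
        keyMillis := 1609459200000.0
        t := time.Unix(int64(keyMillis)/1000, 0).UTC()
        fmt.Println(t) // 2021-01-01 00:00:00 +0000 UTC
    }
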
@@ -210,28 +208,72 @@ func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query,
             }
             sort.Strings(percentileKeys)
             for _, percentileName := range percentileKeys {
-                newSeries := plugins.DataTimeSeries{
-                    Tags: make(map[string]string),
-                }
+                tags := make(map[string]string, len(props))
+                timeVector := make([]time.Time, 0, len(esAggBuckets))
+                values := make([]*float64, 0, len(esAggBuckets))
+
                 for k, v := range props {
-                    newSeries.Tags[k] = v
+                    tags[k] = v
                 }
-                newSeries.Tags["metric"] = "p" + percentileName
-                newSeries.Tags["field"] = metric.Field
+                tags["metric"] = "p" + percentileName
+                tags["field"] = metric.Field
                 for _, v := range buckets {
                     bucket := simplejson.NewFromAny(v)
-                    value := castToNullFloat(bucket.GetPath(metric.ID, "values", percentileName))
-                    key := castToNullFloat(bucket.Get("key"))
-                    newSeries.Points = append(newSeries.Points, plugins.DataTimePoint{value, key})
+                    value := castToFloat(bucket.GetPath(metric.ID, "values", percentileName))
+                    key := castToFloat(bucket.Get("key"))
+                    timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC())
+                    values = append(values, value)
                 }
-                *series = append(*series, newSeries)
+                frames = append(frames, data.NewFrame(metric.Field,
+                    data.NewField("time", nil, timeVector),
+                    data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: rp.getMetricName(tags["metric"]) + " " + metric.Field})))
             }
         case topMetricsType:
-            topMetricSeries := processTopMetrics(metric, esAgg, props)
-            *series = append(*series, topMetricSeries...)
+            buckets := esAggBuckets
+            metrics := metric.Settings.Get("metrics").MustArray()
+
+            for _, metricField := range metrics {
+                tags := make(map[string]string, len(props))
+                timeVector := make([]time.Time, 0, len(esAggBuckets))
+                values := make([]*float64, 0, len(esAggBuckets))
+                for k, v := range props {
+                    tags[k] = v
+                }
+
+                tags["field"] = metricField.(string)
+                tags["metric"] = "top_metrics"
+
+                for _, v := range buckets {
+                    bucket := simplejson.NewFromAny(v)
+                    stats := bucket.GetPath(metric.ID, "top")
+                    key := castToFloat(bucket.Get("key"))
+
+                    timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC())
+
+                    for _, stat := range stats.MustArray() {
+                        stat := stat.(map[string]interface{})
+
+                        metrics, hasMetrics := stat["metrics"]
+                        if hasMetrics {
+                            metrics := metrics.(map[string]interface{})
+                            metricValue, hasMetricValue := metrics[metricField.(string)]
+                            if hasMetricValue && metricValue != nil {
+                                v := metricValue.(float64)
+                                values = append(values, &v)
+                            }
+                        }
+                    }
+                }
+
+                frames = append(frames, data.NewFrame(metricField.(string),
+                    data.NewField("time", nil, timeVector),
+                    data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: rp.getMetricName(tags["metric"]) + " " + metricField.(string)}),
+                ))
+            }
         case extendedStatsType:
-            buckets := esAgg.Get("buckets").MustArray()
+            buckets := esAggBuckets
 
             metaKeys := make([]string, 0)
             meta := metric.Meta.MustMap()
@@ -245,111 +287,157 @@ func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query,
                     continue
                 }
 
-                newSeries := plugins.DataTimeSeries{
-                    Tags: make(map[string]string),
-                }
+                tags := make(map[string]string, len(props))
+                timeVector := make([]time.Time, 0, len(esAggBuckets))
+                values := make([]*float64, 0, len(esAggBuckets))
+
                 for k, v := range props {
-                    newSeries.Tags[k] = v
+                    tags[k] = v
                 }
-                newSeries.Tags["metric"] = statName
-                newSeries.Tags["field"] = metric.Field
+                tags["metric"] = statName
+                tags["field"] = metric.Field
 
                 for _, v := range buckets {
                     bucket := simplejson.NewFromAny(v)
-                    key := castToNullFloat(bucket.Get("key"))
-                    var value null.Float
+                    key := castToFloat(bucket.Get("key"))
+                    var value *float64
                     switch statName {
                     case "std_deviation_bounds_upper":
-                        value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
+                        value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
                     case "std_deviation_bounds_lower":
-                        value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
+                        value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
                     default:
-                        value = castToNullFloat(bucket.GetPath(metric.ID, statName))
+                        value = castToFloat(bucket.GetPath(metric.ID, statName))
                     }
-                    newSeries.Points = append(newSeries.Points, plugins.DataTimePoint{value, key})
+                    timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC())
+                    values = append(values, value)
                 }
-                *series = append(*series, newSeries)
+                labels := tags
+                frames = append(frames, data.NewFrame(metric.Field,
+                    data.NewField("time", nil, timeVector),
+                    data.NewField("value", labels, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: rp.getMetricName(tags["metric"]) + " " + metric.Field})))
             }
         default:
-            newSeries := plugins.DataTimeSeries{
-                Tags: make(map[string]string),
-            }
             for k, v := range props {
-                newSeries.Tags[k] = v
+                tags[k] = v
             }
 
-            newSeries.Tags["metric"] = metric.Type
-            newSeries.Tags["field"] = metric.Field
-            newSeries.Tags["metricId"] = metric.ID
-            for _, v := range esAgg.Get("buckets").MustArray() {
+            tags["metric"] = metric.Type
+            tags["field"] = metric.Field
+            tags["metricId"] = metric.ID
+            for _, v := range esAggBuckets {
                 bucket := simplejson.NewFromAny(v)
-                key := castToNullFloat(bucket.Get("key"))
+                key := castToFloat(bucket.Get("key"))
                 valueObj, err := bucket.Get(metric.ID).Map()
                 if err != nil {
                     continue
                 }
-                var value null.Float
+                var value *float64
                 if _, ok := valueObj["normalized_value"]; ok {
-                    value = castToNullFloat(bucket.GetPath(metric.ID, "normalized_value"))
+                    value = castToFloat(bucket.GetPath(metric.ID, "normalized_value"))
                 } else {
-                    value = castToNullFloat(bucket.GetPath(metric.ID, "value"))
+                    value = castToFloat(bucket.GetPath(metric.ID, "value"))
                 }
-                newSeries.Points = append(newSeries.Points, plugins.DataTimePoint{value, key})
+                timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC())
+                values = append(values, value)
             }
-            *series = append(*series, newSeries)
+            frames = append(frames, data.NewFrame(metric.Field,
+                data.NewField("time", nil, timeVector),
+                data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: rp.getMetricName(tags["metric"]) + " " + metric.Field})))
         }
     }
+    if query.Dataframes != nil {
+        oldFrames, err := query.Dataframes.Decoded()
+        if err != nil {
+            return err
+        }
+        frames = append(oldFrames, frames...)
+    }
+    query.Dataframes = plugins.NewDecodedDataFrames(frames)
     return nil
 }
 
 // nolint:staticcheck // plugins.* deprecated
 func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query,
-    table *plugins.DataTable, props map[string]string) error {
+    queryResult *plugins.DataQueryResult, props map[string]string) error {
     propKeys := make([]string, 0)
     for k := range props {
         propKeys = append(propKeys, k)
     }
     sort.Strings(propKeys)
+    frames := data.Frames{}
+    var fields []*data.Field
 
-    if len(table.Columns) == 0 {
+    if queryResult.Dataframes == nil {
         for _, propKey := range propKeys {
-            table.Columns = append(table.Columns, plugins.DataTableColumn{Text: propKey})
+            fields = append(fields, data.NewField(propKey, nil, []*string{}))
         }
-        table.Columns = append(table.Columns, plugins.DataTableColumn{Text: aggDef.Field})
     }
 
-    addMetricValue := func(values *plugins.DataRowValues, metricName string, value null.Float) {
-        found := false
-        for _, c := range table.Columns {
-            if c.Text == metricName {
-                found = true
+    addMetricValue := func(values []interface{}, metricName string, value *float64) {
+        index := -1
+        for i, f := range fields {
+            if f.Name == metricName {
+                index = i
                 break
             }
         }
-        if !found {
-            table.Columns = append(table.Columns, plugins.DataTableColumn{Text: metricName})
+
+        var field data.Field
+        if index == -1 {
+            field = *data.NewField(metricName, nil, []*float64{})
+            fields = append(fields, &field)
+        } else {
+            field = *fields[index]
         }
-        *values = append(*values, value)
+        field.Append(value)
     }
 
     for _, v := range esAgg.Get("buckets").MustArray() {
         bucket := simplejson.NewFromAny(v)
-        values := make(plugins.DataRowValues, 0)
+        var values []interface{}
 
-        for _, propKey := range propKeys {
-            values = append(values, props[propKey])
+        found := false
+        for _, e := range fields {
+            for _, propKey := range propKeys {
+                if e.Name == propKey {
+                    e.Append(props[propKey])
+                }
+            }
+            if e.Name == aggDef.Field {
+                found = true
+                if key, err := bucket.Get("key").String(); err == nil {
+                    e.Append(&key)
+                } else {
+                    f, err := bucket.Get("key").Float64()
+                    if err != nil {
+                        return err
+                    }
+                    e.Append(&f)
+                }
+            }
         }
 
-        if key, err := bucket.Get("key").String(); err == nil {
-            values = append(values, key)
-        } else {
-            values = append(values, castToNullFloat(bucket.Get("key")))
+        if !found {
+            var aggDefField *data.Field
+            if key, err := bucket.Get("key").String(); err == nil {
+                aggDefField = extractDataField(aggDef.Field, &key)
+                aggDefField.Append(&key)
+            } else {
+                f, err := bucket.Get("key").Float64()
+                if err != nil {
+                    return err
+                }
+                aggDefField = extractDataField(aggDef.Field, &f)
+                aggDefField.Append(&f)
+            }
+            fields = append(fields, aggDefField)
         }
 
         for _, metric := range target.Metrics {
             switch metric.Type {
             case countType:
-                addMetricValue(&values, rp.getMetricName(metric.Type), castToNullFloat(bucket.Get("doc_count")))
+                addMetricValue(values, rp.getMetricName(metric.Type), castToFloat(bucket.Get("doc_count")))
             case extendedStatsType:
                 metaKeys := make([]string, 0)
                 meta := metric.Meta.MustMap()
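
processAggregationDocs no longer fills a plugins.DataTable; it now builds a table-shaped frame. A minimal sketch (not from this commit; the "host" and "Count" names are illustrative) of that shape: one nullable string field holding the terms bucket keys, plus one nullable float field per metric.

    package main

    import (
        "fmt"

        "github.com/grafana/grafana-plugin-sdk-go/data"
    )

    func main() {
        hostA, hostB := "host-a", "host-b"
        countA, countB := 10.0, 7.0

        frame := data.NewFrame("",
            data.NewField("host", nil, []*string{&hostA, &hostB}),       // bucket keys
            data.NewField("Count", nil, []*float64{&countA, &countB}),   // one field per metric
        )
        fmt.Println(frame.Rows()) // 2
    }
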
@@ -363,17 +451,17 @@ func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef
                     continue
                 }
 
-                    var value null.Float
+                    var value *float64
                     switch statName {
                     case "std_deviation_bounds_upper":
-                        value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
+                        value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
                     case "std_deviation_bounds_lower":
-                        value = castToNullFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
+                        value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
                     default:
-                        value = castToNullFloat(bucket.GetPath(metric.ID, statName))
+                        value = castToFloat(bucket.GetPath(metric.ID, statName))
                     }
 
-                    addMetricValue(&values, rp.getMetricName(metric.Type), value)
+                    addMetricValue(values, rp.getMetricName(metric.Type), value)
                     break
                 }
             default:
@@ -394,17 +482,36 @@ func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef
                     }
                 }
 
-                addMetricValue(&values, metricName, castToNullFloat(bucket.GetPath(metric.ID, "value")))
+                addMetricValue(values, metricName, castToFloat(bucket.GetPath(metric.ID, "value")))
             }
         }
-        table.Rows = append(table.Rows, values)
-    }
+
+        var dataFields []*data.Field
+        dataFields = append(dataFields, fields...)
+
+        frames = data.Frames{
+            &data.Frame{
+                Fields: dataFields,
+            }}
+    }
+    queryResult.Dataframes = plugins.NewDecodedDataFrames(frames)
 
     return nil
 }
 
-func (rp *responseParser) trimDatapoints(series plugins.DataTimeSeriesSlice, target *Query) {
+func extractDataField(name string, v interface{}) *data.Field {
+    switch v.(type) {
+    case *string:
+        return data.NewField(name, nil, []*string{})
+    case *float64:
+        return data.NewField(name, nil, []*float64{})
+    default:
+        return &data.Field{}
+    }
+}
+
+// TODO remove deprecations
+// nolint:staticcheck // plugins.DataQueryResult deprecated
+func (rp *responseParser) trimDatapoints(queryResult plugins.DataQueryResult, target *Query) {
     var histogram *BucketAgg
     for _, bucketAgg := range target.BucketAggs {
         if bucketAgg.Type == dateHistType {
@@ -422,44 +529,62 @@ func (rp *responseParser) trimDatapoints(series plugins.DataTimeSeriesSlice, tar
         return
     }
 
-    for i := range series {
-        if len(series[i].Points) > trimEdges*2 {
-            series[i].Points = series[i].Points[trimEdges : len(series[i].Points)-trimEdges]
+    frames, err := queryResult.Dataframes.Decoded()
+    if err != nil {
+        return
+    }
+
+    for _, frame := range frames {
+        for _, field := range frame.Fields {
+            if field.Len() > trimEdges*2 {
+                for i := 0; i < field.Len(); i++ {
+                    if i < trimEdges || i > field.Len()-trimEdges {
+                        field.Delete(i)
+                    }
+                }
+            }
         }
     }
 }
 
-func (rp *responseParser) nameSeries(seriesList plugins.DataTimeSeriesSlice, target *Query) {
+// nolint:staticcheck // plugins.DataQueryResult deprecated
+func (rp *responseParser) nameFields(queryResult plugins.DataQueryResult, target *Query) {
     set := make(map[string]struct{})
-    for _, v := range seriesList {
-        if metricType, exists := v.Tags["metric"]; exists {
-            if _, ok := set[metricType]; !ok {
-                set[metricType] = struct{}{}
+    frames, err := queryResult.Dataframes.Decoded()
+    if err != nil {
+        return
+    }
+    for _, v := range frames {
+        for _, vv := range v.Fields {
+            if metricType, exists := vv.Labels["metric"]; exists {
+                if _, ok := set[metricType]; !ok {
+                    set[metricType] = struct{}{}
+                }
             }
         }
     }
     metricTypeCount := len(set)
-    for i := range seriesList {
-        seriesList[i].Name = rp.getSeriesName(seriesList[i], target, metricTypeCount)
+    for i := range frames {
+        frames[i].Name = rp.getFieldName(*frames[i].Fields[1], target, metricTypeCount)
     }
 }
 
 var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`)
 
 // nolint:staticcheck // plugins.* deprecated
-func (rp *responseParser) getSeriesName(series plugins.DataTimeSeries, target *Query, metricTypeCount int) string {
-    metricType := series.Tags["metric"]
+func (rp *responseParser) getFieldName(dataField data.Field, target *Query, metricTypeCount int) string {
+    metricType := dataField.Labels["metric"]
     metricName := rp.getMetricName(metricType)
-    delete(series.Tags, "metric")
+    delete(dataField.Labels, "metric")
 
     field := ""
-    if v, ok := series.Tags["field"]; ok {
+    if v, ok := dataField.Labels["field"]; ok {
         field = v
-        delete(series.Tags, "field")
+        delete(dataField.Labels, "field")
     }
 
     if target.Alias != "" {
-        seriesName := target.Alias
+        frameName := target.Alias
         subMatches := aliasPatternRegex.FindAllStringSubmatch(target.Alias, -1)
         for _, subMatch := range subMatches {
@@ -470,26 +595,26 @@ func (rp *responseParser) getSeriesName(series plugins.DataTimeSeries, target *Q
             }
 
             if strings.Index(group, "term ") == 0 {
-                seriesName = strings.Replace(seriesName, subMatch[0], series.Tags[group[5:]], 1)
+                frameName = strings.Replace(frameName, subMatch[0], dataField.Labels[group[5:]], 1)
             }
-            if v, ok := series.Tags[group]; ok {
-                seriesName = strings.Replace(seriesName, subMatch[0], v, 1)
+            if v, ok := dataField.Labels[group]; ok {
+                frameName = strings.Replace(frameName, subMatch[0], v, 1)
             }
             if group == "metric" {
-                seriesName = strings.Replace(seriesName, subMatch[0], metricName, 1)
+                frameName = strings.Replace(frameName, subMatch[0], metricName, 1)
             }
             if group == "field" {
-                seriesName = strings.Replace(seriesName, subMatch[0], field, 1)
+                frameName = strings.Replace(frameName, subMatch[0], field, 1)
             }
         }
 
-        return seriesName
+        return frameName
     }
 
     // todo, if field and pipelineAgg
     if field != "" && isPipelineAgg(metricType) {
         if isPipelineAggWithMultipleBucketPaths(metricType) {
             metricID := ""
-            if v, ok := series.Tags["metricId"]; ok {
+            if v, ok := dataField.Labels["metricId"]; ok {
                 metricID = v
             }
 
@@ -521,14 +646,14 @@ func (rp *responseParser) getSeriesName(series plugins.DataTimeSeries, target *Q
         metricName += " " + field
     }
 
-    delete(series.Tags, "metricId")
+    delete(dataField.Labels, "metricId")
 
-    if len(series.Tags) == 0 {
+    if len(dataField.Labels) == 0 {
         return metricName
     }
 
     name := ""
-    for _, v := range series.Tags {
+    for _, v := range dataField.Labels {
         name += v + " "
     }
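
The {{...}} alias handling is unchanged apart from the rename to frameName. A compact, self-contained sketch of the substitution rules (renderAlias is a hypothetical helper that condenses the committed checks, not the committed function):

    package main

    import (
        "fmt"
        "regexp"
        "strings"
    )

    var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`)

    func renderAlias(alias string, labels map[string]string, metricName, field string) string {
        name := alias
        for _, m := range aliasPatternRegex.FindAllStringSubmatch(alias, -1) {
            group := strings.TrimSpace(m[1])
            switch {
            case strings.HasPrefix(group, "term "):
                name = strings.Replace(name, m[0], labels[group[5:]], 1) // {{term <label>}}
            case group == "metric":
                name = strings.Replace(name, m[0], metricName, 1)
            case group == "field":
                name = strings.Replace(name, m[0], field, 1)
            default:
                if v, ok := labels[group]; ok {
                    name = strings.Replace(name, m[0], v, 1) // bare label name
                }
            }
        }
        return name
    }

    func main() {
        labels := map[string]string{"host": "db-01"}
        fmt.Println(renderAlias("{{term host}} {{metric}} of {{field}}", labels, "Average", "bytes"))
        // db-01 Average of bytes
    }
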
@@ -551,23 +676,23 @@ func (rp *responseParser) getMetricName(metric string) string {
     return metric
 }
 
-func castToNullFloat(j *simplejson.Json) null.Float {
+func castToFloat(j *simplejson.Json) *float64 {
     f, err := j.Float64()
     if err == nil {
-        return null.FloatFrom(f)
+        return &f
     }
 
     if s, err := j.String(); err == nil {
         if strings.ToLower(s) == "nan" {
-            return null.NewFloat(0, false)
+            return nil
         }
 
         if v, err := strconv.ParseFloat(s, 64); err == nil {
-            return null.FloatFromPtr(&v)
+            return &v
        }
     }
 
-    return null.NewFloat(0, false)
+    return nil
 }
 
 func findAgg(target *Query, aggID string) (*BucketAgg, error) {
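
castToNullFloat becomes castToFloat, returning a nil *float64 where null.Float used to be "invalid". A standalone approximation of that behaviour using encoding/json instead of simplejson (illustrative only, not the committed function): numbers and numeric strings become *float64, "NaN" or anything unparsable becomes nil.

    package main

    import (
        "encoding/json"
        "fmt"
        "strconv"
        "strings"
    )

    func castToFloat(raw json.RawMessage) *float64 {
        var f float64
        if err := json.Unmarshal(raw, &f); err == nil {
            return &f // plain JSON number
        }
        var s string
        if err := json.Unmarshal(raw, &s); err == nil {
            if strings.ToLower(s) == "nan" {
                return nil // Elasticsearch sometimes encodes NaN as a string
            }
            if v, err := strconv.ParseFloat(s, 64); err == nil {
                return &v // numeric string
            }
        }
        return nil
    }

    func main() {
        if p := castToFloat(json.RawMessage(`12.5`)); p != nil {
            fmt.Println(*p) // 12.5
        }
        fmt.Println(castToFloat(json.RawMessage(`"NaN"`)) == nil) // true
        if p := castToFloat(json.RawMessage(`"3.25"`)); p != nil {
            fmt.Println(*p) // 3.25
        }
    }
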
@@ -597,47 +722,3 @@ func getErrorFromElasticResponse(response *es.SearchResponse) plugins.DataQueryR
     return result
 }
-
-func processTopMetricValues(stats *simplejson.Json, field string) null.Float {
-    for _, stat := range stats.MustArray() {
-        stat := stat.(map[string]interface{})
-        metrics, hasMetrics := stat["metrics"]
-        if hasMetrics {
-            metrics := metrics.(map[string]interface{})
-            metricValue, hasMetricValue := metrics[field]
-            if hasMetricValue && metricValue != nil {
-                return null.FloatFrom(metricValue.(float64))
-            }
-        }
-    }
-    return null.NewFloat(0, false)
-}
-
-func processTopMetrics(metric *MetricAgg, esAgg *simplejson.Json, props map[string]string) plugins.DataTimeSeriesSlice {
-    var series plugins.DataTimeSeriesSlice
-    metrics, hasMetrics := metric.Settings.MustMap()["metrics"].([]interface{})
-
-    if hasMetrics {
-        for _, metricField := range metrics {
-            newSeries := plugins.DataTimeSeries{
-                Tags: make(map[string]string),
-            }
-
-            for _, v := range esAgg.Get("buckets").MustArray() {
-                bucket := simplejson.NewFromAny(v)
-                stats := bucket.GetPath(metric.ID, "top")
-                value := processTopMetricValues(stats, metricField.(string))
-                key := castToNullFloat(bucket.Get("key"))
-                newSeries.Points = append(newSeries.Points, plugins.DataTimePoint{value, key})
-            }
-
-            for k, v := range props {
-                newSeries.Tags[k] = v
-            }
-
-            newSeries.Tags["metric"] = "top_metrics"
-            newSeries.Tags["field"] = metricField.(string)
-            series = append(series, newSeries)
-        }
-    }
-
-    return series
-}

Diff for the second changed file suppressed because it is too large.