mirror of
https://github.com/grafana/grafana.git
synced 2024-12-01 13:09:22 -06:00
baab021fec
Refactor usage of legacy data contracts. Moves legacy data contracts to pkg/tsdb/legacydata package. Refactor pkg/expr to be a proper service/dependency that can be provided to wire to remove some unneeded dependencies to SSE in ngalert and other places. Refactor pkg/expr to not use the legacydata.RequestHandler and use backend.QueryDataHandler instead.
722 lines
18 KiB
Go
722 lines
18 KiB
Go
package elasticsearch
|
|
|
|
import (
|
|
"errors"
|
|
"regexp"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
|
"github.com/grafana/grafana/pkg/components/simplejson"
|
|
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
|
)
|
|
|
|
// Metric types referenced by name in this file.
const (
	countType         = "count"
	percentilesType   = "percentiles"
	extendedStatsType = "extended_stats"
	topMetricsType    = "top_metrics"
)

// Bucket types referenced by name in this file.
const (
	dateHistType    = "date_histogram"
	histogramType   = "histogram"
	filtersType     = "filters"
	termsType       = "terms"
	geohashGridType = "geohash_grid"
)
|
|
|
|
type responseParser struct {
|
|
Responses []*es.SearchResponse
|
|
Targets []*Query
|
|
DebugInfo *es.SearchDebugInfo
|
|
}
|
|
|
|
var newResponseParser = func(responses []*es.SearchResponse, targets []*Query, debugInfo *es.SearchDebugInfo) *responseParser {
|
|
return &responseParser{
|
|
Responses: responses,
|
|
Targets: targets,
|
|
DebugInfo: debugInfo,
|
|
}
|
|
}
|
|
|
|
func (rp *responseParser) getTimeSeries() (*backend.QueryDataResponse, error) {
|
|
result := backend.QueryDataResponse{
|
|
Responses: backend.Responses{},
|
|
}
|
|
if rp.Responses == nil {
|
|
return &result, nil
|
|
}
|
|
|
|
for i, res := range rp.Responses {
|
|
target := rp.Targets[i]
|
|
|
|
var debugInfo *simplejson.Json
|
|
if rp.DebugInfo != nil && i == 0 {
|
|
debugInfo = simplejson.NewFromAny(rp.DebugInfo)
|
|
}
|
|
|
|
if res.Error != nil {
|
|
errResult := getErrorFromElasticResponse(res)
|
|
result.Responses[target.RefID] = backend.DataResponse{
|
|
Error: errors.New(errResult),
|
|
Frames: data.Frames{
|
|
&data.Frame{
|
|
Meta: &data.FrameMeta{
|
|
Custom: debugInfo,
|
|
},
|
|
},
|
|
}}
|
|
continue
|
|
}
|
|
|
|
queryRes := backend.DataResponse{}
|
|
|
|
props := make(map[string]string)
|
|
err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0)
|
|
if err != nil {
|
|
return &backend.QueryDataResponse{}, err
|
|
}
|
|
rp.nameFields(queryRes, target)
|
|
rp.trimDatapoints(queryRes, target)
|
|
|
|
for _, frame := range queryRes.Frames {
|
|
frame.Meta = &data.FrameMeta{
|
|
Custom: debugInfo,
|
|
}
|
|
}
|
|
result.Responses[target.RefID] = queryRes
|
|
}
|
|
return &result, nil
|
|
}
|
|
|
|
func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Query,
|
|
queryResult *backend.DataResponse, props map[string]string, depth int) error {
|
|
var err error
|
|
maxDepth := len(target.BucketAggs) - 1
|
|
|
|
aggIDs := make([]string, 0)
|
|
for k := range aggs {
|
|
aggIDs = append(aggIDs, k)
|
|
}
|
|
sort.Strings(aggIDs)
|
|
for _, aggID := range aggIDs {
|
|
v := aggs[aggID]
|
|
aggDef, _ := findAgg(target, aggID)
|
|
esAgg := simplejson.NewFromAny(v)
|
|
if aggDef == nil {
|
|
continue
|
|
}
|
|
|
|
if depth == maxDepth {
|
|
if aggDef.Type == dateHistType {
|
|
err = rp.processMetrics(esAgg, target, queryResult, props)
|
|
} else {
|
|
err = rp.processAggregationDocs(esAgg, aggDef, target, queryResult, props)
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
for _, b := range esAgg.Get("buckets").MustArray() {
|
|
bucket := simplejson.NewFromAny(b)
|
|
newProps := make(map[string]string)
|
|
|
|
for k, v := range props {
|
|
newProps[k] = v
|
|
}
|
|
|
|
if key, err := bucket.Get("key").String(); err == nil {
|
|
newProps[aggDef.Field] = key
|
|
} else if key, err := bucket.Get("key").Int64(); err == nil {
|
|
newProps[aggDef.Field] = strconv.FormatInt(key, 10)
|
|
}
|
|
|
|
if key, err := bucket.Get("key_as_string").String(); err == nil {
|
|
newProps[aggDef.Field] = key
|
|
}
|
|
err = rp.processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
buckets := esAgg.Get("buckets").MustMap()
|
|
bucketKeys := make([]string, 0)
|
|
for k := range buckets {
|
|
bucketKeys = append(bucketKeys, k)
|
|
}
|
|
sort.Strings(bucketKeys)
|
|
|
|
for _, bucketKey := range bucketKeys {
|
|
bucket := simplejson.NewFromAny(buckets[bucketKey])
|
|
newProps := make(map[string]string)
|
|
|
|
for k, v := range props {
|
|
newProps[k] = v
|
|
}
|
|
|
|
newProps["filter"] = bucketKey
|
|
|
|
err = rp.processBuckets(bucket.MustMap(), target, queryResult, newProps, depth+1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// nolint:gocyclo
|
|
func (rp *responseParser) processMetrics(esAgg *simplejson.Json, target *Query, query *backend.DataResponse,
|
|
props map[string]string) error {
|
|
frames := data.Frames{}
|
|
esAggBuckets := esAgg.Get("buckets").MustArray()
|
|
|
|
for _, metric := range target.Metrics {
|
|
if metric.Hide {
|
|
continue
|
|
}
|
|
|
|
tags := make(map[string]string, len(props))
|
|
timeVector := make([]time.Time, 0, len(esAggBuckets))
|
|
values := make([]*float64, 0, len(esAggBuckets))
|
|
|
|
switch metric.Type {
|
|
case countType:
|
|
for _, v := range esAggBuckets {
|
|
bucket := simplejson.NewFromAny(v)
|
|
value := castToFloat(bucket.Get("doc_count"))
|
|
key := castToFloat(bucket.Get("key"))
|
|
timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC())
|
|
values = append(values, value)
|
|
}
|
|
|
|
for k, v := range props {
|
|
tags[k] = v
|
|
}
|
|
tags["metric"] = countType
|
|
frames = append(frames, data.NewFrame("",
|
|
data.NewField("time", nil, timeVector),
|
|
data.NewField("value", tags, values)))
|
|
case percentilesType:
|
|
buckets := esAggBuckets
|
|
if len(buckets) == 0 {
|
|
break
|
|
}
|
|
|
|
firstBucket := simplejson.NewFromAny(buckets[0])
|
|
percentiles := firstBucket.GetPath(metric.ID, "values").MustMap()
|
|
|
|
percentileKeys := make([]string, 0)
|
|
for k := range percentiles {
|
|
percentileKeys = append(percentileKeys, k)
|
|
}
|
|
sort.Strings(percentileKeys)
|
|
for _, percentileName := range percentileKeys {
|
|
tags := make(map[string]string, len(props))
|
|
timeVector := make([]time.Time, 0, len(esAggBuckets))
|
|
values := make([]*float64, 0, len(esAggBuckets))
|
|
|
|
for k, v := range props {
|
|
tags[k] = v
|
|
}
|
|
tags["metric"] = "p" + percentileName
|
|
tags["field"] = metric.Field
|
|
for _, v := range buckets {
|
|
bucket := simplejson.NewFromAny(v)
|
|
value := castToFloat(bucket.GetPath(metric.ID, "values", percentileName))
|
|
key := castToFloat(bucket.Get("key"))
|
|
timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC())
|
|
values = append(values, value)
|
|
}
|
|
frames = append(frames, data.NewFrame("",
|
|
data.NewField("time", nil, timeVector),
|
|
data.NewField("value", tags, values)))
|
|
}
|
|
case topMetricsType:
|
|
buckets := esAggBuckets
|
|
metrics := metric.Settings.Get("metrics").MustArray()
|
|
|
|
for _, metricField := range metrics {
|
|
tags := make(map[string]string, len(props))
|
|
timeVector := make([]time.Time, 0, len(esAggBuckets))
|
|
values := make([]*float64, 0, len(esAggBuckets))
|
|
for k, v := range props {
|
|
tags[k] = v
|
|
}
|
|
|
|
tags["field"] = metricField.(string)
|
|
tags["metric"] = "top_metrics"
|
|
|
|
for _, v := range buckets {
|
|
bucket := simplejson.NewFromAny(v)
|
|
stats := bucket.GetPath(metric.ID, "top")
|
|
key := castToFloat(bucket.Get("key"))
|
|
|
|
timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC())
|
|
|
|
for _, stat := range stats.MustArray() {
|
|
stat := stat.(map[string]interface{})
|
|
|
|
metrics, hasMetrics := stat["metrics"]
|
|
if hasMetrics {
|
|
metrics := metrics.(map[string]interface{})
|
|
metricValue, hasMetricValue := metrics[metricField.(string)]
|
|
|
|
if hasMetricValue && metricValue != nil {
|
|
v := metricValue.(float64)
|
|
values = append(values, &v)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
frames = append(frames, data.NewFrame("",
|
|
data.NewField("time", nil, timeVector),
|
|
data.NewField("value", tags, values),
|
|
))
|
|
}
|
|
|
|
case extendedStatsType:
|
|
buckets := esAggBuckets
|
|
|
|
metaKeys := make([]string, 0)
|
|
meta := metric.Meta.MustMap()
|
|
for k := range meta {
|
|
metaKeys = append(metaKeys, k)
|
|
}
|
|
sort.Strings(metaKeys)
|
|
for _, statName := range metaKeys {
|
|
v := meta[statName]
|
|
if enabled, ok := v.(bool); !ok || !enabled {
|
|
continue
|
|
}
|
|
|
|
tags := make(map[string]string, len(props))
|
|
timeVector := make([]time.Time, 0, len(esAggBuckets))
|
|
values := make([]*float64, 0, len(esAggBuckets))
|
|
|
|
for k, v := range props {
|
|
tags[k] = v
|
|
}
|
|
tags["metric"] = statName
|
|
tags["field"] = metric.Field
|
|
|
|
for _, v := range buckets {
|
|
bucket := simplejson.NewFromAny(v)
|
|
key := castToFloat(bucket.Get("key"))
|
|
var value *float64
|
|
switch statName {
|
|
case "std_deviation_bounds_upper":
|
|
value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
|
|
case "std_deviation_bounds_lower":
|
|
value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
|
|
default:
|
|
value = castToFloat(bucket.GetPath(metric.ID, statName))
|
|
}
|
|
timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC())
|
|
values = append(values, value)
|
|
}
|
|
labels := tags
|
|
frames = append(frames, data.NewFrame("",
|
|
data.NewField("time", nil, timeVector),
|
|
data.NewField("value", labels, values)))
|
|
}
|
|
default:
|
|
for k, v := range props {
|
|
tags[k] = v
|
|
}
|
|
|
|
tags["metric"] = metric.Type
|
|
tags["field"] = metric.Field
|
|
tags["metricId"] = metric.ID
|
|
for _, v := range esAggBuckets {
|
|
bucket := simplejson.NewFromAny(v)
|
|
key := castToFloat(bucket.Get("key"))
|
|
valueObj, err := bucket.Get(metric.ID).Map()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
var value *float64
|
|
if _, ok := valueObj["normalized_value"]; ok {
|
|
value = castToFloat(bucket.GetPath(metric.ID, "normalized_value"))
|
|
} else {
|
|
value = castToFloat(bucket.GetPath(metric.ID, "value"))
|
|
}
|
|
timeVector = append(timeVector, time.Unix(int64(*key)/1000, 0).UTC())
|
|
values = append(values, value)
|
|
}
|
|
frames = append(frames, data.NewFrame("",
|
|
data.NewField("time", nil, timeVector),
|
|
data.NewField("value", tags, values)))
|
|
}
|
|
}
|
|
if query.Frames != nil {
|
|
oldFrames := query.Frames
|
|
frames = append(oldFrames, frames...)
|
|
}
|
|
query.Frames = frames
|
|
return nil
|
|
}
|
|
|
|
func (rp *responseParser) processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Query,
|
|
queryResult *backend.DataResponse, props map[string]string) error {
|
|
propKeys := make([]string, 0)
|
|
for k := range props {
|
|
propKeys = append(propKeys, k)
|
|
}
|
|
sort.Strings(propKeys)
|
|
frames := data.Frames{}
|
|
var fields []*data.Field
|
|
|
|
if queryResult.Frames == nil {
|
|
for _, propKey := range propKeys {
|
|
fields = append(fields, data.NewField(propKey, nil, []*string{}))
|
|
}
|
|
}
|
|
|
|
addMetricValue := func(values []interface{}, metricName string, value *float64) {
|
|
index := -1
|
|
for i, f := range fields {
|
|
if f.Name == metricName {
|
|
index = i
|
|
break
|
|
}
|
|
}
|
|
|
|
var field data.Field
|
|
if index == -1 {
|
|
field = *data.NewField(metricName, nil, []*float64{})
|
|
fields = append(fields, &field)
|
|
} else {
|
|
field = *fields[index]
|
|
}
|
|
field.Append(value)
|
|
}
|
|
|
|
for _, v := range esAgg.Get("buckets").MustArray() {
|
|
bucket := simplejson.NewFromAny(v)
|
|
var values []interface{}
|
|
|
|
found := false
|
|
for _, e := range fields {
|
|
for _, propKey := range propKeys {
|
|
if e.Name == propKey {
|
|
e.Append(props[propKey])
|
|
}
|
|
}
|
|
if e.Name == aggDef.Field {
|
|
found = true
|
|
if key, err := bucket.Get("key").String(); err == nil {
|
|
e.Append(&key)
|
|
} else {
|
|
f, err := bucket.Get("key").Float64()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
e.Append(&f)
|
|
}
|
|
}
|
|
}
|
|
|
|
if !found {
|
|
var aggDefField *data.Field
|
|
if key, err := bucket.Get("key").String(); err == nil {
|
|
aggDefField = extractDataField(aggDef.Field, &key)
|
|
aggDefField.Append(&key)
|
|
} else {
|
|
f, err := bucket.Get("key").Float64()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
aggDefField = extractDataField(aggDef.Field, &f)
|
|
aggDefField.Append(&f)
|
|
}
|
|
fields = append(fields, aggDefField)
|
|
}
|
|
|
|
for _, metric := range target.Metrics {
|
|
switch metric.Type {
|
|
case countType:
|
|
addMetricValue(values, rp.getMetricName(metric.Type), castToFloat(bucket.Get("doc_count")))
|
|
case extendedStatsType:
|
|
metaKeys := make([]string, 0)
|
|
meta := metric.Meta.MustMap()
|
|
for k := range meta {
|
|
metaKeys = append(metaKeys, k)
|
|
}
|
|
sort.Strings(metaKeys)
|
|
for _, statName := range metaKeys {
|
|
v := meta[statName]
|
|
if enabled, ok := v.(bool); !ok || !enabled {
|
|
continue
|
|
}
|
|
|
|
var value *float64
|
|
switch statName {
|
|
case "std_deviation_bounds_upper":
|
|
value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "upper"))
|
|
case "std_deviation_bounds_lower":
|
|
value = castToFloat(bucket.GetPath(metric.ID, "std_deviation_bounds", "lower"))
|
|
default:
|
|
value = castToFloat(bucket.GetPath(metric.ID, statName))
|
|
}
|
|
|
|
addMetricValue(values, rp.getMetricName(metric.Type), value)
|
|
break
|
|
}
|
|
default:
|
|
metricName := rp.getMetricName(metric.Type)
|
|
otherMetrics := make([]*MetricAgg, 0)
|
|
|
|
for _, m := range target.Metrics {
|
|
if m.Type == metric.Type {
|
|
otherMetrics = append(otherMetrics, m)
|
|
}
|
|
}
|
|
|
|
if len(otherMetrics) > 1 {
|
|
metricName += " " + metric.Field
|
|
if metric.Type == "bucket_script" {
|
|
// Use the formula in the column name
|
|
metricName = metric.Settings.Get("script").MustString("")
|
|
}
|
|
}
|
|
|
|
addMetricValue(values, metricName, castToFloat(bucket.GetPath(metric.ID, "value")))
|
|
}
|
|
}
|
|
|
|
var dataFields []*data.Field
|
|
dataFields = append(dataFields, fields...)
|
|
|
|
frames = data.Frames{
|
|
&data.Frame{
|
|
Fields: dataFields,
|
|
}}
|
|
}
|
|
queryResult.Frames = frames
|
|
return nil
|
|
}
|
|
|
|
func extractDataField(name string, v interface{}) *data.Field {
|
|
switch v.(type) {
|
|
case *string:
|
|
return data.NewField(name, nil, []*string{})
|
|
case *float64:
|
|
return data.NewField(name, nil, []*float64{})
|
|
default:
|
|
return &data.Field{}
|
|
}
|
|
}
|
|
|
|
func (rp *responseParser) trimDatapoints(queryResult backend.DataResponse, target *Query) {
|
|
var histogram *BucketAgg
|
|
for _, bucketAgg := range target.BucketAggs {
|
|
if bucketAgg.Type == dateHistType {
|
|
histogram = bucketAgg
|
|
break
|
|
}
|
|
}
|
|
|
|
if histogram == nil {
|
|
return
|
|
}
|
|
|
|
trimEdges, err := histogram.Settings.Get("trimEdges").Int()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
frames := queryResult.Frames
|
|
|
|
for _, frame := range frames {
|
|
for _, field := range frame.Fields {
|
|
if field.Len() > trimEdges*2 {
|
|
for i := 0; i < field.Len(); i++ {
|
|
if i < trimEdges || i > field.Len()-trimEdges {
|
|
field.Delete(i)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (rp *responseParser) nameFields(queryResult backend.DataResponse, target *Query) {
|
|
set := make(map[string]struct{})
|
|
frames := queryResult.Frames
|
|
for _, v := range frames {
|
|
for _, vv := range v.Fields {
|
|
if metricType, exists := vv.Labels["metric"]; exists {
|
|
if _, ok := set[metricType]; !ok {
|
|
set[metricType] = struct{}{}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
metricTypeCount := len(set)
|
|
for i := range frames {
|
|
fieldName := rp.getFieldName(*frames[i].Fields[1], target, metricTypeCount)
|
|
for _, field := range frames[i].Fields {
|
|
field.SetConfig(&data.FieldConfig{DisplayNameFromDS: fieldName})
|
|
}
|
|
}
|
|
}
|
|
|
|
// aliasPatternRegex matches "{{...}}" placeholders in a query alias; the
// first capture group is the (non-greedy) text between the braces.
var aliasPatternRegex = regexp.MustCompile(`\{\{([\s\S]+?)\}\}`)
|
|
|
|
func (rp *responseParser) getFieldName(dataField data.Field, target *Query, metricTypeCount int) string {
|
|
metricType := dataField.Labels["metric"]
|
|
metricName := rp.getMetricName(metricType)
|
|
delete(dataField.Labels, "metric")
|
|
|
|
field := ""
|
|
if v, ok := dataField.Labels["field"]; ok {
|
|
field = v
|
|
delete(dataField.Labels, "field")
|
|
}
|
|
|
|
if target.Alias != "" {
|
|
frameName := target.Alias
|
|
|
|
subMatches := aliasPatternRegex.FindAllStringSubmatch(target.Alias, -1)
|
|
for _, subMatch := range subMatches {
|
|
group := subMatch[0]
|
|
|
|
if len(subMatch) > 1 {
|
|
group = subMatch[1]
|
|
}
|
|
|
|
if strings.Index(group, "term ") == 0 {
|
|
frameName = strings.Replace(frameName, subMatch[0], dataField.Labels[group[5:]], 1)
|
|
}
|
|
if v, ok := dataField.Labels[group]; ok {
|
|
frameName = strings.Replace(frameName, subMatch[0], v, 1)
|
|
}
|
|
if group == "metric" {
|
|
frameName = strings.Replace(frameName, subMatch[0], metricName, 1)
|
|
}
|
|
if group == "field" {
|
|
frameName = strings.Replace(frameName, subMatch[0], field, 1)
|
|
}
|
|
}
|
|
|
|
return frameName
|
|
}
|
|
// todo, if field and pipelineAgg
|
|
if field != "" && isPipelineAgg(metricType) {
|
|
if isPipelineAggWithMultipleBucketPaths(metricType) {
|
|
metricID := ""
|
|
if v, ok := dataField.Labels["metricId"]; ok {
|
|
metricID = v
|
|
}
|
|
|
|
for _, metric := range target.Metrics {
|
|
if metric.ID == metricID {
|
|
metricName = metric.Settings.Get("script").MustString()
|
|
for name, pipelineAgg := range metric.PipelineVariables {
|
|
for _, m := range target.Metrics {
|
|
if m.ID == pipelineAgg {
|
|
metricName = strings.ReplaceAll(metricName, "params."+name, describeMetric(m.Type, m.Field))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
found := false
|
|
for _, metric := range target.Metrics {
|
|
if metric.ID == field {
|
|
metricName += " " + describeMetric(metric.Type, field)
|
|
found = true
|
|
}
|
|
}
|
|
if !found {
|
|
metricName = "Unset"
|
|
}
|
|
}
|
|
} else if field != "" {
|
|
metricName += " " + field
|
|
}
|
|
|
|
delete(dataField.Labels, "metricId")
|
|
|
|
if len(dataField.Labels) == 0 {
|
|
return metricName
|
|
}
|
|
|
|
name := ""
|
|
for _, v := range dataField.Labels {
|
|
name += v + " "
|
|
}
|
|
|
|
if metricTypeCount == 1 {
|
|
return strings.TrimSpace(name)
|
|
}
|
|
|
|
return strings.TrimSpace(name) + " " + metricName
|
|
}
|
|
|
|
func (rp *responseParser) getMetricName(metric string) string {
|
|
if text, ok := metricAggType[metric]; ok {
|
|
return text
|
|
}
|
|
|
|
if text, ok := extendedStats[metric]; ok {
|
|
return text
|
|
}
|
|
|
|
return metric
|
|
}
|
|
|
|
func castToFloat(j *simplejson.Json) *float64 {
|
|
f, err := j.Float64()
|
|
if err == nil {
|
|
return &f
|
|
}
|
|
|
|
if s, err := j.String(); err == nil {
|
|
if strings.ToLower(s) == "nan" {
|
|
return nil
|
|
}
|
|
|
|
if v, err := strconv.ParseFloat(s, 64); err == nil {
|
|
return &v
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func findAgg(target *Query, aggID string) (*BucketAgg, error) {
|
|
for _, v := range target.BucketAggs {
|
|
if aggID == v.ID {
|
|
return v, nil
|
|
}
|
|
}
|
|
return nil, errors.New("can't found aggDef, aggID:" + aggID)
|
|
}
|
|
|
|
func getErrorFromElasticResponse(response *es.SearchResponse) string {
|
|
var errorString string
|
|
json := simplejson.NewFromAny(response.Error)
|
|
reason := json.Get("reason").MustString()
|
|
rootCauseReason := json.Get("root_cause").GetIndex(0).Get("reason").MustString()
|
|
|
|
switch {
|
|
case rootCauseReason != "":
|
|
errorString = rootCauseReason
|
|
case reason != "":
|
|
errorString = reason
|
|
default:
|
|
errorString = "Unknown elasticsearch error response"
|
|
}
|
|
|
|
return errorString
|
|
}
|