set right series name

Signed-off-by: wph95 <wph657856467@gmail.com>
wph95 2018-03-25 02:18:28 +08:00
parent bc5b59737c
commit 1e275d0cd1
3 changed files with 215 additions and 44 deletions

View File

@@ -3,10 +3,11 @@ package elasticsearch
import (
"errors"
"github.com/grafana/grafana/pkg/components/simplejson"
"strconv"
)
var rangeFilterSetting = RangeFilterSetting{Gte: "$timeFrom",
Lte: "$timeTo",
Lte: "$timeTo",
Format: "epoch_millis"}
type QueryBuilder struct {
@@ -173,18 +174,21 @@ func (b *QueryBuilder) getFilters(model *simplejson.Json) FiltersAgg {
}
func (b *QueryBuilder) getTerms(model *simplejson.Json) TermsAgg {
-agg := &TermsAgg{}
+agg := &TermsAgg{Aggs: make(Aggs)}
settings := simplejson.NewFromAny(model.Get("settings").Interface())
agg.Terms.Field = model.Get("field").MustString()
if settings == nil {
return *agg
}
-agg.Terms.Size = settings.Get("size").MustInt(0)
-if agg.Terms.Size == 0 {
-agg.Terms.Size = 500
+sizeStr := settings.Get("size").MustString("")
+size, err := strconv.Atoi(sizeStr)
+if err != nil {
+size = 500
}
+agg.Terms.Size = size
orderBy := settings.Get("orderBy").MustString("")
if orderBy != "" {
agg.Terms.Order = make(map[string]interface{})
agg.Terms.Order[orderBy] = settings.Get("order").MustString("")
// if orderBy is an int, this field refers to a metric result value
// TODO set subAggs
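A minimal, self-contained sketch (not part of this change; the termsSize helper is invented for illustration) of the new size handling above: the "size" setting is read as a plain string, and anything that fails strconv.Atoi, including an empty value, falls back to 500.

package main

import (
	"fmt"
	"strconv"
)

// termsSize mirrors the fallback above: empty or non-numeric sizes become 500.
func termsSize(sizeStr string) int {
	size, err := strconv.Atoi(sizeStr)
	if err != nil {
		size = 500
	}
	return size
}

func main() {
	fmt.Println(termsSize("10"), termsSize(""), termsSize("auto")) // 10 500 500
}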

View File

@@ -0,0 +1,26 @@
package elasticsearch
var metricAggType = map[string]string{
"count": "Count",
"avg": "Average",
"sum": "Sum",
"max": "Max",
"min": "Min",
"extended_stats": "Extended Stats",
"percentiles": "Percentiles",
"cardinality": "Unique Count",
"moving_avg": "Moving Average",
"derivative": "Derivative",
"raw_document": "Raw Document",
}
var extendedStats = map[string]string{
"avg": "Avg",
"min": "Min",
"max": "Max",
"sum": "Sum",
"count": "Count",
"std_deviation": "Std Dev",
"std_deviation_bounds_upper": "Std Dev Upper",
"std_deviation_bounds_lower": "Std Dev Lower",
}
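For reference, a sketch of how these tables end up being used by getMetricName in the response parser below: a metric type is looked up in metricAggType first, then in extendedStats, and unknown types pass through unchanged.

// getMetricName("avg")            -> "Average"        (metricAggType)
// getMetricName("std_deviation")  -> "Std Dev"         (extendedStats)
// getMetricName("something_else") -> "something_else"  (no mapping, returned as-is)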

View File

@@ -7,33 +7,30 @@ import (
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/tsdb"
"strconv"
"regexp"
"strings"
)
type ElasticsearchResponseParser struct {
Responses []Response
-Targets []QueryBuilder
+Targets []*QueryBuilder
}
-func (rp *ElasticsearchResponseParser) getTimeSeries() []interface{} {
+func (rp *ElasticsearchResponseParser) getTimeSeries() *tsdb.QueryResult {
queryRes := tsdb.NewQueryResult()
for i, res := range rp.Responses {
-var series []interface{}
target := rp.Targets[i]
-props := make(map[string]interface{})
+props := make(map[string]string)
+series := make([]*tsdb.TimeSeries, 0)
rp.processBuckets(res.Aggregations, target, &series, props, 0)
rp.nameSeries(&series, target)
queryRes.Series = append(queryRes.Series, series...)
}
return queryRes
}
-func findAgg(target QueryBuilder, aggId string) (*simplejson.Json, error) {
-for _, v := range target.BucketAggs {
-aggDef := simplejson.NewFromAny(v)
-if aggId == aggDef.Get("id").MustString() {
-return aggDef, nil
-}
-}
-return nil, errors.New("can't found aggDef, aggID:" + aggId)
-}
-func (rp *ElasticsearchResponseParser) processBuckets(aggs map[string]interface{}, target QueryBuilder, series *[]interface{}, props map[string]interface{}, depth int) error {
+func (rp *ElasticsearchResponseParser) processBuckets(aggs map[string]interface{}, target *QueryBuilder, series *[]*tsdb.TimeSeries, props map[string]string, depth int) (error) {
var err error
maxDepth := len(target.BucketAggs) - 1
for aggId, v := range aggs {
aggDef, _ := findAgg(target, aggId)
@@ -44,43 +41,59 @@ func (rp *ElasticsearchResponseParser) processBuckets(aggs map[string]interface{
if depth == maxDepth {
if aggDef.Get("type").MustString() == "date_histogram" {
-rp.processMetrics(esAgg, target, series, props)
+err = rp.processMetrics(esAgg, target, series, props)
if err != nil {
return err
}
} else {
return fmt.Errorf("not support type:%s", aggDef.Get("type").MustString())
}
} else {
for i, b := range esAgg.Get("buckets").MustArray() {
field := aggDef.Get("field").MustString()
bucket := simplejson.NewFromAny(b)
newProps := props
if key, err := bucket.Get("key").String(); err == nil {
newProps[field] = key
} else {
props["filter"] = strconv.Itoa(i)
}
if key, err := bucket.Get("key_as_string").String(); err == nil {
props[field] = key
}
rp.processBuckets(bucket.MustMap(), target, series, newProps, depth+1)
}
}
}
return nil
}
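To make the recursion easier to follow, here is a hypothetical response shape it walks (aggregation ids, field names and values are invented for illustration): bucket aggregations above the last level contribute their bucket keys to props, and the innermost date_histogram is handed off to processMetrics.

// "aggregations": {
//   "3": {                                   // terms agg on field "host": props["host"] = bucket key
//     "buckets": [
//       {
//         "key": "server-01",
//         "2": {                             // date_histogram at max depth -> processMetrics
//           "buckets": [
//             { "key": 1521936000000, "doc_count": 42 }
//           ]
//         }
//       }
//     ]
//   }
// }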
-func mapCopy(originalMap, newMap *map[string]string) {
-for k, v := range originalMap {
-newMap[k] = v
-}
-}
-func (rp *ElasticsearchResponseParser) processMetrics(esAgg *simplejson.Json, target QueryBuilder, props map[string]string) ([]*tsdb.TimeSeries, error) {
-var series []*tsdb.TimeSeries
+func (rp *ElasticsearchResponseParser) processMetrics(esAgg *simplejson.Json, target *QueryBuilder, series *[]*tsdb.TimeSeries, props map[string]string) (error) {
for _, v := range target.Metrics {
metric := simplejson.NewFromAny(v)
if metric.Get("hide").MustBool(false) {
continue
}
-metricId := fmt.Sprintf("%d", metric.Get("id").MustInt())
-metricField := metric.Get("field").MustString()
-switch metric.Get("type").MustString() {
+metricId := metric.Get("id").MustString()
+metricField := metric.Get("field").MustString()
+metricType := metric.Get("type").MustString()
+switch metricType {
case "count":
newSeries := tsdb.TimeSeries{}
for _, v := range esAgg.Get("buckets").MustMap() {
for _, v := range esAgg.Get("buckets").MustArray() {
bucket := simplejson.NewFromAny(v)
value := bucket.Get("doc_count").MustFloat64()
key := bucket.Get("key").MustFloat64()
newSeries.Points = append(newSeries.Points, tsdb.TimePoint{null.FloatFromPtr(&value), null.FloatFromPtr(&key)})
value := castToNullFloat(bucket.Get("doc_count"))
key := castToNullFloat(bucket.Get("key"))
newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
}
newSeries.Tags = props
newSeries.Tags["metric"] = "count"
-series = append(series, &newSeries)
+*series = append(*series, &newSeries)
case "percentiles":
buckets := esAgg.Get("buckets").MustArray()
@@ -98,14 +111,142 @@ func (rp *ElasticsearchResponseParser) processMetrics(esAgg *simplejson.Json, ta
newSeries.Tags["field"] = metricField
for _, v := range buckets {
bucket := simplejson.NewFromAny(v)
-valueStr := bucket.GetPath(metricId, "values", percentileName).MustString()
-value, _ := strconv.ParseFloat(valueStr, 64)
-key := bucket.Get("key").MustFloat64()
-newSeries.Points = append(newSeries.Points, tsdb.TimePoint{null.FloatFromPtr(&value), null.FloatFromPtr(&key)})
+value := castToNullFloat(bucket.GetPath(metricId, "values", percentileName))
+key := castToNullFloat(bucket.Get("key"))
+newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
}
-series = append(series, &newSeries)
+*series = append(*series, &newSeries)
}
default:
newSeries := tsdb.TimeSeries{}
newSeries.Tags = props
newSeries.Tags["metric"] = metricType
newSeries.Tags["field"] = metricField
for _, v := range esAgg.Get("buckets").MustArray() {
bucket := simplejson.NewFromAny(v)
key := castToNullFloat(bucket.Get("key"))
valueObj, err := bucket.Get(metricId).Map()
if err != nil {
break
}
var value null.Float
if _, ok := valueObj["normalized_value"]; ok {
value = castToNullFloat(bucket.GetPath(metricId, "normalized_value"))
} else {
value = castToNullFloat(bucket.GetPath(metricId, "value"))
}
newSeries.Points = append(newSeries.Points, tsdb.TimePoint{value, key})
}
*series = append(*series, &newSeries)
}
}
return nil
}
func (rp *ElasticsearchResponseParser) nameSeries(seriesList *[]*tsdb.TimeSeries, target *QueryBuilder) {
set := make(map[string]string)
for _, v := range *seriesList {
if metricType, exists := v.Tags["metric"]; exists {
if _, ok := set[metricType]; !ok {
set[metricType] = ""
}
}
}
-return series
metricTypeCount := len(set)
for _, series := range *seriesList {
series.Name = rp.getSeriesName(series, target, metricTypeCount)
}
}
func (rp *ElasticsearchResponseParser) getSeriesName(series *tsdb.TimeSeries, target *QueryBuilder, metricTypeCount int) (string) {
metricName := rp.getMetricName(series.Tags["metric"])
delete(series.Tags, "metric")
field := ""
if v, ok := series.Tags["field"]; ok {
field = v
delete(series.Tags, "field")
}
if target.Alias != "" {
var re = regexp.MustCompile(`{{([\s\S]+?)}}`)
for _, match := range re.FindAllString(target.Alias, -1) {
group := match[2:len(match)-2]
if strings.HasPrefix(group, "term ") {
if term, ok := series.Tags["term "]; ok {
strings.Replace(target.Alias, match, term, 1)
}
}
if v, ok := series.Tags[group]; ok {
strings.Replace(target.Alias, match, v, 1)
}
switch group {
case "metric":
strings.Replace(target.Alias, match, metricName, 1)
case "field":
strings.Replace(target.Alias, match, field, 1)
}
}
}
// todo, if field and pipelineAgg
if field != "" {
metricName += " " + field
}
if len(series.Tags) == 0 {
return metricName
}
name := ""
for _, v := range series.Tags {
name += v + " "
}
if metricTypeCount == 1 {
return strings.TrimSpace(name)
}
return strings.TrimSpace(name) + " " + metricName
}
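A few hypothetical naming outcomes for the logic above (tags, field names and aliases invented for illustration), assuming an "avg" metric on field "value" and a series tagged {"host": "server-01"}:

// no alias, one metric type in the response   -> "server-01"
// no alias, several metric types              -> "server-01 Average"
// no alias and no tags                        -> "Average value"
// alias "{{term host}} {{metric}} {{field}}"  -> "server-01 Average value"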
func (rp *ElasticsearchResponseParser) getMetricName(metric string) string {
if text, ok := metricAggType[metric]; ok {
return text
}
if text, ok := extendedStats[metric]; ok {
return text
}
return metric
}
func castToNullFloat(j *simplejson.Json) null.Float {
f, err := j.Float64()
if err == nil {
return null.FloatFrom(f)
}
if s, err := j.String(); err == nil {
// some values come back as strings; anything non-numeric stays null
if v, err := strconv.ParseFloat(s, 64); err == nil {
return null.FloatFrom(v)
}
}
return null.NewFloat(0, false)
}
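A quick sketch of what castToNullFloat produces for the value shapes seen in these responses (values invented for illustration):

// a JSON number 42.5             -> null.FloatFrom(42.5)      (valid)
// a JSON string "42.5"           -> null.FloatFrom(42.5)      (parsed, valid)
// a missing or non-numeric value -> null.NewFloat(0, false)   (invalid, i.e. a null point)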
func findAgg(target *QueryBuilder, aggId string) (*simplejson.Json, error) {
for _, v := range target.BucketAggs {
aggDef := simplejson.NewFromAny(v)
if aggId == aggDef.Get("id").MustString() {
return aggDef, nil
}
}
return nil, errors.New("could not find aggDef, aggID: " + aggId)
}