diff --git a/pkg/tsdb/elasticsearch/elasticsearch.go b/pkg/tsdb/elasticsearch/elasticsearch.go index 8fd82a179e8..0ce9eca0972 100644 --- a/pkg/tsdb/elasticsearch/elasticsearch.go +++ b/pkg/tsdb/elasticsearch/elasticsearch.go @@ -1,6 +1,7 @@ package elasticsearch import ( + "bytes" "context" "encoding/json" "errors" @@ -18,7 +19,8 @@ import ( ) type ElasticsearchExecutor struct { - Transport *http.Transport + QueryParser *ElasticSearchQueryParser + Transport *http.Transport } var ( @@ -47,17 +49,21 @@ func (e *ElasticsearchExecutor) Query(ctx context.Context, dsInfo *models.DataSo result := &tsdb.Response{} result.Results = make(map[string]*tsdb.QueryResult) - queryParser := ElasticSearchQueryParser{ - dsInfo, - tsdbQuery.TimeRange, - tsdbQuery.Queries, - } - - payload, targets, err := queryParser.Parse() + queries, err := e.getQuery(dsInfo, tsdbQuery) if err != nil { return nil, err } + buff := bytes.Buffer{} + for _, q := range queries { + s, err := q.Build(tsdbQuery, dsInfo) + if err != nil { + return nil, err + } + buff.WriteString(s) + } + payload := buff.String() + if setting.Env == setting.DEV { glog.Debug("Elasticsearch playload", "raw playload", payload) } @@ -96,12 +102,30 @@ func (e *ElasticsearchExecutor) Query(ctx context.Context, dsInfo *models.DataSo return nil, errors.New(res.getErrMsg()) } } - responseParser := ElasticsearchResponseParser{responses.Responses, targets} + responseParser := ElasticsearchResponseParser{responses.Responses, queries} queryRes := responseParser.getTimeSeries() result.Results["A"] = queryRes return result, nil } +func (e *ElasticsearchExecutor) getQuery(dsInfo *models.DataSource, context *tsdb.TsdbQuery) ([]*Query, error) { + queries := make([]*Query, 0) + if len(context.Queries) == 0 { + return nil, fmt.Errorf("query request contains no queries") + } + for _, v := range context.Queries { + + query, err := e.QueryParser.Parse(v.Model, dsInfo) + if err != nil { + return nil, err + } + queries = append(queries, query) + + } + return queries, nil + +} + func (e *ElasticsearchExecutor) createRequest(dsInfo *models.DataSource, query string) (*http.Request, error) { u, _ := url.Parse(dsInfo.Url) u.Path = path.Join(u.Path, "_msearch") diff --git a/pkg/tsdb/elasticsearch/model_parser.go b/pkg/tsdb/elasticsearch/model_parser.go index 7da6765e06c..0d016dc58a5 100644 --- a/pkg/tsdb/elasticsearch/model_parser.go +++ b/pkg/tsdb/elasticsearch/model_parser.go @@ -1,62 +1,45 @@ package elasticsearch import ( - "bytes" - "encoding/json" "fmt" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/tsdb" "github.com/leibowitz/moment" - "src/github.com/davecgh/go-spew/spew" - "strconv" "strings" "time" ) type ElasticSearchQueryParser struct { - DsInfo *models.DataSource - TimeRange *tsdb.TimeRange - Queries []*tsdb.Query } -func (qp *ElasticSearchQueryParser) Parse() (string, []*QueryBuilder, error) { - payload := bytes.Buffer{} - queryHeader := qp.getQueryHeader() - targets := make([]*QueryBuilder, 0) - for _, q := range qp.Queries { - timeField, err := q.Model.Get("timeField").String() - if err != nil { - return "", nil, err - } - rawQuery := q.Model.Get("query").MustString("") - bucketAggs := q.Model.Get("bucketAggs").MustArray() - metrics := q.Model.Get("metrics").MustArray() - alias := q.Model.Get("alias").MustString("") - builder := QueryBuilder{timeField, rawQuery, bucketAggs, metrics, alias} - targets = append(targets, &builder) - - query, err := builder.Build() - if err != nil { - return "", nil, err - } - queryBytes, err := json.Marshal(query) - if err != nil { - return "", nil, err - } - - payload.WriteString(queryHeader.String() + "\n") - payload.WriteString(string(queryBytes) + "\n") +func (qp *ElasticSearchQueryParser) Parse(model *simplejson.Json, dsInfo *models.DataSource) (*Query, error) { + //payload := bytes.Buffer{} + //queryHeader := qp.getQueryHeader() + timeField, err := model.Get("timeField").String() + if err != nil { + return nil, err + } + rawQuery := model.Get("query").MustString("") + bucketAggs := model.Get("bucketAggs").MustArray() + metrics := model.Get("metrics").MustArray() + alias := model.Get("alias").MustString("") + parsedInterval, err := tsdb.GetIntervalFrom(dsInfo, model, time.Millisecond) + if err != nil { + return nil, err } - p, err := qp.payloadReplace(payload.String(), qp.DsInfo.JsonData) - - return p, targets, err + return &Query{timeField, + rawQuery, + bucketAggs, + metrics, + alias, + parsedInterval}, nil } -func (qp *ElasticSearchQueryParser) getQueryHeader() *QueryHeader { +func getRequestHeader(timeRange *tsdb.TimeRange, dsInfo *models.DataSource) *QueryHeader { var header QueryHeader - esVersion := qp.DsInfo.JsonData.Get("esVersion").MustInt() + esVersion := dsInfo.JsonData.Get("esVersion").MustInt() searchType := "query_then_fetch" if esVersion < 5 { @@ -64,29 +47,13 @@ func (qp *ElasticSearchQueryParser) getQueryHeader() *QueryHeader { } header.SearchType = searchType header.IgnoreUnavailable = true - header.Index = getIndexList(qp.DsInfo.Database, qp.DsInfo.JsonData.Get("interval").MustString(""), qp.TimeRange) + header.Index = getIndexList(dsInfo.Database, dsInfo.JsonData.Get("interval").MustString(""), timeRange) if esVersion >= 56 { - header.MaxConcurrentShardRequests = qp.DsInfo.JsonData.Get("maxConcurrentShardRequests").MustInt() + header.MaxConcurrentShardRequests = dsInfo.JsonData.Get("maxConcurrentShardRequests").MustInt() } return &header } -func (qp *ElasticSearchQueryParser) payloadReplace(payload string, model *simplejson.Json) (string, error) { - parsedInterval, err := tsdb.GetIntervalFrom(qp.DsInfo, model, time.Millisecond) - if err != nil { - return "", nil - } - - interval := intervalCalculator.Calculate(qp.TimeRange, parsedInterval) - glog.Warn(spew.Sdump(interval)) - payload = strings.Replace(payload, "$timeFrom", fmt.Sprintf("%d", qp.TimeRange.GetFromAsMsEpoch()), -1) - payload = strings.Replace(payload, "$timeTo", fmt.Sprintf("%d", qp.TimeRange.GetToAsMsEpoch()), -1) - payload = strings.Replace(payload, "$interval", interval.Text, -1) - payload = strings.Replace(payload, "$__interval_ms", strconv.FormatInt(interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1) - payload = strings.Replace(payload, "$__interval", interval.Text, -1) - - return payload, nil -} func getIndexList(pattern string, interval string, timeRange *tsdb.TimeRange) string { if interval == "" { diff --git a/pkg/tsdb/elasticsearch/models.go b/pkg/tsdb/elasticsearch/models.go index d758e2159de..822df2dd4d1 100644 --- a/pkg/tsdb/elasticsearch/models.go +++ b/pkg/tsdb/elasticsearch/models.go @@ -1,25 +1,25 @@ package elasticsearch import ( - "github.com/grafana/grafana/pkg/components/simplejson" "bytes" - "fmt" "encoding/json" + "fmt" + "github.com/grafana/grafana/pkg/components/simplejson" ) type QueryHeader struct { SearchType string `json:"search_type"` IgnoreUnavailable bool `json:"ignore_unavailable"` Index interface{} `json:"index"` - MaxConcurrentShardRequests int `json:"max_concurrent_shard_requests"` + MaxConcurrentShardRequests int `json:"max_concurrent_shard_requests,omitempty"` } -func (q *QueryHeader) String() (string) { +func (q *QueryHeader) String() string { r, _ := json.Marshal(q) return string(r) } -type Query struct { +type Request struct { Query map[string]interface{} `json:"query"` Aggs Aggs `json:"aggs"` Size int `json:"size"` @@ -45,11 +45,10 @@ type FiltersAgg struct { } type TermsAggSetting struct { - Field string `json:"field"` - Size int `json:"size"` - Order map[string]interface{} `json:"order"` - MinDocCount int `json:"min_doc_count"` - Missing string `json:"missing"` + Field string `json:"field"` + Size int `json:"size"` + Order map[string]interface{} `json:"order"` + Missing string `json:"missing,omitempty"` } type TermsAgg struct { @@ -104,7 +103,7 @@ type Response struct { Aggregations map[string]interface{} `json:"aggregations"` } -func (r *Response) getErrMsg() (string) { +func (r *Response) getErrMsg() string { var msg bytes.Buffer errJson := simplejson.NewFromAny(r.Err) errType, err := errJson.Get("type").String() diff --git a/pkg/tsdb/elasticsearch/query.go b/pkg/tsdb/elasticsearch/query.go index d6d70e79a2a..51f1ebb5d7a 100644 --- a/pkg/tsdb/elasticsearch/query.go +++ b/pkg/tsdb/elasticsearch/query.go @@ -1,81 +1,103 @@ package elasticsearch import ( + "bytes" + "encoding/json" "errors" + "fmt" "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/tsdb" "strconv" + "strings" + "time" ) var rangeFilterSetting = RangeFilterSetting{Gte: "$timeFrom", - Lte: "$timeTo", + Lte: "$timeTo", Format: "epoch_millis"} -type QueryBuilder struct { - TimeField string - RawQuery string - BucketAggs []interface{} - Metrics []interface{} - Alias string +type Query struct { + TimeField string `json:"timeField"` + RawQuery string `json:"query"` + BucketAggs []interface{} `json:"bucketAggs"` + Metrics []interface{} `json:"metrics"` + Alias string `json:"Alias"` + Interval time.Duration } -func (b *QueryBuilder) Build() (Query, error) { - var err error - var res Query - res.Query = make(map[string]interface{}) - res.Size = 0 +func (q *Query) Build(queryContext *tsdb.TsdbQuery, dsInfo *models.DataSource) (string, error) { + var req Request + payload := bytes.Buffer{} - if err != nil { - return res, err - } - - boolQuery := BoolQuery{} - boolQuery.Filter = append(boolQuery.Filter, newRangeFilter(b.TimeField, rangeFilterSetting)) - boolQuery.Filter = append(boolQuery.Filter, newQueryStringFilter(true, b.RawQuery)) - res.Query["bool"] = boolQuery + req.Size = 0 + q.renderReqQuery(&req) // handle document query - if len(b.BucketAggs) == 0 { - if len(b.Metrics) > 0 { - metric := simplejson.NewFromAny(b.Metrics[0]) + if q.isRawDocumentQuery() { + return "", errors.New("alert not support Raw_Document") + } + + err := q.parseAggs(&req) + if err != nil { + return "", err + } + + reqBytes, err := json.Marshal(req) + reqHeader := getRequestHeader(queryContext.TimeRange, dsInfo) + payload.WriteString(reqHeader.String() + "\n") + payload.WriteString(string(reqBytes) + "\n") + return q.renderTemplate(payload.String(), queryContext) +} + +func (q *Query) isRawDocumentQuery() bool { + if len(q.BucketAggs) == 0 { + if len(q.Metrics) > 0 { + metric := simplejson.NewFromAny(q.Metrics[0]) if metric.Get("type").MustString("") == "raw_document" { - return res, errors.New("alert not support Raw_Document") + return true } } } - aggs, err := b.parseAggs(b.BucketAggs, b.Metrics) - res.Aggs = aggs["aggs"].(Aggs) - - return res, err + return false } -func (b *QueryBuilder) parseAggs(bucketAggs []interface{}, metrics []interface{}) (Aggs, error) { - query := make(Aggs) - nestedAggs := query - for _, aggRaw := range bucketAggs { +func (q *Query) renderReqQuery(req *Request) { + req.Query = make(map[string]interface{}) + boolQuery := BoolQuery{} + boolQuery.Filter = append(boolQuery.Filter, newRangeFilter(q.TimeField, rangeFilterSetting)) + boolQuery.Filter = append(boolQuery.Filter, newQueryStringFilter(true, q.RawQuery)) + req.Query["bool"] = boolQuery +} + +func (q *Query) parseAggs(req *Request) error { + aggs := make(Aggs) + nestedAggs := aggs + for _, aggRaw := range q.BucketAggs { esAggs := make(Aggs) aggJson := simplejson.NewFromAny(aggRaw) aggType, err := aggJson.Get("type").String() if err != nil { - return nil, err + return err } id, err := aggJson.Get("id").String() if err != nil { - return nil, err + return err } switch aggType { case "date_histogram": - esAggs["date_histogram"] = b.getDateHistogramAgg(aggJson) + esAggs["date_histogram"] = q.getDateHistogramAgg(aggJson) case "histogram": - esAggs["histogram"] = b.getHistogramAgg(aggJson) + esAggs["histogram"] = q.getHistogramAgg(aggJson) case "filters": - esAggs["filters"] = b.getFilters(aggJson) + esAggs["filters"] = q.getFilters(aggJson) case "terms": - terms := b.getTerms(aggJson) + terms := q.getTerms(aggJson) esAggs["terms"] = terms.Terms esAggs["aggs"] = terms.Aggs case "geohash_grid": - return nil, errors.New("alert not support Geo_Hash_Grid") + return errors.New("alert not support Geo_Hash_Grid") } if _, ok := nestedAggs["aggs"]; !ok { @@ -90,40 +112,51 @@ func (b *QueryBuilder) parseAggs(bucketAggs []interface{}, metrics []interface{} } nestedAggs["aggs"] = make(Aggs) - for _, metricRaw := range metrics { + for _, metricRaw := range q.Metrics { metric := make(Metric) metricJson := simplejson.NewFromAny(metricRaw) id, err := metricJson.Get("id").String() if err != nil { - return nil, err + return err } metricType, err := metricJson.Get("type").String() if err != nil { - return nil, err + return err } if metricType == "count" { continue } - // todo support pipeline Agg + settings := metricJson.Get("settings").MustMap(map[string]interface{}{}) + + if isPipelineAgg(metricType) { + pipelineAgg := metricJson.Get("pipelineAgg").MustString("") + if _, err := strconv.Atoi(pipelineAgg); err == nil { + settings["buckets_path"] = pipelineAgg + } else { + continue + } + + } else { + settings["field"] = metricJson.Get("field").MustString() + } - settings := metricJson.Get("settings").MustMap() - settings["field"] = metricJson.Get("field").MustString() metric[metricType] = settings nestedAggs["aggs"].(Aggs)[id] = metric } - return query, nil + req.Aggs = aggs["aggs"].(Aggs) + return nil } -func (b *QueryBuilder) getDateHistogramAgg(model *simplejson.Json) DateHistogramAgg { +func (q *Query) getDateHistogramAgg(model *simplejson.Json) *DateHistogramAgg { agg := &DateHistogramAgg{} settings := simplejson.NewFromAny(model.Get("settings").Interface()) interval, err := settings.Get("interval").String() if err == nil { agg.Interval = interval } - agg.Field = b.TimeField + agg.Field = q.TimeField agg.MinDocCount = settings.Get("min_doc_count").MustInt(0) agg.ExtendedBounds = ExtendedBounds{"$timeFrom", "$timeTo"} agg.Format = "epoch_millis" @@ -136,10 +169,10 @@ func (b *QueryBuilder) getDateHistogramAgg(model *simplejson.Json) DateHistogram if err == nil { agg.Missing = missing } - return *agg + return agg } -func (b *QueryBuilder) getHistogramAgg(model *simplejson.Json) HistogramAgg { +func (q *Query) getHistogramAgg(model *simplejson.Json) *HistogramAgg { agg := &HistogramAgg{} settings := simplejson.NewFromAny(model.Get("settings").Interface()) interval, err := settings.Get("interval").String() @@ -155,10 +188,10 @@ func (b *QueryBuilder) getHistogramAgg(model *simplejson.Json) HistogramAgg { if err == nil { agg.Missing = missing } - return *agg + return agg } -func (b *QueryBuilder) getFilters(model *simplejson.Json) FiltersAgg { +func (q *Query) getFilters(model *simplejson.Json) *FiltersAgg { agg := &FiltersAgg{} settings := simplejson.NewFromAny(model.Get("settings").Interface()) for filter := range settings.Get("filters").MustArray() { @@ -170,15 +203,15 @@ func (b *QueryBuilder) getFilters(model *simplejson.Json) FiltersAgg { } agg.Filter[label] = newQueryStringFilter(true, query) } - return *agg + return agg } -func (b *QueryBuilder) getTerms(model *simplejson.Json) TermsAgg { +func (q *Query) getTerms(model *simplejson.Json) *TermsAgg { agg := &TermsAgg{Aggs: make(Aggs)} settings := simplejson.NewFromAny(model.Get("settings").Interface()) agg.Terms.Field = model.Get("field").MustString() if settings == nil { - return *agg + return agg } sizeStr := settings.Get("size").MustString("") size, err := strconv.Atoi(sizeStr) @@ -186,17 +219,25 @@ func (b *QueryBuilder) getTerms(model *simplejson.Json) TermsAgg { size = 500 } agg.Terms.Size = size - orderBy := settings.Get("orderBy").MustString("") - if orderBy != "" { + orderBy, err := settings.Get("orderBy").String() + if err == nil { agg.Terms.Order = make(map[string]interface{}) agg.Terms.Order[orderBy] = settings.Get("order").MustString("") - // if orderBy is a int, means this fields is metric result value - // TODO set subAggs - } - - minDocCount, err := settings.Get("min_doc_count").Int() - if err == nil { - agg.Terms.MinDocCount = minDocCount + if _, err := strconv.Atoi(orderBy); err != nil { + for _, metricI := range q.Metrics { + metric := simplejson.NewFromAny(metricI) + metricId := metric.Get("id").MustString() + if metricId == orderBy { + subAggs := make(Aggs) + metricField := metric.Get("field").MustString() + metricType := metric.Get("type").MustString() + subAggs[metricType] = map[string]string{"field": metricField} + agg.Aggs = make(Aggs) + agg.Aggs[metricId] = subAggs + break + } + } + } } missing, err := settings.Get("missing").String() @@ -204,5 +245,16 @@ func (b *QueryBuilder) getTerms(model *simplejson.Json) TermsAgg { agg.Terms.Missing = missing } - return *agg + return agg +} + +func (q *Query) renderTemplate(payload string, queryContext *tsdb.TsdbQuery) (string, error) { + timeRange := queryContext.TimeRange + interval := intervalCalculator.Calculate(timeRange, q.Interval) + payload = strings.Replace(payload, "$timeFrom", fmt.Sprintf("%d", timeRange.GetFromAsMsEpoch()), -1) + payload = strings.Replace(payload, "$timeTo", fmt.Sprintf("%d", timeRange.GetToAsMsEpoch()), -1) + payload = strings.Replace(payload, "$interval", interval.Text, -1) + payload = strings.Replace(payload, "$__interval_ms", strconv.FormatInt(interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1) + payload = strings.Replace(payload, "$__interval", interval.Text, -1) + return payload, nil } diff --git a/pkg/tsdb/elasticsearch/query_def.go b/pkg/tsdb/elasticsearch/query_def.go index 5dc02aa359e..6f78f02f346 100644 --- a/pkg/tsdb/elasticsearch/query_def.go +++ b/pkg/tsdb/elasticsearch/query_def.go @@ -24,3 +24,21 @@ var extendedStats = map[string]string{ "std_deviation_bounds_upper": "Std Dev Upper", "std_deviation_bounds_lower": "Std Dev Lower", } + +var pipelineOptions = map[string]string{ + "moving_avg": "moving_avg", + "derivative": "derivative", +} + +func isPipelineAgg(metricType string) bool { + if _, ok := pipelineOptions[metricType]; ok { + return true + } + return false +} + +func describeMetric(metricType, field string) string { + text := metricAggType[metricType] + return text + " " + field + +} diff --git a/pkg/tsdb/elasticsearch/query_test.go b/pkg/tsdb/elasticsearch/query_test.go new file mode 100644 index 00000000000..992469175b6 --- /dev/null +++ b/pkg/tsdb/elasticsearch/query_test.go @@ -0,0 +1,331 @@ +package elasticsearch + +import ( + "encoding/json" + "fmt" + "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/tsdb" + . "github.com/smartystreets/goconvey/convey" + "reflect" + "strconv" + "strings" + "testing" +) + +func testElasticSearchResponse(requestJSON string, expectedElasticSearchRequestJSON string) { + var queryExpectedJSONInterface, queryJSONInterface interface{} + parser := ElasticSearchQueryParser{} + model := &Query{} + + err := json.Unmarshal([]byte(requestJSON), model) + So(err, ShouldBeNil) + jsonDate, _ := simplejson.NewJson([]byte(`{"esVersion":2}`)) + dsInfo := &models.DataSource{ + Database: "grafana-test", + JsonData: jsonDate, + } + + testTimeRange := tsdb.NewTimeRange("5m", "now") + + req, _ := simplejson.NewJson([]byte(requestJSON)) + query, err := parser.Parse(req, dsInfo) + s, err := query.Build(&tsdb.TsdbQuery{TimeRange: testTimeRange}, dsInfo) + + queryJSON := strings.Split(s, "\n")[1] + err = json.Unmarshal([]byte(queryJSON), &queryJSONInterface) + So(err, ShouldBeNil) + + expectedElasticSearchRequestJSON = strings.Replace( + expectedElasticSearchRequestJSON, + "", + strconv.FormatInt(testTimeRange.GetFromAsMsEpoch(), 10), + -1, + ) + + expectedElasticSearchRequestJSON = strings.Replace( + expectedElasticSearchRequestJSON, + "", + strconv.FormatInt(testTimeRange.GetToAsMsEpoch(), 10), + -1, + ) + + err = json.Unmarshal([]byte(expectedElasticSearchRequestJSON), &queryExpectedJSONInterface) + So(err, ShouldBeNil) + + result := reflect.DeepEqual(queryExpectedJSONInterface, queryJSONInterface) + if !result { + fmt.Printf("ERROR: %s \n != \n %s", expectedElasticSearchRequestJSON, queryJSON) + } + So(result, ShouldBeTrue) +} +func TestElasticSearchQueryBuilder(t *testing.T) { + Convey("Elasticsearch QueryBuilder query testing", t, func() { + Convey("Build test average metric with moving average", func() { + var testElasticsearchModelRequestJSON = ` + { + "bucketAggs": [ + { + "field": "timestamp", + "id": "2", + "settings": { + "interval": "auto", + "min_doc_count": 0, + "trimEdges": 0 + }, + "type": "date_histogram" + } + ], + "dsType": "elasticsearch", + "metrics": [ + { + "field": "value", + "id": "1", + "inlineScript": "_value * 2", + "meta": {}, + "settings": { + "script": { + "inline": "_value * 2" + } + }, + "type": "avg" + }, + { + "field": "1", + "id": "3", + "meta": {}, + "pipelineAgg": "1", + "settings": { + "minimize": false, + "model": "simple", + "window": 5 + }, + "type": "moving_avg" + } + ], + "query": "(test:query) AND (name:sample)", + "refId": "A", + "timeField": "timestamp" + } + ` + + var expectedElasticsearchQueryJSON = ` + { + "size": 0, + "query": { + "bool": { + "filter": [ + { + "range": { + "timestamp": { + "gte": "", + "lte": "", + "format": "epoch_millis" + } + } + }, + { + "query_string": { + "analyze_wildcard": true, + "query": "(test:query) AND (name:sample)" + } + } + ] + } + }, + "aggs": { + "2": { + "date_histogram": { + "interval": "200ms", + "field": "timestamp", + "min_doc_count": 0, + "extended_bounds": { + "min": "", + "max": "" + }, + "format": "epoch_millis" + }, + "aggs": { + "1": { + "avg": { + "field": "value", + "script": { + "inline": "_value * 2" + } + } + }, + "3": { + "moving_avg": { + "buckets_path": "1", + "window": 5, + "model": "simple", + "minimize": false + } + } + } + } + } + }` + + testElasticSearchResponse(testElasticsearchModelRequestJSON, expectedElasticsearchQueryJSON) + }) + Convey("Test Wildcards and Quotes", func() { + testElasticsearchModelRequestJSON := ` + { + "alias": "New", + "bucketAggs": [ + { + "field": "timestamp", + "id": "2", + "type": "date_histogram" + } + ], + "dsType": "elasticsearch", + "metrics": [ + { + "type": "sum", + "field": "value", + "id": "1" + } + ], + "query": "scope:$location.leagueconnect.api AND name:*CreateRegistration AND name:\"*.201-responses.rate\"", + "refId": "A", + "timeField": "timestamp" + }` + + expectedElasticsearchQueryJSON := ` + { + "size": 0, + "query": { + "bool": { + "filter": [ + { + "range": { + "timestamp": { + "gte": "", + "lte": "", + "format": "epoch_millis" + } + } + }, + { + "query_string": { + "analyze_wildcard": true, + "query": "scope:$location.leagueconnect.api AND name:*CreateRegistration AND name:\"*.201-responses.rate\"" + } + } + ] + } + }, + "aggs": { + "2": { + "aggs": { + "1": { + "sum": { + "field": "value" + } + } + }, + "date_histogram": { + "extended_bounds": { + "max": "", + "min": "" + }, + "field": "timestamp", + "format": "epoch_millis", + "min_doc_count": 0 + } + } + } + }` + + testElasticSearchResponse(testElasticsearchModelRequestJSON, expectedElasticsearchQueryJSON) + }) + Convey("Test Term Aggregates", func() { + testElasticsearchModelRequestJSON := ` + { + "bucketAggs": [{ + "field": "name_raw", + "id": "4", + "settings": { + "order": "desc", + "orderBy": "_term", + "size": "10" + }, + "type": "terms" + }, { + "field": "timestamp", + "id": "2", + "settings": { + "interval": "1m", + "min_doc_count": 0, + "trimEdges": 0 + }, + "type": "date_histogram" + }], + "dsType": "elasticsearch", + "filters": [{ + "boolOp": "AND", + "not": false, + "type": "rfc190Scope", + "value": "*.hmp.metricsd" + }, { + "boolOp": "AND", + "not": false, + "type": "name_raw", + "value": "builtin.general.*_instance_count" + }], + "metricObject": {}, + "metrics": [{ + "field": "value", + "id": "1", + "meta": {}, + "options": {}, + "settings": {}, + "type": "sum" + }], + "mode": 0, + "numToGraph": 10, + "prependHostName": false, + "query": "(scope:*.hmp.metricsd) AND (name_raw:builtin.general.*_instance_count)", + "refId": "A", + "regexAlias": false, + "selectedApplication": "", + "selectedHost": "", + "selectedLocation": "", + "timeField": "timestamp", + "useFullHostName": "", + "useQuery": false + }` + + expectedElasticsearchQueryJSON := ` + { + "size": 0, + "query": { + "bool": { + "filter": [ + { + "range": { + "timestamp": { + "gte": "", + "lte": "", + "format": "epoch_millis" + } + } + }, + { + "query_string": { + "analyze_wildcard": true, + "query": "(scope:*.hmp.metricsd) AND (name_raw:builtin.general.*_instance_count)" + } + } + ] + } + }, + "aggs": {"4":{"aggs":{"2":{"aggs":{"1":{"sum":{"field":"value"}}},"date_histogram":{"extended_bounds":{"max":"","min":""},"field":"timestamp","format":"epoch_millis","interval":"1m","min_doc_count":0}}},"terms":{"field":"name_raw","order":{"_term":"desc"},"size":10}}} + }` + + testElasticSearchResponse(testElasticsearchModelRequestJSON, expectedElasticsearchQueryJSON) + }) + }) +} diff --git a/pkg/tsdb/elasticsearch/response_parser.go b/pkg/tsdb/elasticsearch/response_parser.go index a2a8565641f..01b8cb1d235 100644 --- a/pkg/tsdb/elasticsearch/response_parser.go +++ b/pkg/tsdb/elasticsearch/response_parser.go @@ -6,14 +6,14 @@ import ( "github.com/grafana/grafana/pkg/components/null" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/tsdb" - "strconv" "regexp" + "strconv" "strings" ) type ElasticsearchResponseParser struct { Responses []Response - Targets []*QueryBuilder + Targets []*Query } func (rp *ElasticsearchResponseParser) getTimeSeries() *tsdb.QueryResult { @@ -29,7 +29,7 @@ func (rp *ElasticsearchResponseParser) getTimeSeries() *tsdb.QueryResult { return queryRes } -func (rp *ElasticsearchResponseParser) processBuckets(aggs map[string]interface{}, target *QueryBuilder, series *[]*tsdb.TimeSeries, props map[string]string, depth int) (error) { +func (rp *ElasticsearchResponseParser) processBuckets(aggs map[string]interface{}, target *Query, series *[]*tsdb.TimeSeries, props map[string]string, depth int) error { var err error maxDepth := len(target.BucketAggs) - 1 for aggId, v := range aggs { @@ -71,7 +71,7 @@ func (rp *ElasticsearchResponseParser) processBuckets(aggs map[string]interface{ } -func (rp *ElasticsearchResponseParser) processMetrics(esAgg *simplejson.Json, target *QueryBuilder, series *[]*tsdb.TimeSeries, props map[string]string) (error) { +func (rp *ElasticsearchResponseParser) processMetrics(esAgg *simplejson.Json, target *Query, series *[]*tsdb.TimeSeries, props map[string]string) error { for _, v := range target.Metrics { metric := simplejson.NewFromAny(v) if metric.Get("hide").MustBool(false) { @@ -143,7 +143,7 @@ func (rp *ElasticsearchResponseParser) processMetrics(esAgg *simplejson.Json, ta return nil } -func (rp *ElasticsearchResponseParser) nameSeries(seriesList *[]*tsdb.TimeSeries, target *QueryBuilder) { +func (rp *ElasticsearchResponseParser) nameSeries(seriesList *[]*tsdb.TimeSeries, target *Query) { set := make(map[string]string) for _, v := range *seriesList { if metricType, exists := v.Tags["metric"]; exists { @@ -159,8 +159,9 @@ func (rp *ElasticsearchResponseParser) nameSeries(seriesList *[]*tsdb.TimeSeries } -func (rp *ElasticsearchResponseParser) getSeriesName(series *tsdb.TimeSeries, target *QueryBuilder, metricTypeCount int) (string) { - metricName := rp.getMetricName(series.Tags["metric"]) +func (rp *ElasticsearchResponseParser) getSeriesName(series *tsdb.TimeSeries, target *Query, metricTypeCount int) string { + metricType := series.Tags["metric"] + metricName := rp.getMetricName(metricType) delete(series.Tags, "metric") field := "" @@ -172,7 +173,7 @@ func (rp *ElasticsearchResponseParser) getSeriesName(series *tsdb.TimeSeries, ta if target.Alias != "" { var re = regexp.MustCompile(`{{([\s\S]+?)}}`) for _, match := range re.FindAllString(target.Alias, -1) { - group := match[2:len(match)-2] + group := match[2 : len(match)-2] if strings.HasPrefix(group, "term ") { if term, ok := series.Tags["term "]; ok { @@ -193,7 +194,20 @@ func (rp *ElasticsearchResponseParser) getSeriesName(series *tsdb.TimeSeries, ta } } // todo, if field and pipelineAgg - if field != "" { + if field != "" && isPipelineAgg(metricType) { + found := false + for _, targetMetricI := range target.Metrics { + targetMetric := simplejson.NewFromAny(targetMetricI) + if targetMetric.Get("id").MustString() == field { + metricName += " " + describeMetric(targetMetric.Get("type").MustString(), field) + found = true + } + } + if !found { + metricName = "Unset" + } + + } else if field != "" { metricName += " " + field } @@ -241,7 +255,7 @@ func castToNullFloat(j *simplejson.Json) null.Float { return null.NewFloat(0, false) } -func findAgg(target *QueryBuilder, aggId string) (*simplejson.Json, error) { +func findAgg(target *Query, aggId string) (*simplejson.Json, error) { for _, v := range target.BucketAggs { aggDef := simplejson.NewFromAny(v) if aggId == aggDef.Get("id").MustString() {