diff --git a/pkg/tsdb/elasticsearch/elasticsearch.go b/pkg/tsdb/elasticsearch/elasticsearch.go index 0ce9eca0972..abf25feac06 100644 --- a/pkg/tsdb/elasticsearch/elasticsearch.go +++ b/pkg/tsdb/elasticsearch/elasticsearch.go @@ -1,27 +1,20 @@ package elasticsearch import ( - "bytes" "context" - "encoding/json" - "errors" "fmt" - "github.com/grafana/grafana/pkg/log" - "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/setting" - "github.com/grafana/grafana/pkg/tsdb" - "golang.org/x/net/context/ctxhttp" "net/http" "net/url" "path" "strings" "time" + + "github.com/grafana/grafana/pkg/log" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/tsdb" ) -type ElasticsearchExecutor struct { - QueryParser *ElasticSearchQueryParser - Transport *http.Transport -} +type ElasticsearchExecutor struct{} var ( glog log.Logger @@ -29,14 +22,7 @@ var ( ) func NewElasticsearchExecutor(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { - transport, err := dsInfo.GetHttpTransport() - if err != nil { - return nil, err - } - - return &ElasticsearchExecutor{ - Transport: transport, - }, nil + return &ElasticsearchExecutor{}, nil } func init() { @@ -46,84 +32,11 @@ func init() { } func (e *ElasticsearchExecutor) Query(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) { - result := &tsdb.Response{} - result.Results = make(map[string]*tsdb.QueryResult) - - queries, err := e.getQuery(dsInfo, tsdbQuery) - if err != nil { - return nil, err + if len(tsdbQuery.Queries) == 0 { + return nil, fmt.Errorf("query contains no queries") } - buff := bytes.Buffer{} - for _, q := range queries { - s, err := q.Build(tsdbQuery, dsInfo) - if err != nil { - return nil, err - } - buff.WriteString(s) - } - payload := buff.String() - - if setting.Env == setting.DEV { - glog.Debug("Elasticsearch playload", "raw playload", payload) - } - glog.Info("Elasticsearch playload", "raw playload", payload) - - req, err := e.createRequest(dsInfo, payload) - if err != nil { - return nil, err - } - - httpClient, err := dsInfo.GetHttpClient() - if err != nil { - return nil, err - } - - resp, err := ctxhttp.Do(ctx, httpClient, req) - if err != nil { - return nil, err - } - - if resp.StatusCode/100 != 2 { - return nil, fmt.Errorf("elasticsearch returned statuscode invalid status code: %v", resp.Status) - } - - var responses Responses - dec := json.NewDecoder(resp.Body) - defer resp.Body.Close() - dec.UseNumber() - err = dec.Decode(&responses) - if err != nil { - return nil, err - } - - for _, res := range responses.Responses { - if res.Err != nil { - return nil, errors.New(res.getErrMsg()) - } - } - responseParser := ElasticsearchResponseParser{responses.Responses, queries} - queryRes := responseParser.getTimeSeries() - result.Results["A"] = queryRes - return result, nil -} - -func (e *ElasticsearchExecutor) getQuery(dsInfo *models.DataSource, context *tsdb.TsdbQuery) ([]*Query, error) { - queries := make([]*Query, 0) - if len(context.Queries) == 0 { - return nil, fmt.Errorf("query request contains no queries") - } - for _, v := range context.Queries { - - query, err := e.QueryParser.Parse(v.Model, dsInfo) - if err != nil { - return nil, err - } - queries = append(queries, query) - - } - return queries, nil - + return e.executeTimeSeriesQuery(ctx, dsInfo, tsdbQuery) } func (e *ElasticsearchExecutor) createRequest(dsInfo *models.DataSource, query string) (*http.Request, error) { diff --git a/pkg/tsdb/elasticsearch/model_parser.go b/pkg/tsdb/elasticsearch/model_parser.go deleted file mode 100644 index 5d94aebef1a..00000000000 --- a/pkg/tsdb/elasticsearch/model_parser.go +++ /dev/null @@ -1,153 +0,0 @@ -package elasticsearch - -import ( - "fmt" - "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/tsdb" - "github.com/leibowitz/moment" - "strings" - "time" -) - -type ElasticSearchQueryParser struct { -} - -func (qp *ElasticSearchQueryParser) Parse(model *simplejson.Json, dsInfo *models.DataSource) (*Query, error) { - //payload := bytes.Buffer{} - //queryHeader := qp.getQueryHeader() - timeField, err := model.Get("timeField").String() - if err != nil { - return nil, err - } - rawQuery := model.Get("query").MustString() - bucketAggs, err := qp.parseBucketAggs(model) - if err != nil { - return nil, err - } - metrics, err := qp.parseMetrics(model) - if err != nil { - return nil, err - } - alias := model.Get("alias").MustString("") - parsedInterval, err := tsdb.GetIntervalFrom(dsInfo, model, time.Millisecond) - if err != nil { - return nil, err - } - - return &Query{timeField, - rawQuery, - bucketAggs, - metrics, - alias, - parsedInterval}, nil -} - -func (qp *ElasticSearchQueryParser) parseBucketAggs(model *simplejson.Json) ([]*BucketAgg, error) { - var err error - var result []*BucketAgg - for _, t := range model.Get("bucketAggs").MustArray() { - aggJson := simplejson.NewFromAny(t) - agg := &BucketAgg{} - - agg.Type, err = aggJson.Get("type").String() - if err != nil { - return nil, err - } - - agg.ID, err = aggJson.Get("id").String() - if err != nil { - return nil, err - } - - agg.Field = aggJson.Get("field").MustString() - agg.Settings = simplejson.NewFromAny(aggJson.Get("settings").MustMap()) - - result = append(result, agg) - } - return result, nil -} - -func (qp *ElasticSearchQueryParser) parseMetrics(model *simplejson.Json) ([]*Metric, error) { - var err error - var result []*Metric - for _, t := range model.Get("metrics").MustArray() { - metricJson := simplejson.NewFromAny(t) - metric := &Metric{} - - metric.Field = metricJson.Get("field").MustString() - metric.Hide = metricJson.Get("hide").MustBool(false) - metric.ID, err = metricJson.Get("id").String() - if err != nil { - return nil, err - } - - metric.PipelineAggregate = metricJson.Get("pipelineAgg").MustString() - metric.Settings = simplejson.NewFromAny(metricJson.Get("settings").MustMap()) - - metric.Type, err = metricJson.Get("type").String() - if err != nil { - return nil, err - } - - result = append(result, metric) - } - return result, nil -} -func getRequestHeader(timeRange *tsdb.TimeRange, dsInfo *models.DataSource) *QueryHeader { - var header QueryHeader - esVersion := dsInfo.JsonData.Get("esVersion").MustInt() - - searchType := "query_then_fetch" - if esVersion < 5 { - searchType = "count" - } - header.SearchType = searchType - header.IgnoreUnavailable = true - header.Index = getIndexList(dsInfo.Database, dsInfo.JsonData.Get("interval").MustString(), timeRange) - - if esVersion >= 56 { - header.MaxConcurrentShardRequests = dsInfo.JsonData.Get("maxConcurrentShardRequests").MustInt() - } - return &header -} - -func getIndexList(pattern string, interval string, timeRange *tsdb.TimeRange) string { - if interval == "" { - return pattern - } - - var indexes []string - indexParts := strings.Split(strings.TrimLeft(pattern, "["), "]") - indexBase := indexParts[0] - if len(indexParts) <= 1 { - return pattern - } - - indexDateFormat := indexParts[1] - - start := moment.NewMoment(timeRange.MustGetFrom()) - end := moment.NewMoment(timeRange.MustGetTo()) - - indexes = append(indexes, fmt.Sprintf("%s%s", indexBase, start.Format(indexDateFormat))) - for start.IsBefore(*end) { - switch interval { - case "Hourly": - start = start.AddHours(1) - - case "Daily": - start = start.AddDay() - - case "Weekly": - start = start.AddWeeks(1) - - case "Monthly": - start = start.AddMonths(1) - - case "Yearly": - start = start.AddYears(1) - } - indexes = append(indexes, fmt.Sprintf("%s%s", indexBase, start.Format(indexDateFormat))) - } - return strings.Join(indexes, ",") -} diff --git a/pkg/tsdb/elasticsearch/model_parser_test.go b/pkg/tsdb/elasticsearch/model_parser_test.go deleted file mode 100644 index aa7336fb69b..00000000000 --- a/pkg/tsdb/elasticsearch/model_parser_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package elasticsearch - -import ( - "github.com/grafana/grafana/pkg/tsdb" - . "github.com/smartystreets/goconvey/convey" - "strconv" - "strings" - "testing" -) - -func makeTime(hour int) string { - //unixtime 1500000000 == 2017-07-14T02:40:00+00:00 - return strconv.Itoa((1500000000 + hour*60*60) * 1000) -} - -func getIndexListByTime(pattern string, interval string, hour int) string { - timeRange := &tsdb.TimeRange{ - From: makeTime(0), - To: makeTime(hour), - } - return getIndexList(pattern, interval, timeRange) -} - -func TestElasticsearchGetIndexList(t *testing.T) { - Convey("Test Elasticsearch getIndex ", t, func() { - - Convey("Parse Interval Formats", func() { - So(getIndexListByTime("[logstash-]YYYY.MM.DD", "Daily", 48), - ShouldEqual, "logstash-2017.07.14,logstash-2017.07.15,logstash-2017.07.16") - - So(len(strings.Split(getIndexListByTime("[logstash-]YYYY.MM.DD.HH", "Hourly", 3), ",")), - ShouldEqual, 4) - - So(getIndexListByTime("[logstash-]YYYY.W", "Weekly", 100), - ShouldEqual, "logstash-2017.28,logstash-2017.29") - - So(getIndexListByTime("[logstash-]YYYY.MM", "Monthly", 700), - ShouldEqual, "logstash-2017.07,logstash-2017.08") - - So(getIndexListByTime("[logstash-]YYYY", "Yearly", 10000), - ShouldEqual, "logstash-2017,logstash-2018,logstash-2019") - }) - - Convey("No Interval", func() { - index := getIndexListByTime("logstash-test", "", 1) - So(index, ShouldEqual, "logstash-test") - }) - }) -} diff --git a/pkg/tsdb/elasticsearch/query.go b/pkg/tsdb/elasticsearch/query.go index a63529df2df..123f8dd5667 100644 --- a/pkg/tsdb/elasticsearch/query.go +++ b/pkg/tsdb/elasticsearch/query.go @@ -5,12 +5,14 @@ import ( "encoding/json" "errors" "fmt" - "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/tsdb" "strconv" "strings" "time" + + "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/tsdb" + "github.com/leibowitz/moment" ) var rangeFilterSetting = RangeFilterSetting{Gte: "$timeFrom", @@ -22,14 +24,12 @@ type Query struct { RawQuery string `json:"query"` BucketAggs []*BucketAgg `json:"bucketAggs"` Metrics []*Metric `json:"metrics"` - Alias string `json:"Alias"` + Alias string `json:"alias"` Interval time.Duration } func (q *Query) Build(queryContext *tsdb.TsdbQuery, dsInfo *models.DataSource) (string, error) { var req Request - payload := bytes.Buffer{} - req.Size = 0 q.renderReqQuery(&req) @@ -45,6 +45,7 @@ func (q *Query) Build(queryContext *tsdb.TsdbQuery, dsInfo *models.DataSource) ( reqBytes, err := json.Marshal(req) reqHeader := getRequestHeader(queryContext.TimeRange, dsInfo) + payload := bytes.Buffer{} payload.WriteString(reqHeader.String() + "\n") payload.WriteString(string(reqBytes) + "\n") return q.renderTemplate(payload.String(), queryContext) @@ -235,3 +236,61 @@ func (q *Query) renderTemplate(payload string, queryContext *tsdb.TsdbQuery) (st payload = strings.Replace(payload, "$__interval", interval.Text, -1) return payload, nil } + +func getRequestHeader(timeRange *tsdb.TimeRange, dsInfo *models.DataSource) *QueryHeader { + var header QueryHeader + esVersion := dsInfo.JsonData.Get("esVersion").MustInt() + + searchType := "query_then_fetch" + if esVersion < 5 { + searchType = "count" + } + header.SearchType = searchType + header.IgnoreUnavailable = true + header.Index = getIndexList(dsInfo.Database, dsInfo.JsonData.Get("interval").MustString(), timeRange) + + if esVersion >= 56 { + header.MaxConcurrentShardRequests = dsInfo.JsonData.Get("maxConcurrentShardRequests").MustInt() + } + return &header +} + +func getIndexList(pattern string, interval string, timeRange *tsdb.TimeRange) string { + if interval == "" { + return pattern + } + + var indexes []string + indexParts := strings.Split(strings.TrimLeft(pattern, "["), "]") + indexBase := indexParts[0] + if len(indexParts) <= 1 { + return pattern + } + + indexDateFormat := indexParts[1] + + start := moment.NewMoment(timeRange.MustGetFrom()) + end := moment.NewMoment(timeRange.MustGetTo()) + + indexes = append(indexes, fmt.Sprintf("%s%s", indexBase, start.Format(indexDateFormat))) + for start.IsBefore(*end) { + switch interval { + case "Hourly": + start = start.AddHours(1) + + case "Daily": + start = start.AddDay() + + case "Weekly": + start = start.AddWeeks(1) + + case "Monthly": + start = start.AddMonths(1) + + case "Yearly": + start = start.AddYears(1) + } + indexes = append(indexes, fmt.Sprintf("%s%s", indexBase, start.Format(indexDateFormat))) + } + return strings.Join(indexes, ",") +} diff --git a/pkg/tsdb/elasticsearch/query_test.go b/pkg/tsdb/elasticsearch/query_test.go index aecca9f4734..1ce6e5ac7bb 100644 --- a/pkg/tsdb/elasticsearch/query_test.go +++ b/pkg/tsdb/elasticsearch/query_test.go @@ -3,14 +3,15 @@ package elasticsearch import ( "encoding/json" "fmt" - "github.com/grafana/grafana/pkg/components/simplejson" - "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/tsdb" - . "github.com/smartystreets/goconvey/convey" "reflect" "strconv" "strings" "testing" + + "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/tsdb" + . "github.com/smartystreets/goconvey/convey" ) func testElasticSearchResponse(query Query, expectedElasticSearchRequestJSON string) { @@ -254,3 +255,43 @@ func TestElasticSearchQueryBuilder(t *testing.T) { }) }) } + +func makeTime(hour int) string { + //unixtime 1500000000 == 2017-07-14T02:40:00+00:00 + return strconv.Itoa((1500000000 + hour*60*60) * 1000) +} + +func getIndexListByTime(pattern string, interval string, hour int) string { + timeRange := &tsdb.TimeRange{ + From: makeTime(0), + To: makeTime(hour), + } + return getIndexList(pattern, interval, timeRange) +} + +func TestElasticsearchGetIndexList(t *testing.T) { + Convey("Test Elasticsearch getIndex ", t, func() { + + Convey("Parse Interval Formats", func() { + So(getIndexListByTime("[logstash-]YYYY.MM.DD", "Daily", 48), + ShouldEqual, "logstash-2017.07.14,logstash-2017.07.15,logstash-2017.07.16") + + So(len(strings.Split(getIndexListByTime("[logstash-]YYYY.MM.DD.HH", "Hourly", 3), ",")), + ShouldEqual, 4) + + So(getIndexListByTime("[logstash-]YYYY.W", "Weekly", 100), + ShouldEqual, "logstash-2017.28,logstash-2017.29") + + So(getIndexListByTime("[logstash-]YYYY.MM", "Monthly", 700), + ShouldEqual, "logstash-2017.07,logstash-2017.08") + + So(getIndexListByTime("[logstash-]YYYY", "Yearly", 10000), + ShouldEqual, "logstash-2017,logstash-2018,logstash-2019") + }) + + Convey("No Interval", func() { + index := getIndexListByTime("logstash-test", "", 1) + So(index, ShouldEqual, "logstash-test") + }) + }) +} diff --git a/pkg/tsdb/elasticsearch/response_parser_test.go b/pkg/tsdb/elasticsearch/response_parser_test.go index c5b877c1925..1df2c4551ae 100644 --- a/pkg/tsdb/elasticsearch/response_parser_test.go +++ b/pkg/tsdb/elasticsearch/response_parser_test.go @@ -2,9 +2,10 @@ package elasticsearch import ( "encoding/json" + "testing" + "github.com/grafana/grafana/pkg/tsdb" . "github.com/smartystreets/goconvey/convey" - "testing" ) func testElasticsearchResponse(body string, target Query) *tsdb.QueryResult { diff --git a/pkg/tsdb/elasticsearch/time_series_query.go b/pkg/tsdb/elasticsearch/time_series_query.go new file mode 100644 index 00000000000..af8f61eb144 --- /dev/null +++ b/pkg/tsdb/elasticsearch/time_series_query.go @@ -0,0 +1,182 @@ +package elasticsearch + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/tsdb" + "golang.org/x/net/context/ctxhttp" +) + +type timeSeriesQuery struct { + queries []*Query +} + +func (e *ElasticsearchExecutor) executeTimeSeriesQuery(ctx context.Context, dsInfo *models.DataSource, tsdbQuery *tsdb.TsdbQuery) (*tsdb.Response, error) { + result := &tsdb.Response{} + result.Results = make(map[string]*tsdb.QueryResult) + + tsQueryParser := newTimeSeriesQueryParser(dsInfo) + query, err := tsQueryParser.parse(tsdbQuery) + if err != nil { + return nil, err + } + + buff := bytes.Buffer{} + for _, q := range query.queries { + s, err := q.Build(tsdbQuery, dsInfo) + if err != nil { + return nil, err + } + buff.WriteString(s) + } + payload := buff.String() + + if setting.Env == setting.DEV { + glog.Debug("Elasticsearch playload", "raw playload", payload) + } + glog.Info("Elasticsearch playload", "raw playload", payload) + + req, err := e.createRequest(dsInfo, payload) + if err != nil { + return nil, err + } + + httpClient, err := dsInfo.GetHttpClient() + if err != nil { + return nil, err + } + + resp, err := ctxhttp.Do(ctx, httpClient, req) + if err != nil { + return nil, err + } + + if resp.StatusCode/100 != 2 { + return nil, fmt.Errorf("elasticsearch returned statuscode invalid status code: %v", resp.Status) + } + + var responses Responses + defer resp.Body.Close() + dec := json.NewDecoder(resp.Body) + dec.UseNumber() + err = dec.Decode(&responses) + if err != nil { + return nil, err + } + + for _, res := range responses.Responses { + if res.Err != nil { + return nil, errors.New(res.getErrMsg()) + } + } + responseParser := ElasticsearchResponseParser{responses.Responses, query.queries} + queryRes := responseParser.getTimeSeries() + result.Results["A"] = queryRes + return result, nil +} + +type timeSeriesQueryParser struct { + ds *models.DataSource +} + +func newTimeSeriesQueryParser(ds *models.DataSource) *timeSeriesQueryParser { + return &timeSeriesQueryParser{ + ds: ds, + } +} + +func (p *timeSeriesQueryParser) parse(tsdbQuery *tsdb.TsdbQuery) (*timeSeriesQuery, error) { + queries := make([]*Query, 0) + for _, q := range tsdbQuery.Queries { + model := q.Model + timeField, err := model.Get("timeField").String() + if err != nil { + return nil, err + } + rawQuery := model.Get("query").MustString() + bucketAggs, err := p.parseBucketAggs(model) + if err != nil { + return nil, err + } + metrics, err := p.parseMetrics(model) + if err != nil { + return nil, err + } + alias := model.Get("alias").MustString("") + parsedInterval, err := tsdb.GetIntervalFrom(p.ds, model, time.Millisecond) + if err != nil { + return nil, err + } + + queries = append(queries, &Query{ + TimeField: timeField, + RawQuery: rawQuery, + BucketAggs: bucketAggs, + Metrics: metrics, + Alias: alias, + Interval: parsedInterval, + }) + } + + return &timeSeriesQuery{queries: queries}, nil +} + +func (p *timeSeriesQueryParser) parseBucketAggs(model *simplejson.Json) ([]*BucketAgg, error) { + var err error + var result []*BucketAgg + for _, t := range model.Get("bucketAggs").MustArray() { + aggJson := simplejson.NewFromAny(t) + agg := &BucketAgg{} + + agg.Type, err = aggJson.Get("type").String() + if err != nil { + return nil, err + } + + agg.ID, err = aggJson.Get("id").String() + if err != nil { + return nil, err + } + + agg.Field = aggJson.Get("field").MustString() + agg.Settings = simplejson.NewFromAny(aggJson.Get("settings").MustMap()) + + result = append(result, agg) + } + return result, nil +} + +func (p *timeSeriesQueryParser) parseMetrics(model *simplejson.Json) ([]*Metric, error) { + var err error + var result []*Metric + for _, t := range model.Get("metrics").MustArray() { + metricJSON := simplejson.NewFromAny(t) + metric := &Metric{} + + metric.Field = metricJSON.Get("field").MustString() + metric.Hide = metricJSON.Get("hide").MustBool(false) + metric.ID, err = metricJSON.Get("id").String() + if err != nil { + return nil, err + } + + metric.PipelineAggregate = metricJSON.Get("pipelineAgg").MustString() + metric.Settings = simplejson.NewFromAny(metricJSON.Get("settings").MustMap()) + + metric.Type, err = metricJSON.Get("type").String() + if err != nil { + return nil, err + } + + result = append(result, metric) + } + return result, nil +} diff --git a/pkg/tsdb/elasticsearch/time_series_query_test.go b/pkg/tsdb/elasticsearch/time_series_query_test.go new file mode 100644 index 00000000000..4950bc811de --- /dev/null +++ b/pkg/tsdb/elasticsearch/time_series_query_test.go @@ -0,0 +1,118 @@ +package elasticsearch + +import ( + "testing" + + "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/tsdb" + . "github.com/smartystreets/goconvey/convey" +) + +func TestTimeSeriesQueryParser(t *testing.T) { + Convey("Test time series query parser", t, func() { + ds := &models.DataSource{} + p := newTimeSeriesQueryParser(ds) + + Convey("Should be able to parse query", func() { + json, err := simplejson.NewJson([]byte(`{ + "timeField": "@timestamp", + "query": "@metric:cpu", + "alias": "{{@hostname}} {{metric}}", + "metrics": [ + { + "field": "@value", + "id": "1", + "meta": {}, + "settings": { + "percents": [ + "90" + ] + }, + "type": "percentiles" + }, + { + "type": "count", + "field": "select field", + "id": "4", + "settings": {}, + "meta": {} + } + ], + "bucketAggs": [ + { + "fake": true, + "field": "@hostname", + "id": "3", + "settings": { + "min_doc_count": 1, + "order": "desc", + "orderBy": "_term", + "size": "10" + }, + "type": "terms" + }, + { + "field": "@timestamp", + "id": "2", + "settings": { + "interval": "5m", + "min_doc_count": 0, + "trimEdges": 0 + }, + "type": "date_histogram" + } + ] + }`)) + So(err, ShouldBeNil) + tsdbQuery := &tsdb.TsdbQuery{ + Queries: []*tsdb.Query{ + { + DataSource: ds, + Model: json, + }, + }, + } + tsQuery, err := p.parse(tsdbQuery) + So(err, ShouldBeNil) + So(tsQuery.queries, ShouldHaveLength, 1) + + q := tsQuery.queries[0] + + So(q.TimeField, ShouldEqual, "@timestamp") + So(q.RawQuery, ShouldEqual, "@metric:cpu") + So(q.Alias, ShouldEqual, "{{@hostname}} {{metric}}") + + So(q.Metrics, ShouldHaveLength, 2) + So(q.Metrics[0].Field, ShouldEqual, "@value") + So(q.Metrics[0].ID, ShouldEqual, "1") + So(q.Metrics[0].Type, ShouldEqual, "percentiles") + So(q.Metrics[0].Hide, ShouldBeFalse) + So(q.Metrics[0].PipelineAggregate, ShouldEqual, "") + So(q.Metrics[0].Settings.Get("percents").MustStringArray()[0], ShouldEqual, "90") + + So(q.Metrics[1].Field, ShouldEqual, "select field") + So(q.Metrics[1].ID, ShouldEqual, "4") + So(q.Metrics[1].Type, ShouldEqual, "count") + So(q.Metrics[1].Hide, ShouldBeFalse) + So(q.Metrics[1].PipelineAggregate, ShouldEqual, "") + So(q.Metrics[1].Settings.MustMap(), ShouldBeEmpty) + + So(q.BucketAggs, ShouldHaveLength, 2) + So(q.BucketAggs[0].Field, ShouldEqual, "@hostname") + So(q.BucketAggs[0].ID, ShouldEqual, "3") + So(q.BucketAggs[0].Type, ShouldEqual, "terms") + So(q.BucketAggs[0].Settings.Get("min_doc_count").MustInt64(), ShouldEqual, 1) + So(q.BucketAggs[0].Settings.Get("order").MustString(), ShouldEqual, "desc") + So(q.BucketAggs[0].Settings.Get("orderBy").MustString(), ShouldEqual, "_term") + So(q.BucketAggs[0].Settings.Get("size").MustString(), ShouldEqual, "10") + + So(q.BucketAggs[1].Field, ShouldEqual, "@timestamp") + So(q.BucketAggs[1].ID, ShouldEqual, "2") + So(q.BucketAggs[1].Type, ShouldEqual, "date_histogram") + So(q.BucketAggs[1].Settings.Get("interval").MustString(), ShouldEqual, "5m") + So(q.BucketAggs[1].Settings.Get("min_doc_count").MustInt64(), ShouldEqual, 0) + So(q.BucketAggs[1].Settings.Get("trimEdges").MustInt64(), ShouldEqual, 0) + }) + }) +}