diff --git a/pkg/tsdb/elasticsearch/client/client.go b/pkg/tsdb/elasticsearch/client/client.go index 3a2d31b42c6..8ae45599484 100644 --- a/pkg/tsdb/elasticsearch/client/client.go +++ b/pkg/tsdb/elasticsearch/client/client.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "path" + "strconv" "strings" "time" @@ -25,6 +26,10 @@ var ( clientLog = log.New(loggerName) ) +var newDatasourceHttpClient = func(ds *models.DataSource) (*http.Client, error) { + return ds.GetHttpClient() +} + // Client represents a client which can interact with elasticsearch api type Client interface { GetVersion() int @@ -57,23 +62,18 @@ var NewClient = func(ctx context.Context, ds *models.DataSource, timeRange *tsdb return nil, err } - bc := &baseClientImpl{ - ctx: ctx, - ds: ds, - version: version, - timeField: timeField, - indices: indices, - } - clientLog.Debug("Creating new client", "version", version, "timeField", timeField, "indices", strings.Join(indices, ", ")) switch version { - case 2: - return newV2Client(bc) - case 5: - return newV5Client(bc) - case 56: - return newV56Client(bc) + case 2, 5, 56: + return &baseClientImpl{ + ctx: ctx, + ds: ds, + version: version, + timeField: timeField, + indices: indices, + timeRange: timeRange, + }, nil } return nil, fmt.Errorf("elasticsearch version=%d is not supported", version) @@ -93,6 +93,7 @@ type baseClientImpl struct { version int timeField string indices []string + timeRange *tsdb.TimeRange } func (c *baseClientImpl) GetVersion() int { @@ -114,11 +115,20 @@ func (c *baseClientImpl) getSettings() *simplejson.Json { } type multiRequest struct { - header map[string]interface{} - body interface{} + header map[string]interface{} + body interface{} + interval tsdb.Interval } func (c *baseClientImpl) executeBatchRequest(uriPath string, requests []*multiRequest) (*http.Response, error) { + bytes, err := c.encodeBatchRequests(requests) + if err != nil { + return nil, err + } + return c.executeRequest(http.MethodPost, uriPath, bytes) +} + +func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte, error) { clientLog.Debug("Encoding batch requests to json", "batch requests", len(requests)) start := time.Now() @@ -134,13 +144,18 @@ func (c *baseClientImpl) executeBatchRequest(uriPath string, requests []*multiRe if err != nil { return nil, err } - payload.WriteString(string(reqBody) + "\n") + + body := string(reqBody) + body = strings.Replace(body, "$__interval_ms", strconv.FormatInt(r.interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1) + body = strings.Replace(body, "$__interval", r.interval.Text, -1) + + payload.WriteString(body + "\n") } elapsed := time.Now().Sub(start) clientLog.Debug("Encoded batch requests to json", "took", elapsed) - return c.executeRequest(http.MethodPost, uriPath, payload.Bytes()) + return payload.Bytes(), nil } func (c *baseClientImpl) executeRequest(method, uriPath string, body []byte) (*http.Response, error) { @@ -173,7 +188,7 @@ func (c *baseClientImpl) executeRequest(method, uriPath string, body []byte) (*h req.SetBasicAuth(c.ds.User, c.ds.Password) } - httpClient, err := c.ds.GetHttpClient() + httpClient, err := newDatasourceHttpClient(c.ds) if err != nil { return nil, err } @@ -220,77 +235,26 @@ func (c *baseClientImpl) createMultiSearchRequests(searchRequests []*SearchReque multiRequests := []*multiRequest{} for _, searchReq := range searchRequests { - multiRequests = append(multiRequests, &multiRequest{ + mr := multiRequest{ header: map[string]interface{}{ "search_type": "query_then_fetch", "ignore_unavailable": true, "index": strings.Join(c.indices, ","), }, - body: searchReq, - }) - } + body: searchReq, + interval: searchReq.Interval, + } - return multiRequests -} + if c.version == 2 { + mr.header["search_type"] = "count" + } -type v2Client struct { - baseClient -} + if c.version >= 56 { + maxConcurrentShardRequests := c.getSettings().Get("maxConcurrentShardRequests").MustInt(256) + mr.header["max_concurrent_shard_requests"] = maxConcurrentShardRequests + } -func newV2Client(bc baseClient) (*v2Client, error) { - c := v2Client{ - baseClient: bc, - } - - return &c, nil -} - -func (c *v2Client) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest { - multiRequests := c.baseClient.createMultiSearchRequests(searchRequests) - - for _, mr := range multiRequests { - mr.header["search_type"] = "count" - } - - return multiRequests -} - -type v5Client struct { - baseClient -} - -func newV5Client(bc baseClient) (*v5Client, error) { - c := v5Client{ - baseClient: bc, - } - - return &c, nil -} - -type v56Client struct { - *v5Client - maxConcurrentShardRequests int -} - -func newV56Client(bc baseClient) (*v56Client, error) { - v5Client := v5Client{ - baseClient: bc, - } - maxConcurrentShardRequests := bc.getSettings().Get("maxConcurrentShardRequests").MustInt(256) - - c := v56Client{ - v5Client: &v5Client, - maxConcurrentShardRequests: maxConcurrentShardRequests, - } - - return &c, nil -} - -func (c *v56Client) createMultiSearchRequests(searchRequests []*SearchRequest) []*multiRequest { - multiRequests := c.v5Client.createMultiSearchRequests(searchRequests) - - for _, mr := range multiRequests { - mr.header["max_concurrent_shard_requests"] = c.maxConcurrentShardRequests + multiRequests = append(multiRequests, &mr) } return multiRequests diff --git a/pkg/tsdb/elasticsearch/client/client_test.go b/pkg/tsdb/elasticsearch/client/client_test.go index d557ceb28b1..11d1cdb1d71 100644 --- a/pkg/tsdb/elasticsearch/client/client_test.go +++ b/pkg/tsdb/elasticsearch/client/client_test.go @@ -1,10 +1,17 @@ package es import ( + "bytes" + "context" + "fmt" + "io/ioutil" "net/http" + "net/http/httptest" "testing" + "time" "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/tsdb" "github.com/grafana/grafana/pkg/models" . "github.com/smartystreets/goconvey/convey" @@ -85,131 +92,213 @@ func TestClient(t *testing.T) { }) }) - Convey("v2", func() { - ds := &models.DataSource{ - JsonData: simplejson.NewFromAny(map[string]interface{}{ - "esVersion": 2, - }), + Convey("Given a fake http client", func() { + var responseBuffer *bytes.Buffer + var req *http.Request + ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + req = r + buf, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Fatalf("Failed to read response body, err=%v", err) + } + responseBuffer = bytes.NewBuffer(buf) + })) + + currentNewDatasourceHttpClient := newDatasourceHttpClient + + newDatasourceHttpClient = func(ds *models.DataSource) (*http.Client, error) { + return ts.Client(), nil } - c, err := newV2Client(newFakeBaseClient(ds, []string{"test-*"})) - So(err, ShouldBeNil) - So(c, ShouldNotBeNil) + from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC) + to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC) + fromStr := fmt.Sprintf("%d", from.UnixNano()/int64(time.Millisecond)) + toStr := fmt.Sprintf("%d", to.UnixNano()/int64(time.Millisecond)) + timeRange := tsdb.NewTimeRange(fromStr, toStr) - Convey("When creating multisearch requests should have correct headers", func() { - multiRequests := c.createMultiSearchRequests([]*SearchRequest{ - {Index: "test-*"}, - }) - So(multiRequests, ShouldHaveLength, 1) - header := multiRequests[0].header - So(header, ShouldHaveLength, 3) - So(header["index"], ShouldEqual, "test-*") - So(header["ignore_unavailable"], ShouldEqual, true) - So(header["search_type"], ShouldEqual, "count") - }) - }) - - Convey("v5", func() { - ds := &models.DataSource{ - JsonData: simplejson.NewFromAny(map[string]interface{}{ - "esVersion": 5, - }), - } - - c, err := newV5Client(newFakeBaseClient(ds, []string{"test-*"})) - So(err, ShouldBeNil) - So(c, ShouldNotBeNil) - - Convey("When creating multisearch requests should have correct headers", func() { - multiRequests := c.createMultiSearchRequests([]*SearchRequest{ - {Index: "test-*"}, - }) - So(multiRequests, ShouldHaveLength, 1) - header := multiRequests[0].header - So(header, ShouldHaveLength, 3) - So(header["index"], ShouldEqual, "test-*") - So(header["ignore_unavailable"], ShouldEqual, true) - So(header["search_type"], ShouldEqual, "query_then_fetch") - }) - }) - - Convey("v5.6", func() { - Convey("With default settings", func() { + Convey("and a v2.x client", func() { ds := models.DataSource{ + Database: "[metrics-]YYYY.MM.DD", + Url: ts.URL, JsonData: simplejson.NewFromAny(map[string]interface{}{ - "esVersion": 56, + "esVersion": 2, + "timeField": "@timestamp", + "interval": "Daily", }), } - c, err := newV56Client(newFakeBaseClient(&ds, []string{"test-*"})) + c, err := NewClient(context.Background(), &ds, timeRange) So(err, ShouldBeNil) So(c, ShouldNotBeNil) - Convey("When creating multisearch requests should have correct headers", func() { - multiRequests := c.createMultiSearchRequests([]*SearchRequest{ - {Index: "test-*"}, + Convey("When executing multi search", func() { + ms, err := createMultisearchForTest(c) + So(err, ShouldBeNil) + c.ExecuteMultisearch(ms) + + Convey("Should send correct request and payload", func() { + So(req, ShouldNotBeNil) + So(req.Method, ShouldEqual, http.MethodPost) + So(req.URL.Path, ShouldEqual, "/_msearch") + + So(responseBuffer, ShouldNotBeNil) + + headerBytes, err := responseBuffer.ReadBytes('\n') + So(err, ShouldBeNil) + bodyBytes := responseBuffer.Bytes() + + jHeader, err := simplejson.NewJson(headerBytes) + So(err, ShouldBeNil) + + jBody, err := simplejson.NewJson(bodyBytes) + So(err, ShouldBeNil) + + fmt.Println("body", string(headerBytes)) + + So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15") + So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true) + So(jHeader.Get("search_type").MustString(), ShouldEqual, "count") + So(jHeader.Get("max_concurrent_shard_requests").MustInt(10), ShouldEqual, 10) + + Convey("and replace $__interval variable", func() { + So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname") + }) + + Convey("and replace $__interval_ms variable", func() { + So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s") + }) }) - So(multiRequests, ShouldHaveLength, 1) - header := multiRequests[0].header - So(header, ShouldHaveLength, 4) - So(header["index"], ShouldEqual, "test-*") - So(header["ignore_unavailable"], ShouldEqual, true) - So(header["search_type"], ShouldEqual, "query_then_fetch") - So(header["max_concurrent_shard_requests"], ShouldEqual, 256) }) }) - Convey("With custom settings", func() { + Convey("and a v5.x client", func() { ds := models.DataSource{ + Database: "[metrics-]YYYY.MM.DD", + Url: ts.URL, + JsonData: simplejson.NewFromAny(map[string]interface{}{ + "esVersion": 5, + "maxConcurrentShardRequests": 100, + "timeField": "@timestamp", + "interval": "Daily", + }), + } + + c, err := NewClient(context.Background(), &ds, timeRange) + So(err, ShouldBeNil) + So(c, ShouldNotBeNil) + + Convey("When executing multi search", func() { + ms, err := createMultisearchForTest(c) + So(err, ShouldBeNil) + c.ExecuteMultisearch(ms) + + Convey("Should send correct request and payload", func() { + So(req, ShouldNotBeNil) + So(req.Method, ShouldEqual, http.MethodPost) + So(req.URL.Path, ShouldEqual, "/_msearch") + + So(responseBuffer, ShouldNotBeNil) + + headerBytes, err := responseBuffer.ReadBytes('\n') + So(err, ShouldBeNil) + bodyBytes := responseBuffer.Bytes() + + jHeader, err := simplejson.NewJson(headerBytes) + So(err, ShouldBeNil) + + jBody, err := simplejson.NewJson(bodyBytes) + So(err, ShouldBeNil) + + fmt.Println("body", string(headerBytes)) + + So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15") + So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true) + So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch") + So(jHeader.Get("max_concurrent_shard_requests").MustInt(10), ShouldEqual, 10) + + Convey("and replace $__interval variable", func() { + So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname") + }) + + Convey("and replace $__interval_ms variable", func() { + So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s") + }) + }) + }) + }) + + Convey("and a v5.6 client", func() { + ds := models.DataSource{ + Database: "[metrics-]YYYY.MM.DD", + Url: ts.URL, JsonData: simplejson.NewFromAny(map[string]interface{}{ "esVersion": 56, "maxConcurrentShardRequests": 100, + "timeField": "@timestamp", + "interval": "Daily", }), } - c, err := newV56Client(newFakeBaseClient(&ds, []string{"test-*"})) + c, err := NewClient(context.Background(), &ds, timeRange) So(err, ShouldBeNil) So(c, ShouldNotBeNil) - Convey("When creating multisearch requests should have correct headers", func() { - multiRequests := c.createMultiSearchRequests([]*SearchRequest{ - {Index: "test-*"}, + + Convey("When executing multi search", func() { + ms, err := createMultisearchForTest(c) + So(err, ShouldBeNil) + c.ExecuteMultisearch(ms) + + Convey("Should send correct request and payload", func() { + So(req, ShouldNotBeNil) + So(req.Method, ShouldEqual, http.MethodPost) + So(req.URL.Path, ShouldEqual, "/_msearch") + + So(responseBuffer, ShouldNotBeNil) + + headerBytes, err := responseBuffer.ReadBytes('\n') + So(err, ShouldBeNil) + bodyBytes := responseBuffer.Bytes() + + jHeader, err := simplejson.NewJson(headerBytes) + So(err, ShouldBeNil) + + jBody, err := simplejson.NewJson(bodyBytes) + So(err, ShouldBeNil) + + fmt.Println("body", string(headerBytes)) + + So(jHeader.Get("index").MustString(), ShouldEqual, "metrics-2018.05.15") + So(jHeader.Get("ignore_unavailable").MustBool(false), ShouldEqual, true) + So(jHeader.Get("search_type").MustString(), ShouldEqual, "query_then_fetch") + So(jHeader.Get("max_concurrent_shard_requests").MustInt(), ShouldEqual, 100) + + Convey("and replace $__interval variable", func() { + So(jBody.GetPath("aggs", "2", "aggs", "1", "avg", "script").MustString(), ShouldEqual, "15000*@hostname") + }) + + Convey("and replace $__interval_ms variable", func() { + So(jBody.GetPath("aggs", "2", "date_histogram", "interval").MustString(), ShouldEqual, "15s") + }) }) - So(multiRequests, ShouldHaveLength, 1) - header := multiRequests[0].header - So(header, ShouldHaveLength, 4) - So(header["index"], ShouldEqual, "test-*") - So(header["ignore_unavailable"], ShouldEqual, true) - So(header["search_type"], ShouldEqual, "query_then_fetch") - So(header["max_concurrent_shard_requests"], ShouldEqual, 100) }) }) + + Reset(func() { + newDatasourceHttpClient = currentNewDatasourceHttpClient + }) }) }) } -type fakeBaseClient struct { - *baseClientImpl - ds *models.DataSource -} +func createMultisearchForTest(c Client) (*MultiSearchRequest, error) { + msb := c.MultiSearch() + s := msb.Search(tsdb.Interval{Value: 15 * time.Second, Text: "15s"}) + s.Agg().DateHistogram("2", "@timestamp", func(a *DateHistogramAgg, ab AggBuilder) { + a.Interval = "$__interval" -func newFakeBaseClient(ds *models.DataSource, indices []string) baseClient { - return &fakeBaseClient{ - baseClientImpl: &baseClientImpl{ - ds: ds, - indices: indices, - }, - ds: ds, - } -} - -func (c *fakeBaseClient) executeBatchRequest(uriPath string, requests []*multiRequest) (*http.Response, error) { - return nil, nil -} - -func (c *fakeBaseClient) executeRequest(method, uriPath string, body []byte) (*http.Response, error) { - return nil, nil -} - -func (c *fakeBaseClient) executeMultisearch(searchRequests []*SearchRequest) ([]*SearchResponse, error) { - return nil, nil + ab.Metric("1", "avg", "@hostname", func(a *MetricAggregation) { + a.Settings["script"] = "$__interval_ms*@hostname" + }) + }) + return msb.Build() } diff --git a/pkg/tsdb/elasticsearch/client/models.go b/pkg/tsdb/elasticsearch/client/models.go index 2f4f5dcd162..a5810a9b109 100644 --- a/pkg/tsdb/elasticsearch/client/models.go +++ b/pkg/tsdb/elasticsearch/client/models.go @@ -2,11 +2,14 @@ package es import ( "encoding/json" + + "github.com/grafana/grafana/pkg/tsdb" ) // SearchRequest represents a search request type SearchRequest struct { Index string + Interval tsdb.Interval Size int Sort map[string]interface{} Query *Query diff --git a/pkg/tsdb/elasticsearch/client/search_request.go b/pkg/tsdb/elasticsearch/client/search_request.go index a582d8ec247..2b833ce78d3 100644 --- a/pkg/tsdb/elasticsearch/client/search_request.go +++ b/pkg/tsdb/elasticsearch/client/search_request.go @@ -2,11 +2,14 @@ package es import ( "strings" + + "github.com/grafana/grafana/pkg/tsdb" ) // SearchRequestBuilder represents a builder which can build a search request type SearchRequestBuilder struct { version int + interval tsdb.Interval index string size int sort map[string]interface{} @@ -16,9 +19,10 @@ type SearchRequestBuilder struct { } // NewSearchRequestBuilder create a new search request builder -func NewSearchRequestBuilder(version int) *SearchRequestBuilder { +func NewSearchRequestBuilder(version int, interval tsdb.Interval) *SearchRequestBuilder { builder := &SearchRequestBuilder{ version: version, + interval: interval, sort: make(map[string]interface{}), customProps: make(map[string]interface{}), aggBuilders: make([]AggBuilder, 0), @@ -30,6 +34,7 @@ func NewSearchRequestBuilder(version int) *SearchRequestBuilder { func (b *SearchRequestBuilder) Build() (*SearchRequest, error) { sr := SearchRequest{ Index: b.index, + Interval: b.interval, Size: b.size, Sort: b.sort, CustomProps: b.customProps, @@ -128,8 +133,8 @@ func NewMultiSearchRequestBuilder(version int) *MultiSearchRequestBuilder { } // Search initiates and returns a new search request builder -func (m *MultiSearchRequestBuilder) Search() *SearchRequestBuilder { - b := NewSearchRequestBuilder(m.version) +func (m *MultiSearchRequestBuilder) Search(interval tsdb.Interval) *SearchRequestBuilder { + b := NewSearchRequestBuilder(m.version, interval) m.requestBuilders = append(m.requestBuilders, b) return b } diff --git a/pkg/tsdb/elasticsearch/client/search_request_test.go b/pkg/tsdb/elasticsearch/client/search_request_test.go index d93f8826442..b026578d64f 100644 --- a/pkg/tsdb/elasticsearch/client/search_request_test.go +++ b/pkg/tsdb/elasticsearch/client/search_request_test.go @@ -3,8 +3,10 @@ package es import ( "encoding/json" "testing" + "time" "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/tsdb" . "github.com/smartystreets/goconvey/convey" ) @@ -13,7 +15,7 @@ func TestSearchRequest(t *testing.T) { Convey("Test elasticsearch search request", t, func() { timeField := "@timestamp" Convey("Given new search request builder for es version 5", func() { - b := NewSearchRequestBuilder(5) + b := NewSearchRequestBuilder(5, tsdb.Interval{Value: 15 * time.Second, Text: "15s"}) Convey("When building search request", func() { sr, err := b.Build() @@ -388,7 +390,7 @@ func TestSearchRequest(t *testing.T) { }) Convey("Given new search request builder for es version 2", func() { - b := NewSearchRequestBuilder(2) + b := NewSearchRequestBuilder(2, tsdb.Interval{Value: 15 * time.Second, Text: "15s"}) Convey("When adding doc value field", func() { b.AddDocValueField(timeField) @@ -447,7 +449,7 @@ func TestMultiSearchRequest(t *testing.T) { b := NewMultiSearchRequestBuilder(0) Convey("When adding one search request", func() { - b.Search() + b.Search(tsdb.Interval{Value: 15 * time.Second, Text: "15s"}) Convey("When building search request should contain one search request", func() { mr, err := b.Build() @@ -457,8 +459,8 @@ func TestMultiSearchRequest(t *testing.T) { }) Convey("When adding two search requests", func() { - b.Search() - b.Search() + b.Search(tsdb.Interval{Value: 15 * time.Second, Text: "15s"}) + b.Search(tsdb.Interval{Value: 15 * time.Second, Text: "15s"}) Convey("When building search request should contain two search requests", func() { mr, err := b.Build() diff --git a/pkg/tsdb/elasticsearch/time_series_query.go b/pkg/tsdb/elasticsearch/time_series_query.go index ef59c62c1dc..c9bb05dd09a 100644 --- a/pkg/tsdb/elasticsearch/time_series_query.go +++ b/pkg/tsdb/elasticsearch/time_series_query.go @@ -3,8 +3,6 @@ package elasticsearch import ( "fmt" "strconv" - "strings" - "time" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/tsdb" @@ -47,7 +45,7 @@ func (e *timeSeriesQuery) execute() (*tsdb.Response, error) { } interval := e.intervalCalculator.Calculate(e.tsdbQuery.TimeRange, minInterval) - b := ms.Search() + b := ms.Search(interval) b.Size(0) filters := b.Query().Bool().Filter() filters.AddDateRangeFilter(e.client.GetTimeField(), to, from, es.DateFormatEpochMS) @@ -78,7 +76,7 @@ func (e *timeSeriesQuery) execute() (*tsdb.Response, error) { for _, bucketAgg := range q.BucketAggs { switch bucketAgg.Type { case "date_histogram": - aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to, interval) + aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to) case "histogram": aggBuilder = addHistogramAgg(aggBuilder, bucketAgg) case "filters": @@ -125,7 +123,7 @@ func (e *timeSeriesQuery) execute() (*tsdb.Response, error) { return rp.getTimeSeries() } -func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo string, interval tsdb.Interval) es.AggBuilder { +func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo string) es.AggBuilder { aggBuilder.DateHistogram(bucketAgg.ID, bucketAgg.Field, func(a *es.DateHistogramAgg, b es.AggBuilder) { a.Interval = bucketAgg.Settings.Get("interval").MustString("auto") a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0) @@ -136,10 +134,6 @@ func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFro a.Interval = "$__interval" } - a.Interval = strings.Replace(a.Interval, "$interval", interval.Text, -1) - a.Interval = strings.Replace(a.Interval, "$__interval_ms", strconv.FormatInt(interval.Value.Nanoseconds()/int64(time.Millisecond), 10), -1) - a.Interval = strings.Replace(a.Interval, "$__interval", interval.Text, -1) - if missing, err := bucketAgg.Settings.Get("missing").String(); err == nil { a.Missing = &missing } diff --git a/pkg/tsdb/elasticsearch/time_series_query_test.go b/pkg/tsdb/elasticsearch/time_series_query_test.go index e2af4de749a..49bf5f5bc75 100644 --- a/pkg/tsdb/elasticsearch/time_series_query_test.go +++ b/pkg/tsdb/elasticsearch/time_series_query_test.go @@ -268,7 +268,7 @@ func TestExecuteTimeSeriesQuery(t *testing.T) { So(firstLevel.Aggregation.Type, ShouldEqual, "date_histogram") hAgg := firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg) So(hAgg.Field, ShouldEqual, "@timestamp") - So(hAgg.Interval, ShouldEqual, "15s") + So(hAgg.Interval, ShouldEqual, "$__interval") So(hAgg.MinDocCount, ShouldEqual, 2) })