From 7a304223e69f6a9c06405ed7ec6ff817ebb4787d Mon Sep 17 00:00:00 2001 From: Giordano Ricci Date: Fri, 4 Jun 2021 15:42:00 +0100 Subject: [PATCH] Elasticsearch: Fix min_doc_count value when alerting (#35254) * Elasticsearch: Fix min_doc_count value when alerting * Add tests --- pkg/tsdb/elasticsearch/time_series_query.go | 49 +++++++++---- .../elasticsearch/time_series_query_test.go | 72 +++++++++++++++++++ 2 files changed, 107 insertions(+), 14 deletions(-) diff --git a/pkg/tsdb/elasticsearch/time_series_query.go b/pkg/tsdb/elasticsearch/time_series_query.go index 57c18febd14..08e893e1ff7 100644 --- a/pkg/tsdb/elasticsearch/time_series_query.go +++ b/pkg/tsdb/elasticsearch/time_series_query.go @@ -100,6 +100,9 @@ func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilde // iterate backwards to create aggregations bottom-down for _, bucketAgg := range q.BucketAggs { + bucketAgg.Settings = simplejson.NewFromAny( + bucketAgg.generateSettingsForDSL(), + ) switch bucketAgg.Type { case dateHistType: aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to) @@ -182,26 +185,34 @@ func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilde return nil } -// Casts values to int when required by Elastic's query DSL -func (metricAggregation MetricAgg) generateSettingsForDSL(version *semver.Version) map[string]interface{} { - setFloatPath := func(path ...string) { - if stringValue, err := metricAggregation.Settings.GetPath(path...).String(); err == nil { - if value, err := strconv.ParseFloat(stringValue, 64); err == nil { - metricAggregation.Settings.SetPath(path, value) - } +func setFloatPath(settings *simplejson.Json, path ...string) { + if stringValue, err := settings.GetPath(path...).String(); err == nil { + if value, err := strconv.ParseFloat(stringValue, 64); err == nil { + settings.SetPath(path, value) } } +} +func setIntPath(settings *simplejson.Json, path ...string) { + if stringValue, err := settings.GetPath(path...).String(); err == nil { + if value, err := strconv.ParseInt(stringValue, 10, 64); err == nil { + settings.SetPath(path, value) + } + } +} + +// Casts values to float when required by Elastic's query DSL +func (metricAggregation MetricAgg) generateSettingsForDSL(version *semver.Version) map[string]interface{} { switch metricAggregation.Type { case "moving_avg": - setFloatPath("window") - setFloatPath("predict") - setFloatPath("settings", "alpha") - setFloatPath("settings", "beta") - setFloatPath("settings", "gamma") - setFloatPath("settings", "period") + setFloatPath(metricAggregation.Settings, "window") + setFloatPath(metricAggregation.Settings, "predict") + setFloatPath(metricAggregation.Settings, "settings", "alpha") + setFloatPath(metricAggregation.Settings, "settings", "beta") + setFloatPath(metricAggregation.Settings, "settings", "gamma") + setFloatPath(metricAggregation.Settings, "settings", "period") case "serial_diff": - setFloatPath("lag") + setFloatPath(metricAggregation.Settings, "lag") } if isMetricAggregationWithInlineScriptSupport(metricAggregation.Type) { @@ -225,6 +236,16 @@ func (metricAggregation MetricAgg) generateSettingsForDSL(version *semver.Versio return metricAggregation.Settings.MustMap() } +func (bucketAgg BucketAgg) generateSettingsForDSL() map[string]interface{} { + // TODO: This might also need to be applied to other bucket aggregations and other fields. + switch bucketAgg.Type { + case "date_histogram": + setIntPath(bucketAgg.Settings, "min_doc_count") + } + + return bucketAgg.Settings.MustMap() +} + func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo string) es.AggBuilder { aggBuilder.DateHistogram(bucketAgg.ID, bucketAgg.Field, func(a *es.DateHistogramAgg, b es.AggBuilder) { a.Interval = bucketAgg.Settings.Get("interval").MustString("auto") diff --git a/pkg/tsdb/elasticsearch/time_series_query_test.go b/pkg/tsdb/elasticsearch/time_series_query_test.go index 5a3d8a02b5e..b33c23fe286 100644 --- a/pkg/tsdb/elasticsearch/time_series_query_test.go +++ b/pkg/tsdb/elasticsearch/time_series_query_test.go @@ -934,6 +934,78 @@ func TestSettingsCasting(t *testing.T) { assert.Equal(t, 1., serialDiffSettings["lag"]) }) + t.Run("Date Histogram Settings", func(t *testing.T) { + t.Run("Correctly transforms date_histogram settings", func(t *testing.T) { + c := newFakeClient("5.0.0") + _, err := executeTsdbQuery(c, `{ + "timeField": "@timestamp", + "bucketAggs": [ + { + "type": "date_histogram", + "field": "@timestamp", + "id": "2", + "settings": { + "min_doc_count": "1" + } + } + ], + "metrics": [ + { "id": "1", "type": "average", "field": "@value" }, + { + "id": "3", + "type": "serial_diff", + "field": "1", + "pipelineAgg": "1", + "settings": { + "lag": "1" + } + } + ] + }`, from, to, 15*time.Second) + assert.Nil(t, err) + sr := c.multisearchRequests[0].Requests[0] + + dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg) + + assert.Equal(t, 1, dateHistogramAgg.MinDocCount) + }) + + t.Run("Correctly uses already int min_doc_count", func(t *testing.T) { + c := newFakeClient("5.0.0") + _, err := executeTsdbQuery(c, `{ + "timeField": "@timestamp", + "bucketAggs": [ + { + "type": "date_histogram", + "field": "@timestamp", + "id": "2", + "settings": { + "min_doc_count": 10 + } + } + ], + "metrics": [ + { "id": "1", "type": "average", "field": "@value" }, + { + "id": "3", + "type": "serial_diff", + "field": "1", + "pipelineAgg": "1", + "settings": { + "lag": "1" + } + } + ] + }`, from, to, 15*time.Second) + assert.Nil(t, err) + sr := c.multisearchRequests[0].Requests[0] + + dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg) + + assert.Equal(t, 10, dateHistogramAgg.MinDocCount) + }) + }) + t.Run("Inline Script", func(t *testing.T) { t.Run("Correctly handles scripts for ES < 5.6", func(t *testing.T) { c := newFakeClient("5.0.0")