Elasticsearch: Fix min_doc_count value when alerting (#35254)

* Elasticsearch: Fix min_doc_count value when alerting

* Add tests
This commit is contained in:
Giordano Ricci 2021-06-04 15:42:00 +01:00 committed by GitHub
parent 70155c7fd0
commit 7a304223e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 107 additions and 14 deletions

View File

@ -100,6 +100,9 @@ func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilde
// iterate backwards to create aggregations bottom-down
for _, bucketAgg := range q.BucketAggs {
bucketAgg.Settings = simplejson.NewFromAny(
bucketAgg.generateSettingsForDSL(),
)
switch bucketAgg.Type {
case dateHistType:
aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to)
@ -182,26 +185,34 @@ func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilde
return nil
}
// Casts values to int when required by Elastic's query DSL
func (metricAggregation MetricAgg) generateSettingsForDSL(version *semver.Version) map[string]interface{} {
setFloatPath := func(path ...string) {
if stringValue, err := metricAggregation.Settings.GetPath(path...).String(); err == nil {
if value, err := strconv.ParseFloat(stringValue, 64); err == nil {
metricAggregation.Settings.SetPath(path, value)
}
func setFloatPath(settings *simplejson.Json, path ...string) {
if stringValue, err := settings.GetPath(path...).String(); err == nil {
if value, err := strconv.ParseFloat(stringValue, 64); err == nil {
settings.SetPath(path, value)
}
}
}
func setIntPath(settings *simplejson.Json, path ...string) {
if stringValue, err := settings.GetPath(path...).String(); err == nil {
if value, err := strconv.ParseInt(stringValue, 10, 64); err == nil {
settings.SetPath(path, value)
}
}
}
// Casts values to float when required by Elastic's query DSL
func (metricAggregation MetricAgg) generateSettingsForDSL(version *semver.Version) map[string]interface{} {
switch metricAggregation.Type {
case "moving_avg":
setFloatPath("window")
setFloatPath("predict")
setFloatPath("settings", "alpha")
setFloatPath("settings", "beta")
setFloatPath("settings", "gamma")
setFloatPath("settings", "period")
setFloatPath(metricAggregation.Settings, "window")
setFloatPath(metricAggregation.Settings, "predict")
setFloatPath(metricAggregation.Settings, "settings", "alpha")
setFloatPath(metricAggregation.Settings, "settings", "beta")
setFloatPath(metricAggregation.Settings, "settings", "gamma")
setFloatPath(metricAggregation.Settings, "settings", "period")
case "serial_diff":
setFloatPath("lag")
setFloatPath(metricAggregation.Settings, "lag")
}
if isMetricAggregationWithInlineScriptSupport(metricAggregation.Type) {
@ -225,6 +236,16 @@ func (metricAggregation MetricAgg) generateSettingsForDSL(version *semver.Versio
return metricAggregation.Settings.MustMap()
}
func (bucketAgg BucketAgg) generateSettingsForDSL() map[string]interface{} {
// TODO: This might also need to be applied to other bucket aggregations and other fields.
switch bucketAgg.Type {
case "date_histogram":
setIntPath(bucketAgg.Settings, "min_doc_count")
}
return bucketAgg.Settings.MustMap()
}
func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo string) es.AggBuilder {
aggBuilder.DateHistogram(bucketAgg.ID, bucketAgg.Field, func(a *es.DateHistogramAgg, b es.AggBuilder) {
a.Interval = bucketAgg.Settings.Get("interval").MustString("auto")

View File

@ -934,6 +934,78 @@ func TestSettingsCasting(t *testing.T) {
assert.Equal(t, 1., serialDiffSettings["lag"])
})
t.Run("Date Histogram Settings", func(t *testing.T) {
t.Run("Correctly transforms date_histogram settings", func(t *testing.T) {
c := newFakeClient("5.0.0")
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{
"type": "date_histogram",
"field": "@timestamp",
"id": "2",
"settings": {
"min_doc_count": "1"
}
}
],
"metrics": [
{ "id": "1", "type": "average", "field": "@value" },
{
"id": "3",
"type": "serial_diff",
"field": "1",
"pipelineAgg": "1",
"settings": {
"lag": "1"
}
}
]
}`, from, to, 15*time.Second)
assert.Nil(t, err)
sr := c.multisearchRequests[0].Requests[0]
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
assert.Equal(t, 1, dateHistogramAgg.MinDocCount)
})
t.Run("Correctly uses already int min_doc_count", func(t *testing.T) {
c := newFakeClient("5.0.0")
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{
"type": "date_histogram",
"field": "@timestamp",
"id": "2",
"settings": {
"min_doc_count": 10
}
}
],
"metrics": [
{ "id": "1", "type": "average", "field": "@value" },
{
"id": "3",
"type": "serial_diff",
"field": "1",
"pipelineAgg": "1",
"settings": {
"lag": "1"
}
}
]
}`, from, to, 15*time.Second)
assert.Nil(t, err)
sr := c.multisearchRequests[0].Requests[0]
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
assert.Equal(t, 10, dateHistogramAgg.MinDocCount)
})
})
t.Run("Inline Script", func(t *testing.T) {
t.Run("Correctly handles scripts for ES < 5.6", func(t *testing.T) {
c := newFakeClient("5.0.0")