Elasticsearch: Fix handling of inline scripts in different ES versions (#34070)

* Devenv: add block for es 5.0, provisioned datasource & dashboard

* Trasnsform script property based on running ES version

* Handle different scripts format in BE
This commit is contained in:
Giordano Ricci
2021-05-14 11:50:15 +01:00
committed by GitHub
parent ddb2fc1ae6
commit 8ec87250c1
11 changed files with 1339 additions and 334 deletions

View File

@@ -73,6 +73,16 @@ var pipelineAggType = map[string]string{
"bucket_script": "bucket_script",
}
var scriptableAggType = map[string]string{
"avg": "avg",
"sum": "sum",
"max": "max",
"min": "min",
"extended_stats": "extended_stats",
"percentiles": "percentiles",
"bucket_script": "bucket_script",
}
var pipelineAggWithMultipleBucketPathsType = map[string]string{
"bucket_script": "bucket_script",
}
@@ -84,6 +94,13 @@ func isPipelineAgg(metricType string) bool {
return false
}
func isMetricAggregationWithInlineScriptSupport(metricType string) bool {
if _, ok := scriptableAggType[metricType]; ok {
return true
}
return false
}
func isPipelineAggWithMultipleBucketPaths(metricType string) bool {
if _, ok := pipelineAggWithMultipleBucketPathsType[metricType]; ok {
return true

View File

@@ -5,6 +5,7 @@ import (
"regexp"
"strconv"
"github.com/Masterminds/semver"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/plugins"
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
@@ -143,7 +144,7 @@ func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilde
}
aggBuilder.Pipeline(m.ID, m.Type, bucketPaths, func(a *es.PipelineAggregation) {
a.Settings = m.Settings.MustMap()
a.Settings = m.generateSettingsForDSL(e.client.GetVersion())
})
} else {
continue
@@ -164,7 +165,7 @@ func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilde
}
aggBuilder.Pipeline(m.ID, m.Type, bucketPath, func(a *es.PipelineAggregation) {
a.Settings = m.generateSettingsForDSL()
a.Settings = m.generateSettingsForDSL(e.client.GetVersion())
})
}
} else {
@@ -173,7 +174,7 @@ func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilde
}
} else {
aggBuilder.Metric(m.ID, m.Type, m.Field, func(a *es.MetricAggregation) {
a.Settings = m.Settings.MustMap()
a.Settings = m.generateSettingsForDSL(e.client.GetVersion())
})
}
}
@@ -182,7 +183,7 @@ func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilde
}
// Casts values to int when required by Elastic's query DSL
func (metricAggregation MetricAgg) generateSettingsForDSL() map[string]interface{} {
func (metricAggregation MetricAgg) generateSettingsForDSL(version *semver.Version) map[string]interface{} {
setFloatPath := func(path ...string) {
if stringValue, err := metricAggregation.Settings.GetPath(path...).String(); err == nil {
if value, err := strconv.ParseFloat(stringValue, 64); err == nil {
@@ -203,6 +204,24 @@ func (metricAggregation MetricAgg) generateSettingsForDSL() map[string]interface
setFloatPath("lag")
}
if isMetricAggregationWithInlineScriptSupport(metricAggregation.Type) {
scriptValue, err := metricAggregation.Settings.GetPath("script").String()
if err != nil {
// the script is stored using the old format : `script:{inline: "value"}` or is not set
scriptValue, err = metricAggregation.Settings.GetPath("script", "inline").String()
}
constraint, _ := semver.NewConstraint(">=5.6.0")
if err == nil {
if constraint.Check(version) {
metricAggregation.Settings.SetPath([]string{"script"}, scriptValue)
} else {
metricAggregation.Settings.SetPath([]string{"script"}, map[string]interface{}{"inline": scriptValue})
}
}
}
return metricAggregation.Settings.MustMap()
}

View File

@@ -933,6 +933,93 @@ func TestSettingsCasting(t *testing.T) {
assert.Equal(t, 1., serialDiffSettings["lag"])
})
t.Run("Inline Script", func(t *testing.T) {
t.Run("Correctly handles scripts for ES < 5.6", func(t *testing.T) {
c := newFakeClient("5.0.0")
for key := range scriptableAggType {
t.Run("Inline Script", func(t *testing.T) {
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{ "type": "date_histogram", "field": "@timestamp", "id": "2" }
],
"metrics": [
{
"id": "1",
"type": "`+key+`",
"settings": {
"script": "my_script"
}
},
{
"id": "3",
"type": "`+key+`",
"settings": {
"script": {
"inline": "my_script"
}
}
}
]
}`, from, to, 15*time.Second)
assert.Nil(t, err)
sr := c.multisearchRequests[0].Requests[0]
newFormatAggSettings := sr.Aggs[0].Aggregation.Aggs[0].Aggregation.Aggregation.(*es.MetricAggregation).Settings
oldFormatAggSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.MetricAggregation).Settings
assert.Equal(t, map[string]interface{}{"inline": "my_script"}, newFormatAggSettings["script"])
assert.Equal(t, map[string]interface{}{"inline": "my_script"}, oldFormatAggSettings["script"])
})
}
})
t.Run("Correctly handles scripts for ES >= 5.6", func(t *testing.T) {
c := newFakeClient("5.6.0")
for key := range scriptableAggType {
fmt.Println(key)
t.Run("Inline Script", func(t *testing.T) {
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"bucketAggs": [
{ "type": "date_histogram", "field": "@timestamp", "id": "2" }
],
"metrics": [
{
"id": "1",
"type": "`+key+`",
"settings": {
"script": "my_script"
}
},
{
"id": "3",
"type": "`+key+`",
"settings": {
"script": {
"inline": "my_script"
}
}
}
]
}`, from, to, 15*time.Second)
assert.Nil(t, err)
sr := c.multisearchRequests[0].Requests[0]
newFormatAggSettings := sr.Aggs[0].Aggregation.Aggs[0].Aggregation.Aggregation.(*es.MetricAggregation).Settings
oldFormatAggSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.MetricAggregation).Settings
assert.Equal(t, "my_script", newFormatAggSettings["script"])
assert.Equal(t, "my_script", oldFormatAggSettings["script"])
})
}
})
})
}
type fakeClient struct {