Elasticsearch: Fix multiple max depth flatten of multi-level objects (#70302)

Author: Shirley
Date: 2023-07-14 11:48:00 +02:00
Committed by: GitHub
Parent: 1cacc78eda
Commit: c1f6b91ea9
2 changed files with 118 additions and 30 deletions

Changed file 1 of 2 (the Elasticsearch response parser):

@@ -102,7 +102,7 @@ func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields
 	for hitIdx, hit := range res.Hits.Hits {
 		var flattened map[string]interface{}
 		if hit["_source"] != nil {
-			flattened = flatten(hit["_source"].(map[string]interface{}))
+			flattened = flatten(hit["_source"].(map[string]interface{}), 10)
 		}
 
 		doc := map[string]interface{}{
@@ -174,7 +174,7 @@ func processRawDataResponse(res *es.SearchResponse, target *Query, configuredFie
 	for hitIdx, hit := range res.Hits.Hits {
 		var flattened map[string]interface{}
 		if hit["_source"] != nil {
-			flattened = flatten(hit["_source"].(map[string]interface{}))
+			flattened = flatten(hit["_source"].(map[string]interface{}), 10)
 		}
 
 		doc := map[string]interface{}{
@@ -1041,38 +1041,26 @@ func getErrorFromElasticResponse(response *es.SearchResponse) string {
 }
 
 // flatten flattens multi-level objects to single level objects. It uses dot notation to join keys.
-func flatten(target map[string]interface{}) map[string]interface{} {
-	// On frontend maxDepth wasn't used but as we are processing on backend
-	// let's put a limit to avoid infinite loop. 10 was chosen arbitrary.
-	maxDepth := 10
-	currentDepth := 0
-	delimiter := ""
-	output := make(map[string]interface{})
-
-	var step func(object map[string]interface{}, prev string)
-	step = func(object map[string]interface{}, prev string) {
-		for key, value := range object {
-			if prev == "" {
-				delimiter = ""
-			} else {
-				delimiter = "."
-			}
-			newKey := prev + delimiter + key
-
-			v, ok := value.(map[string]interface{})
-			shouldStepInside := ok && len(v) > 0 && currentDepth < maxDepth
-			if shouldStepInside {
-				currentDepth++
-				step(v, newKey)
-			} else {
-				output[newKey] = value
-			}
-		}
-	}
-
-	step(target, "")
-
-	return output
-}
+func flatten(target map[string]interface{}, maxDepth int) map[string]interface{} {
+	output := make(map[string]interface{})
+	step(0, maxDepth, target, "", output)
+	return output
+}
+
+func step(currentDepth, maxDepth int, target map[string]interface{}, prev string, output map[string]interface{}) {
+	nextDepth := currentDepth + 1
+	for key, value := range target {
+		newKey := strings.Trim(prev+"."+key, ".")
+
+		v, ok := value.(map[string]interface{})
+		if ok && len(v) > 0 && currentDepth < maxDepth {
+			step(nextDepth, maxDepth, v, newKey, output)
+		} else {
+			output[newKey] = value
+		}
+	}
+}
 
 // sortPropNames orders propNames so that timeField is first (if it exists), log message field is second
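The removed code kept a single currentDepth counter for the whole walk and only ever incremented it, so flattening several nested values in one document could exhaust the depth budget too early; the new step helper threads the depth through each recursive call instead, and the log and raw-data paths now pass the cap of 10 explicitly. Below is a minimal, self-contained sketch for trying that behavior outside the plugin. The flatten and step bodies are copied from the added lines above; the package main wrapper, the example map, and the printed output are illustrative assumptions, not part of the commit.

package main

import (
	"fmt"
	"strings"
)

// flatten joins nested keys with dots, stopping once maxDepth levels have been expanded.
func flatten(target map[string]interface{}, maxDepth int) map[string]interface{} {
	output := make(map[string]interface{})
	step(0, maxDepth, target, "", output)
	return output
}

// step walks one level; the depth is a per-call value rather than a shared counter,
// so sibling subtrees each get the full depth budget.
func step(currentDepth, maxDepth int, target map[string]interface{}, prev string, output map[string]interface{}) {
	nextDepth := currentDepth + 1
	for key, value := range target {
		newKey := strings.Trim(prev+"."+key, ".")

		v, ok := value.(map[string]interface{})
		if ok && len(v) > 0 && currentDepth < maxDepth {
			step(nextDepth, maxDepth, v, newKey, output)
		} else {
			output[newKey] = value
		}
	}
}

func main() {
	// Hypothetical _source-style document, used only for illustration.
	source := map[string]interface{}{
		"kubernetes": map[string]interface{}{
			"pod": map[string]interface{}{
				"name": "grafana-0",
			},
		},
		"level": "info",
	}

	// maxDepth 1: one level is expanded, deeper maps are kept as values.
	// Prints map[kubernetes.pod:map[name:grafana-0] level:info]
	fmt.Println(flatten(source, 1))

	// maxDepth 10 (the value the call sites above now pass):
	// prints map[kubernetes.pod.name:grafana-0 level:info]
	fmt.Println(flatten(source, 10))
}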

Changed file 2 of 2 (the response parser tests):

@@ -2481,7 +2481,6 @@ func TestProcessBuckets(t *testing.T) {
 			}
 		]
 	}
 	`)

 		result, err := queryDataTest(query, response)
@@ -3305,7 +3304,7 @@ func TestFlatten(t *testing.T) {
 			},
 		}
-		flattened := flatten(obj)
+		flattened := flatten(obj, 10)
 		require.Len(t, flattened, 2)
 		require.Equal(t, "bar", flattened["foo"])
 		require.Equal(t, "qux", flattened["nested.bax.baz"])
@@ -3340,10 +3339,111 @@ func TestFlatten(t *testing.T) {
 			},
 		}
-		flattened := flatten(obj)
+		flattened := flatten(obj, 10)
 		require.Len(t, flattened, 1)
 		require.Equal(t, map[string]interface{}{"nested11": map[string]interface{}{"nested12": "abc"}}, flattened["nested0.nested1.nested2.nested3.nested4.nested5.nested6.nested7.nested8.nested9.nested10"])
 	})
t.Run("does not affect any non-nested JSON", func(t *testing.T) {
target := map[string]interface{}{
"fieldName": "",
}
assert.Equal(t, map[string]interface{}{
"fieldName": "",
}, flatten(target, 10))
})
t.Run("flattens up to maxDepth", func(t *testing.T) {
target := map[string]interface{}{
"fieldName2": map[string]interface{}{
"innerFieldName2": map[string]interface{}{
"innerFieldName3": "",
},
},
}
assert.Equal(t, map[string]interface{}{
"fieldName2.innerFieldName2": map[string]interface{}{"innerFieldName3": ""}}, flatten(target, 1))
})
t.Run("flattens up to maxDepth with multiple keys in target", func(t *testing.T) {
target := map[string]interface{}{
"fieldName": map[string]interface{}{
"innerFieldName": "",
},
"fieldName2": map[string]interface{}{
"innerFieldName2": map[string]interface{}{
"innerFieldName3": "",
},
},
}
assert.Equal(t, map[string]interface{}{"fieldName.innerFieldName": "", "fieldName2.innerFieldName2": map[string]interface{}{"innerFieldName3": ""}}, flatten(target, 1))
})
t.Run("flattens multiple objects of the same max depth", func(t *testing.T) {
target := map[string]interface{}{
"fieldName": map[string]interface{}{
"innerFieldName": "",
},
"fieldName2": map[string]interface{}{
"innerFieldName2": "",
},
}
assert.Equal(t, map[string]interface{}{
"fieldName.innerFieldName": "",
"fieldName2.innerFieldName2": ""}, flatten(target, 1))
})
t.Run("only flattens multiple entries in the same key", func(t *testing.T) {
target := map[string]interface{}{
"fieldName": map[string]interface{}{
"innerFieldName": "",
"innerFieldName1": "",
},
"fieldName2": map[string]interface{}{
"innerFieldName2": map[string]interface{}{
"innerFieldName3": "",
},
},
}
assert.Equal(t, map[string]interface{}{
"fieldName.innerFieldName": "",
"fieldName.innerFieldName1": "",
"fieldName2.innerFieldName2": map[string]interface{}{"innerFieldName3": ""}}, flatten(target, 1))
})
t.Run("combines nested field names", func(t *testing.T) {
target := map[string]interface{}{
"fieldName": map[string]interface{}{
"innerFieldName": "",
},
"fieldName2": map[string]interface{}{
"innerFieldName2": "",
},
}
assert.Equal(t, map[string]interface{}{"fieldName.innerFieldName": "", "fieldName2.innerFieldName2": ""}, flatten(target, 10))
})
t.Run("will preserve only one key with the same name", func(t *testing.T) {
// This test documents that in the unlikely case of a collision of a flattened name and an existing key, only
// one entry's value will be preserved at random
target := map[string]interface{}{
"fieldName": map[string]interface{}{
"innerFieldName": "one of these values will be lost",
},
"fieldName.innerFieldName": "this may be lost",
}
result := flatten(target, 10)
assert.Len(t, result, 1)
_, ok := result["fieldName.innerFieldName"]
assert.True(t, ok)
})
}
func TestTrimEdges(t *testing.T) {
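To exercise just these cases locally, something like the following should work; the package path is an assumption about where the Elasticsearch response parser lives in this repository, so adjust it if the layout differs:

go test -run TestFlatten -v ./pkg/tsdb/elasticsearch/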