diff --git a/pkg/tsdb/elasticsearch/response_parser.go b/pkg/tsdb/elasticsearch/response_parser.go index 065de02c40e..c8d1f946d61 100644 --- a/pkg/tsdb/elasticsearch/response_parser.go +++ b/pkg/tsdb/elasticsearch/response_parser.go @@ -102,7 +102,7 @@ func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields for hitIdx, hit := range res.Hits.Hits { var flattened map[string]interface{} if hit["_source"] != nil { - flattened = flatten(hit["_source"].(map[string]interface{})) + flattened = flatten(hit["_source"].(map[string]interface{}), 10) } doc := map[string]interface{}{ @@ -174,7 +174,7 @@ func processRawDataResponse(res *es.SearchResponse, target *Query, configuredFie for hitIdx, hit := range res.Hits.Hits { var flattened map[string]interface{} if hit["_source"] != nil { - flattened = flatten(hit["_source"].(map[string]interface{})) + flattened = flatten(hit["_source"].(map[string]interface{}), 10) } doc := map[string]interface{}{ @@ -1041,38 +1041,26 @@ func getErrorFromElasticResponse(response *es.SearchResponse) string { } // flatten flattens multi-level objects to single level objects. It uses dot notation to join keys. -func flatten(target map[string]interface{}) map[string]interface{} { +func flatten(target map[string]interface{}, maxDepth int) map[string]interface{} { // On frontend maxDepth wasn't used but as we are processing on backend // let's put a limit to avoid infinite loop. 10 was chosen arbitrary. - maxDepth := 10 - currentDepth := 0 - delimiter := "" output := make(map[string]interface{}) + step(0, maxDepth, target, "", output) + return output +} - var step func(object map[string]interface{}, prev string) +func step(currentDepth, maxDepth int, target map[string]interface{}, prev string, output map[string]interface{}) { + nextDepth := currentDepth + 1 + for key, value := range target { + newKey := strings.Trim(prev+"."+key, ".") - step = func(object map[string]interface{}, prev string) { - for key, value := range object { - if prev == "" { - delimiter = "" - } else { - delimiter = "." - } - newKey := prev + delimiter + key - - v, ok := value.(map[string]interface{}) - shouldStepInside := ok && len(v) > 0 && currentDepth < maxDepth - if shouldStepInside { - currentDepth++ - step(v, newKey) - } else { - output[newKey] = value - } + v, ok := value.(map[string]interface{}) + if ok && len(v) > 0 && currentDepth < maxDepth { + step(nextDepth, maxDepth, v, newKey, output) + } else { + output[newKey] = value } } - - step(target, "") - return output } // sortPropNames orders propNames so that timeField is first (if it exists), log message field is second diff --git a/pkg/tsdb/elasticsearch/response_parser_test.go b/pkg/tsdb/elasticsearch/response_parser_test.go index 25b09ca1d76..a54e955eee3 100644 --- a/pkg/tsdb/elasticsearch/response_parser_test.go +++ b/pkg/tsdb/elasticsearch/response_parser_test.go @@ -2481,7 +2481,6 @@ func TestProcessBuckets(t *testing.T) { } ] } - `) result, err := queryDataTest(query, response) @@ -3305,7 +3304,7 @@ func TestFlatten(t *testing.T) { }, } - flattened := flatten(obj) + flattened := flatten(obj, 10) require.Len(t, flattened, 2) require.Equal(t, "bar", flattened["foo"]) require.Equal(t, "qux", flattened["nested.bax.baz"]) @@ -3340,10 +3339,111 @@ func TestFlatten(t *testing.T) { }, } - flattened := flatten(obj) + flattened := flatten(obj, 10) require.Len(t, flattened, 1) require.Equal(t, map[string]interface{}{"nested11": map[string]interface{}{"nested12": "abc"}}, flattened["nested0.nested1.nested2.nested3.nested4.nested5.nested6.nested7.nested8.nested9.nested10"]) }) + + t.Run("does not affect any non-nested JSON", func(t *testing.T) { + target := map[string]interface{}{ + "fieldName": "", + } + + assert.Equal(t, map[string]interface{}{ + "fieldName": "", + }, flatten(target, 10)) + }) + + t.Run("flattens up to maxDepth", func(t *testing.T) { + target := map[string]interface{}{ + "fieldName2": map[string]interface{}{ + "innerFieldName2": map[string]interface{}{ + "innerFieldName3": "", + }, + }, + } + + assert.Equal(t, map[string]interface{}{ + "fieldName2.innerFieldName2": map[string]interface{}{"innerFieldName3": ""}}, flatten(target, 1)) + }) + + t.Run("flattens up to maxDepth with multiple keys in target", func(t *testing.T) { + target := map[string]interface{}{ + "fieldName": map[string]interface{}{ + "innerFieldName": "", + }, + "fieldName2": map[string]interface{}{ + "innerFieldName2": map[string]interface{}{ + "innerFieldName3": "", + }, + }, + } + + assert.Equal(t, map[string]interface{}{"fieldName.innerFieldName": "", "fieldName2.innerFieldName2": map[string]interface{}{"innerFieldName3": ""}}, flatten(target, 1)) + }) + + t.Run("flattens multiple objects of the same max depth", func(t *testing.T) { + target := map[string]interface{}{ + "fieldName": map[string]interface{}{ + "innerFieldName": "", + }, + "fieldName2": map[string]interface{}{ + "innerFieldName2": "", + }, + } + + assert.Equal(t, map[string]interface{}{ + "fieldName.innerFieldName": "", + "fieldName2.innerFieldName2": ""}, flatten(target, 1)) + }) + + t.Run("only flattens multiple entries in the same key", func(t *testing.T) { + target := map[string]interface{}{ + "fieldName": map[string]interface{}{ + "innerFieldName": "", + "innerFieldName1": "", + }, + "fieldName2": map[string]interface{}{ + "innerFieldName2": map[string]interface{}{ + "innerFieldName3": "", + }, + }, + } + + assert.Equal(t, map[string]interface{}{ + "fieldName.innerFieldName": "", + "fieldName.innerFieldName1": "", + "fieldName2.innerFieldName2": map[string]interface{}{"innerFieldName3": ""}}, flatten(target, 1)) + }) + + t.Run("combines nested field names", func(t *testing.T) { + target := map[string]interface{}{ + "fieldName": map[string]interface{}{ + "innerFieldName": "", + }, + "fieldName2": map[string]interface{}{ + "innerFieldName2": "", + }, + } + + assert.Equal(t, map[string]interface{}{"fieldName.innerFieldName": "", "fieldName2.innerFieldName2": ""}, flatten(target, 10)) + }) + + t.Run("will preserve only one key with the same name", func(t *testing.T) { + // This test documents that in the unlikely case of a collision of a flattened name and an existing key, only + // one entry's value will be preserved at random + target := map[string]interface{}{ + "fieldName": map[string]interface{}{ + "innerFieldName": "one of these values will be lost", + }, + "fieldName.innerFieldName": "this may be lost", + } + + result := flatten(target, 10) + assert.Len(t, result, 1) + _, ok := result["fieldName.innerFieldName"] + assert.True(t, ok) + }) } func TestTrimEdges(t *testing.T) {