Elasticsearch: Add processing for raw data to backend (#63208)

* WIP

* WIP

* Refactor

* Add tests

* Cleanup

* Fix whitespace

* Fix test and lint

* In snapshot tests update counter to be number

* Add boolean value for snapshot testing

* Update pkg/tsdb/elasticsearch/response_parser.go

Co-authored-by: Gábor Farkas <gabor.farkas@gmail.com>

* Update pkg/tsdb/elasticsearch/response_parser.go

Co-authored-by: Gábor Farkas <gabor.farkas@gmail.com>

* Use generic to reuse logic when creating fields

* Use nullable fields

* Fix lint

* WIP (#63272)

wip

* Fix snapshot test after we changed field types to nullable

---------

Co-authored-by: Gábor Farkas <gabor.farkas@gmail.com>
This commit is contained in:
Ivana Huckova
2023-02-22 13:28:43 +01:00
committed by GitHub
parent 0a73ac36ad
commit 89b3663a23
7 changed files with 1120 additions and 11 deletions

View File

@@ -1,6 +1,7 @@
package elasticsearch
import (
"encoding/json"
"errors"
"regexp"
"sort"
@@ -35,7 +36,7 @@ const (
logsType = "logs"
)
func parseResponse(responses []*es.SearchResponse, targets []*Query) (*backend.QueryDataResponse, error) {
func parseResponse(responses []*es.SearchResponse, targets []*Query, timeField string) (*backend.QueryDataResponse, error) {
result := backend.QueryDataResponse{
Responses: backend.Responses{},
}
@@ -56,19 +57,123 @@ func parseResponse(responses []*es.SearchResponse, targets []*Query) (*backend.Q
queryRes := backend.DataResponse{}
props := make(map[string]string)
err := processBuckets(res.Aggregations, target, &queryRes, props, 0)
if err != nil {
return &backend.QueryDataResponse{}, err
}
nameFields(queryRes, target)
trimDatapoints(queryRes, target)
if isDocumentQuery(target) {
err := processDocumentResponse(res, target, timeField, &queryRes)
if err != nil {
return &backend.QueryDataResponse{}, err
}
result.Responses[target.RefID] = queryRes
} else {
props := make(map[string]string)
err := processBuckets(res.Aggregations, target, &queryRes, props, 0)
if err != nil {
return &backend.QueryDataResponse{}, err
}
nameFields(queryRes, target)
trimDatapoints(queryRes, target)
result.Responses[target.RefID] = queryRes
result.Responses[target.RefID] = queryRes
}
}
return &result, nil
}
// processDocumentResponse converts the hits of a document query response into a
// single data frame with one nullable field per property and stores that frame
// in queryRes.Frames.
//
// Each hit contributes its metadata ("_id", "_type", "_index", "sort",
// "highlight"), the flattened "_source" object under "_source", and every
// flattened "_source" key as a top-level property. Fields are ordered by
// sortPropNames, i.e. timeField first (when present) followed by the remaining
// property names alphabetically.
//
// The field type of a property is chosen from its first non-nil value across
// all docs (float64, int, string or bool); anything else is serialized as
// json.RawMessage. Values that do not match the chosen type are left nil.
//
// target is currently unused by this function. The function always returns nil.
func processDocumentResponse(res *es.SearchResponse, target *Query, timeField string, queryRes *backend.DataResponse) error {
	docs := make([]map[string]interface{}, len(res.Hits.Hits))
	propNames := make(map[string]bool)

	for hitIdx, hit := range res.Hits.Hits {
		var flattened map[string]interface{}
		// Comma-ok assertion: a hit whose _source is missing, null, or not a JSON
		// object is treated as having no source properties instead of panicking.
		if source, ok := hit["_source"].(map[string]interface{}); ok {
			flattened = flatten(source)
		}

		doc := map[string]interface{}{
			"_id":       hit["_id"],
			"_type":     hit["_type"],
			"_index":    hit["_index"],
			"sort":      hit["sort"],
			"highlight": hit["highlight"],
			"_source":   flattened,
		}

		// Promote every flattened source property to a top-level doc property.
		for k, v := range flattened {
			doc[k] = v
		}

		for key := range doc {
			propNames[key] = true
		}

		docs[hitIdx] = doc
	}

	size := len(docs)
	// A single variable is used so every field config can point at the same bool.
	isFilterable := true
	allFields := make([]*data.Field, len(propNames))

	sortedPropNames := sortPropNames(propNames, timeField)

	for propNameIdx, propName := range sortedPropNames {
		// Special handling for time field
		if propName == timeField {
			timeVector := make([]*time.Time, size)
			for i, doc := range docs {
				timeString, ok := doc[timeField].(string)
				if !ok {
					continue
				}
				timeValue, err := time.Parse(time.RFC3339Nano, timeString)
				if err != nil {
					// We skip time values that cannot be parsed
					continue
				}
				timeVector[i] = &timeValue
			}
			field := data.NewField(timeField, nil, timeVector)
			field.Config = &data.FieldConfig{Filterable: &isFilterable}
			allFields[propNameIdx] = field
			continue
		}

		propNameValue := findTheFirstNonNilDocValueForPropName(docs, propName)
		switch propNameValue.(type) {
		// We are checking for default data types values (float64, int, bool, string)
		// and default to json.RawMessage if we cannot find any of them
		case float64:
			allFields[propNameIdx] = createFieldOfType[float64](docs, propName, size, isFilterable)
		case int:
			allFields[propNameIdx] = createFieldOfType[int](docs, propName, size, isFilterable)
		case string:
			allFields[propNameIdx] = createFieldOfType[string](docs, propName, size, isFilterable)
		case bool:
			allFields[propNameIdx] = createFieldOfType[bool](docs, propName, size, isFilterable)
		default:
			fieldVector := make([]*json.RawMessage, size)
			for i, doc := range docs {
				bytes, err := json.Marshal(doc[propName])
				if err != nil {
					// We skip values that cannot be marshalled
					continue
				}
				value := json.RawMessage(bytes)
				fieldVector[i] = &value
			}
			field := data.NewField(propName, nil, fieldVector)
			field.Config = &data.FieldConfig{Filterable: &isFilterable}
			allFields[propNameIdx] = field
		}
	}

	frames := data.Frames{}
	frame := data.NewFrame("", allFields...)
	frames = append(frames, frame)

	queryRes.Frames = frames
	return nil
}
func processBuckets(aggs map[string]interface{}, target *Query,
queryResult *backend.DataResponse, props map[string]string, depth int) error {
var err error
@@ -753,3 +858,84 @@ func getErrorFromElasticResponse(response *es.SearchResponse) string {
return errorString
}
// flatten flattens multi-level objects to single-level objects. It uses dot notation to join keys.
//
// Non-empty nested maps are expanded until maxDepth levels of nesting have been
// entered; a map found deeper than that is stored unflattened under its dotted
// key. Empty maps and non-map values are stored as-is. The input is not modified.
func flatten(target map[string]interface{}) map[string]interface{} {
	// On frontend maxDepth wasn't used but as we are processing on backend
	// let's put a limit to avoid unbounded recursion. 10 was chosen arbitrarily.
	const maxDepth = 10

	output := make(map[string]interface{})

	// depth is passed down as an argument instead of being kept in a shared
	// counter, so it reflects the nesting level of the object being visited and
	// not the total number of nested objects visited so far. With a shared
	// counter, sibling objects stopped being flattened once ten nested objects
	// had been entered anywhere in the document.
	var step func(object map[string]interface{}, prev string, depth int)
	step = func(object map[string]interface{}, prev string, depth int) {
		for key, value := range object {
			newKey := key
			if prev != "" {
				newKey = prev + "." + key
			}

			if nested, ok := value.(map[string]interface{}); ok && len(nested) > 0 && depth < maxDepth {
				step(nested, newKey, depth+1)
				continue
			}
			output[newKey] = value
		}
	}

	step(target, "", 0)
	return output
}
// sortPropNames returns the keys of propNames in alphabetical order, except that
// timeField, when it is one of the keys, is moved to the front of the result.
func sortPropNames(propNames map[string]bool, timeField string) []string {
	_, timeFieldPresent := propNames[timeField]

	// Collect every name except the time field, which is placed separately below.
	var others []string
	for name := range propNames {
		if name != timeField {
			others = append(others, name)
		}
	}
	sort.Strings(others)

	if !timeFieldPresent {
		return others
	}
	return append([]string{timeField}, others...)
}
// findTheFirstNonNilDocValueForPropName finds the first non-nil value for propName in docs.
// It returns nil when docs is empty or when no doc holds a non-nil value for propName.
func findTheFirstNonNilDocValueForPropName(docs []map[string]interface{}, propName string) interface{} {
	for _, doc := range docs {
		if value := doc[propName]; value != nil {
			return value
		}
	}
	// Reaching this point means every doc had a nil (or missing) value, so the
	// first doc's value would be nil as well; returning nil directly also avoids
	// indexing docs[0] when docs is empty.
	return nil
}
// createFieldOfType builds a nullable field named propName with one entry per doc.
// An entry points at the doc's value when that value has dynamic type T and is
// left nil otherwise. The field's config marks it with the given isFilterable flag.
func createFieldOfType[T int | float64 | bool | string](docs []map[string]interface{}, propName string, size int, isFilterable bool) *data.Field {
	values := make([]*T, size)
	for idx := range docs {
		// Values of any other type (including nil/missing) keep the nil entry.
		if typed, matches := docs[idx][propName].(T); matches {
			values[idx] = &typed
		}
	}

	result := data.NewField(propName, nil, values)
	result.Config = &data.FieldConfig{Filterable: &isFilterable}
	return result
}