diff --git a/pkg/tsdb/elasticsearch/response_parser.go b/pkg/tsdb/elasticsearch/response_parser.go index 4976ac99276..ef44bd84ca3 100644 --- a/pkg/tsdb/elasticsearch/response_parser.go +++ b/pkg/tsdb/elasticsearch/response_parser.go @@ -57,8 +57,14 @@ func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredF queryRes := backend.DataResponse{} - if isDocumentQuery(target) { - err := processDocumentResponse(res, target, configuredFields, &queryRes) + if isRawDataQuery(target) { + err := processRawDataResponse(res, target, configuredFields, &queryRes) + if err != nil { + return &backend.QueryDataResponse{}, err + } + result.Responses[target.RefID] = queryRes + } else if isRawDocumentQuery(target) { + err := processRawDocumentResponse(res, target, &queryRes) if err != nil { return &backend.QueryDataResponse{}, err } @@ -132,7 +138,7 @@ func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields return nil } -func processDocumentResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse) error { +func processRawDataResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse) error { propNames := make(map[string]bool) docs := make([]map[string]interface{}, len(res.Hits.Hits)) @@ -173,6 +179,62 @@ func processDocumentResponse(res *es.SearchResponse, target *Query, configuredFi return nil } +func processRawDocumentResponse(res *es.SearchResponse, target *Query, queryRes *backend.DataResponse) error { + docs := make([]map[string]interface{}, len(res.Hits.Hits)) + for hitIdx, hit := range res.Hits.Hits { + doc := map[string]interface{}{ + "_id": hit["_id"], + "_type": hit["_type"], + "_index": hit["_index"], + "sort": hit["sort"], + "highlight": hit["highlight"], + } + + if hit["_source"] != nil { + source, ok := hit["_source"].(map[string]interface{}) + if ok { + for k, v := range source { + doc[k] = v + } + } + } + + if hit["fields"] != nil { + source, ok := hit["fields"].(map[string]interface{}) + if ok { + for k, v := range source { + doc[k] = v + } + } + } + + docs[hitIdx] = doc + } + + fieldVector := make([]*json.RawMessage, len(res.Hits.Hits)) + for i, doc := range docs { + bytes, err := json.Marshal(doc) + if err != nil { + // We skip docs that can't be marshalled + // should not happen + continue + } + value := json.RawMessage(bytes) + fieldVector[i] = &value + } + + isFilterable := true + field := data.NewField(target.RefID, nil, fieldVector) + field.Config = &data.FieldConfig{Filterable: &isFilterable} + + frames := data.Frames{} + frame := data.NewFrame(target.RefID, field) + frames = append(frames, frame) + + queryRes.Frames = frames + return nil +} + func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []string, configuredFields es.ConfiguredFields) []*data.Field { size := len(docs) isFilterable := true diff --git a/pkg/tsdb/elasticsearch/response_parser_test.go b/pkg/tsdb/elasticsearch/response_parser_test.go index e9237451750..d2d58315f85 100644 --- a/pkg/tsdb/elasticsearch/response_parser_test.go +++ b/pkg/tsdb/elasticsearch/response_parser_test.go @@ -1164,6 +1164,92 @@ func TestResponseParser(t *testing.T) { require.Equal(t, data.FieldTypeNullableString, frame.Fields[15].Type()) }) + t.Run("Raw document query", func(t *testing.T) { + targets := map[string]string{ + "A": `{ + "metrics": [{ "type": "raw_document" }] + }`, + } + + response := `{ + "responses":[ + { + "hits":{ + "total":{ + "value":109, + "relation":"eq" + }, + "max_score":null, + "hits":[ + { + "_index":"logs-2023.02.08", + "_id":"GB2UMYYBfCQ-FCMjayJa", + "_score":null, + "fields": { + "test_field":"A" + }, + "_source":{ + "@timestamp":"2023-02-08T15:10:55.830Z", + "line":"log text [479231733]", + "counter":"109", + "float":58.253758485091, + "label":"val1", + "level":"info", + "location":"17.089705232090438, 41.62861966340297", + "nested": { + "field": { + "double_nested": "value" + } + } + } + }, + { + "_index":"logs-2023.02.08", + "_id":"Fx2UMYYBfCQ-FCMjZyJ_", + "_score":null, + "fields": { + "test_field":"A" + }, + "_source":{ + "@timestamp":"2023-02-08T15:10:54.835Z", + "line":"log text with ANSI \u001b[31mpart of the text\u001b[0m [493139080]", + "counter":"108", + "float":54.5977098233944, + "label":"val1", + "level":"info", + "location":"19.766305918490463, 40.42639175509792", + "nested": { + "field": { + "double_nested": "value1" + } + } + } + } + ] + }, + "status":200 + } + ] + }` + + result, err := parseTestResponse(targets, response) + require.NoError(t, err) + require.Len(t, result.Responses, 1) + + queryRes := result.Responses["A"] + require.NotNil(t, queryRes) + dataframes := queryRes.Frames + require.Len(t, dataframes, 1) + frame := dataframes[0] + + require.Equal(t, 1, len(frame.Fields)) + //Fields have the correct length + require.Equal(t, 2, frame.Fields[0].Len()) + // The only field is the raw document + require.Equal(t, data.FieldTypeNullableJSON, frame.Fields[0].Type()) + require.Equal(t, "A", frame.Fields[0].Name) + }) + t.Run("Raw data query", func(t *testing.T) { targets := map[string]string{ "A": `{ diff --git a/pkg/tsdb/elasticsearch/time_series_query.go b/pkg/tsdb/elasticsearch/time_series_query.go index 979ba0c3dab..54fa3992aa7 100644 --- a/pkg/tsdb/elasticsearch/time_series_query.go +++ b/pkg/tsdb/elasticsearch/time_series_query.go @@ -312,7 +312,15 @@ func isLogsQuery(query *Query) bool { } func isDocumentQuery(query *Query) bool { - return query.Metrics[0].Type == rawDataType || query.Metrics[0].Type == rawDocumentType + return isRawDataQuery(query) || isRawDocumentQuery(query) +} + +func isRawDataQuery(query *Query) bool { + return query.Metrics[0].Type == rawDataType +} + +func isRawDocumentQuery(query *Query) bool { + return query.Metrics[0].Type == rawDocumentType } func processLogsQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {