mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Elasticsearch: Implement processing of raw document query results in backend (#63932)
This commit is contained in:
parent
dc6d0a2bdb
commit
adda7819e9
@ -57,8 +57,14 @@ func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredF
|
||||
|
||||
queryRes := backend.DataResponse{}
|
||||
|
||||
if isDocumentQuery(target) {
|
||||
err := processDocumentResponse(res, target, configuredFields, &queryRes)
|
||||
if isRawDataQuery(target) {
|
||||
err := processRawDataResponse(res, target, configuredFields, &queryRes)
|
||||
if err != nil {
|
||||
return &backend.QueryDataResponse{}, err
|
||||
}
|
||||
result.Responses[target.RefID] = queryRes
|
||||
} else if isRawDocumentQuery(target) {
|
||||
err := processRawDocumentResponse(res, target, &queryRes)
|
||||
if err != nil {
|
||||
return &backend.QueryDataResponse{}, err
|
||||
}
|
||||
@ -132,7 +138,7 @@ func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields
|
||||
return nil
|
||||
}
|
||||
|
||||
func processDocumentResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse) error {
|
||||
func processRawDataResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse) error {
|
||||
propNames := make(map[string]bool)
|
||||
docs := make([]map[string]interface{}, len(res.Hits.Hits))
|
||||
|
||||
@ -173,6 +179,62 @@ func processDocumentResponse(res *es.SearchResponse, target *Query, configuredFi
|
||||
return nil
|
||||
}
|
||||
|
||||
func processRawDocumentResponse(res *es.SearchResponse, target *Query, queryRes *backend.DataResponse) error {
|
||||
docs := make([]map[string]interface{}, len(res.Hits.Hits))
|
||||
for hitIdx, hit := range res.Hits.Hits {
|
||||
doc := map[string]interface{}{
|
||||
"_id": hit["_id"],
|
||||
"_type": hit["_type"],
|
||||
"_index": hit["_index"],
|
||||
"sort": hit["sort"],
|
||||
"highlight": hit["highlight"],
|
||||
}
|
||||
|
||||
if hit["_source"] != nil {
|
||||
source, ok := hit["_source"].(map[string]interface{})
|
||||
if ok {
|
||||
for k, v := range source {
|
||||
doc[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if hit["fields"] != nil {
|
||||
source, ok := hit["fields"].(map[string]interface{})
|
||||
if ok {
|
||||
for k, v := range source {
|
||||
doc[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
docs[hitIdx] = doc
|
||||
}
|
||||
|
||||
fieldVector := make([]*json.RawMessage, len(res.Hits.Hits))
|
||||
for i, doc := range docs {
|
||||
bytes, err := json.Marshal(doc)
|
||||
if err != nil {
|
||||
// We skip docs that can't be marshalled
|
||||
// should not happen
|
||||
continue
|
||||
}
|
||||
value := json.RawMessage(bytes)
|
||||
fieldVector[i] = &value
|
||||
}
|
||||
|
||||
isFilterable := true
|
||||
field := data.NewField(target.RefID, nil, fieldVector)
|
||||
field.Config = &data.FieldConfig{Filterable: &isFilterable}
|
||||
|
||||
frames := data.Frames{}
|
||||
frame := data.NewFrame(target.RefID, field)
|
||||
frames = append(frames, frame)
|
||||
|
||||
queryRes.Frames = frames
|
||||
return nil
|
||||
}
|
||||
|
||||
func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []string, configuredFields es.ConfiguredFields) []*data.Field {
|
||||
size := len(docs)
|
||||
isFilterable := true
|
||||
|
@ -1164,6 +1164,92 @@ func TestResponseParser(t *testing.T) {
|
||||
require.Equal(t, data.FieldTypeNullableString, frame.Fields[15].Type())
|
||||
})
|
||||
|
||||
t.Run("Raw document query", func(t *testing.T) {
|
||||
targets := map[string]string{
|
||||
"A": `{
|
||||
"metrics": [{ "type": "raw_document" }]
|
||||
}`,
|
||||
}
|
||||
|
||||
response := `{
|
||||
"responses":[
|
||||
{
|
||||
"hits":{
|
||||
"total":{
|
||||
"value":109,
|
||||
"relation":"eq"
|
||||
},
|
||||
"max_score":null,
|
||||
"hits":[
|
||||
{
|
||||
"_index":"logs-2023.02.08",
|
||||
"_id":"GB2UMYYBfCQ-FCMjayJa",
|
||||
"_score":null,
|
||||
"fields": {
|
||||
"test_field":"A"
|
||||
},
|
||||
"_source":{
|
||||
"@timestamp":"2023-02-08T15:10:55.830Z",
|
||||
"line":"log text [479231733]",
|
||||
"counter":"109",
|
||||
"float":58.253758485091,
|
||||
"label":"val1",
|
||||
"level":"info",
|
||||
"location":"17.089705232090438, 41.62861966340297",
|
||||
"nested": {
|
||||
"field": {
|
||||
"double_nested": "value"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"_index":"logs-2023.02.08",
|
||||
"_id":"Fx2UMYYBfCQ-FCMjZyJ_",
|
||||
"_score":null,
|
||||
"fields": {
|
||||
"test_field":"A"
|
||||
},
|
||||
"_source":{
|
||||
"@timestamp":"2023-02-08T15:10:54.835Z",
|
||||
"line":"log text with ANSI \u001b[31mpart of the text\u001b[0m [493139080]",
|
||||
"counter":"108",
|
||||
"float":54.5977098233944,
|
||||
"label":"val1",
|
||||
"level":"info",
|
||||
"location":"19.766305918490463, 40.42639175509792",
|
||||
"nested": {
|
||||
"field": {
|
||||
"double_nested": "value1"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"status":200
|
||||
}
|
||||
]
|
||||
}`
|
||||
|
||||
result, err := parseTestResponse(targets, response)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
queryRes := result.Responses["A"]
|
||||
require.NotNil(t, queryRes)
|
||||
dataframes := queryRes.Frames
|
||||
require.Len(t, dataframes, 1)
|
||||
frame := dataframes[0]
|
||||
|
||||
require.Equal(t, 1, len(frame.Fields))
|
||||
//Fields have the correct length
|
||||
require.Equal(t, 2, frame.Fields[0].Len())
|
||||
// The only field is the raw document
|
||||
require.Equal(t, data.FieldTypeNullableJSON, frame.Fields[0].Type())
|
||||
require.Equal(t, "A", frame.Fields[0].Name)
|
||||
})
|
||||
|
||||
t.Run("Raw data query", func(t *testing.T) {
|
||||
targets := map[string]string{
|
||||
"A": `{
|
||||
|
@ -312,7 +312,15 @@ func isLogsQuery(query *Query) bool {
|
||||
}
|
||||
|
||||
func isDocumentQuery(query *Query) bool {
|
||||
return query.Metrics[0].Type == rawDataType || query.Metrics[0].Type == rawDocumentType
|
||||
return isRawDataQuery(query) || isRawDocumentQuery(query)
|
||||
}
|
||||
|
||||
func isRawDataQuery(query *Query) bool {
|
||||
return query.Metrics[0].Type == rawDataType
|
||||
}
|
||||
|
||||
func isRawDocumentQuery(query *Query) bool {
|
||||
return query.Metrics[0].Type == rawDocumentType
|
||||
}
|
||||
|
||||
func processLogsQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) {
|
||||
|
Loading…
Reference in New Issue
Block a user