Elasticsearch: Add query building for log queries (#60182)

* Elasticsearch: Fix ordering in raw_document and add logic for raw_data

* Add comments

* Fix raw data request to use correct timefield

* Fix linting

* Add raw data as metric type

* Fix linting

* Elasticsearch: Add defaults for log query

* Add highlight

* Fix lint

* Add snapshot test

* Implement correct query for logs

* Update

* Adjust naming and comments

* Fix lint

* Remove ifs
This commit is contained in:
Ivana Huckova 2022-12-14 13:56:09 +01:00 committed by GitHub
parent 3188a8288e
commit d3ef86bd90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 185 additions and 37 deletions

View File

@@ -92,6 +92,19 @@ func (b *SearchRequestBuilder) AddDocValueField(field string) *SearchRequestBuil
return b
}
// AddHighlight enables highlighting for log queries: every field is
// highlighted, matched terms are wrapped in @HIGHLIGHT@ / @/HIGHLIGHT@
// tags, and the fragment size is set to MaxInt32 so fragments are
// effectively unlimited. Returns the builder for chaining.
func (b *SearchRequestBuilder) AddHighlight() *SearchRequestBuilder {
	// Highlight across all fields ("*" wildcard).
	allFields := map[string]interface{}{
		"*": map[string]interface{}{},
	}
	b.customProps["highlight"] = map[string]interface{}{
		"fields":        allFields,
		"pre_tags":      []string{"@HIGHLIGHT@"},
		"post_tags":     []string{"@/HIGHLIGHT@"},
		"fragment_size": 2147483647,
	}
	return b
}
// Query creates and return a query builder
func (b *SearchRequestBuilder) Query() *QueryBuilder {
if b.queryBuilder == nil {

View File

@@ -56,6 +56,7 @@ var metricAggType = map[string]string{
"raw_document": "Raw Document",
"raw_data": "Raw Data",
"rate": "Rate",
"logs": "Logs",
}
var extendedStats = map[string]string{

View File

@@ -74,8 +74,9 @@ func TestRequestSnapshots(t *testing.T) {
{name: "simple metric test", path: "metric_simple"},
{name: "complex metric test", path: "metric_complex"},
{name: "multi metric test", path: "metric_multi"},
{name: "raw data", path: "raw_data"},
{name: "raw document", path: "raw_document"},
{name: "raw data test", path: "raw_data"},
{name: "raw document test", path: "raw_document"},
{name: "logs test", path: "logs"},
}
queryHeader := []byte(`

View File

@@ -0,0 +1,23 @@
[
{
"metrics": [
{
"id": "1",
"type": "logs"
}
],
"query": "",
"refId": "A",
"datasource": {
"type": "elasticsearch",
"uid": "PE50363A9B6833EE7"
},
"alias": "",
"bucketAggs": [],
"timeField": "testtime",
"key": "Q-ee8fea91-a4c4-4ded-9827-b362476a4083-0",
"datasourceId": 39,
"intervalMs": 2000,
"maxDataPoints": 1318
}
]

View File

@@ -0,0 +1,59 @@
{
"docvalue_fields": [
"testtime"
],
"query": {
"bool": {
"filter": {
"range": {
"testtime": {
"format": "epoch_millis",
"gte": 1668422437218,
"lte": 1668422625668
}
}
}
}
},
"script_fields": {},
"size": 500,
"sort":
{
"testtime": {
"order": "desc",
"unmapped_type": "boolean"
},
"_doc": {
"order": "desc"
}
},
"aggs":
{
"1": {
"date_histogram": {
"field": "testtime",
"fixed_interval": "1000ms",
"format": "epoch_millis",
"min_doc_count": 0,
"extended_bounds": {
"min": 1668422437218,
"max": 1668422625668
}
}
}
},
"highlight":
{
"pre_tags": [
"@HIGHLIGHT@"
],
"post_tags": [
"@/HIGHLIGHT@"
],
"fragment_size": 2147483647,
"fields": {
"*": {}
}
}
}

View File

@@ -72,23 +72,45 @@ func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilde
b.Size(0)
filters := b.Query().Bool().Filter()
filters.AddDateRangeFilter(e.client.GetTimeField(), to, from, es.DateFormatEpochMS)
if q.RawQuery != "" {
filters.AddQueryStringFilter(q.RawQuery, true)
}
filters.AddQueryStringFilter(q.RawQuery, true)
if len(q.BucketAggs) == 0 {
if len(q.Metrics) == 0 || !(q.Metrics[0].Type == "raw_document" || q.Metrics[0].Type == "raw_data") {
// If no aggregations, only document and logs queries are valid
if len(q.Metrics) == 0 || !(q.Metrics[0].Type == "raw_data" || q.Metrics[0].Type == "raw_document" || q.Metrics[0].Type == "logs") {
result.Responses[q.RefID] = backend.DataResponse{
Error: fmt.Errorf("invalid query, missing metrics and aggregations"),
}
return nil
}
// Defaults for log and document queries
metric := q.Metrics[0]
b.Size(metric.Settings.Get("size").MustInt(500))
b.SortDesc(e.client.GetTimeField(), "boolean")
b.SortDesc("_doc", "")
b.AddDocValueField(e.client.GetTimeField())
b.Size(metric.Settings.Get("size").MustInt(500))
if metric.Type == "logs" {
// Add additional defaults for log query
b.Size(metric.Settings.Get("limit").MustInt(500))
b.AddHighlight()
// For log query, we add a date histogram aggregation
aggBuilder := b.Agg()
q.BucketAggs = append(q.BucketAggs, &BucketAgg{
Type: dateHistType,
Field: e.client.GetTimeField(),
ID: "1",
Settings: simplejson.NewFromAny(map[string]interface{}{
"interval": "auto",
}),
})
bucketAgg := q.BucketAggs[0]
bucketAgg.Settings = simplejson.NewFromAny(
bucketAgg.generateSettingsForDSL(),
)
_ = addDateHistogramAgg(aggBuilder, bucketAgg, from, to)
}
return nil
}
@@ -115,6 +137,7 @@ func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilde
for _, m := range q.Metrics {
m := m
if m.Type == countType {
continue
}

View File

@@ -1353,39 +1353,67 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
require.Equal(t, filter.AnalyzeWildcard, true)
})
// FIXME
// Log query is not implemented with defaults
// t.Run("With log query should return query with defaults (from frontend tests)", func(t *testing.T) {
// c := newFakeClient()
// _, err := executeTsdbQuery(c, `{
// "timeField": "@timestamp",
// "metrics": { "type": "logs", "id": "1"}
// }`, from, to, 15*time.Second)
// require.NoError(t, err)
// sr := c.multisearchRequests[0].Requests[0]
// require.Equal(t, sr.Size, 500)
t.Run("With log query should return query with defaults (from frontend tests)", func(t *testing.T) {
c := newFakeClient()
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"metrics": [{ "type": "logs", "id": "1"}]
}`, from, to, 15*time.Second)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
require.Equal(t, sr.Size, 500)
// rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter)
// require.Equal(t, rangeFilter.Key, c.timeField)
// require.Equal(t, rangeFilter.Lte, toMs)
// require.Equal(t, rangeFilter.Gte, fromMs)
// require.Equal(t, rangeFilter.Format, es.DateFormatEpochMS)
rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter)
require.Equal(t, rangeFilter.Key, c.timeField)
require.Equal(t, rangeFilter.Lte, toMs)
require.Equal(t, rangeFilter.Gte, fromMs)
require.Equal(t, rangeFilter.Format, es.DateFormatEpochMS)
// sort, _ := json.Marshal(sr.Sort)
// require.Equal(t, string(sort), `"sort":[{"@timestamp":{"order":"desc","unmapped_type":"boolean"}},{"_doc":{"order":"desc"}}]`)
require.Equal(t, sr.Sort["@timestamp"], map[string]string{"order": "desc", "unmapped_type": "boolean"})
require.Equal(t, sr.Sort["_doc"], map[string]string{"order": "desc"})
require.Equal(t, sr.CustomProps["script_fields"], map[string]interface{}{})
// firstLevel := sr.Aggs[0]
// require.Equal(t, firstLevel.Key, "1")
// require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
firstLevel := sr.Aggs[0]
require.Equal(t, firstLevel.Key, "1")
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
// hAgg := firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg)
// require.Equal(t, hAgg.ExtendedBounds.Max, toMs)
// require.Equal(t, hAgg.ExtendedBounds.Min, fromMs)
// require.Equal(t, hAgg.Field, "@timestamp")
// require.Equal(t, hAgg.Format, es.DateFormatEpochMS)
// require.Equal(t, hAgg.FixedInterval, "$__interval_msms")
// require.Equal(t, hAgg.MinDocCount, 0)
// })
hAgg := firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg)
require.Equal(t, hAgg.ExtendedBounds.Max, toMs)
require.Equal(t, hAgg.ExtendedBounds.Min, fromMs)
require.Equal(t, hAgg.Field, "@timestamp")
require.Equal(t, hAgg.Format, es.DateFormatEpochMS)
require.Equal(t, hAgg.FixedInterval, "$__interval_msms")
require.Equal(t, hAgg.MinDocCount, 0)
})
t.Run("With log query with limit should return query with correct size", func(t *testing.T) {
c := newFakeClient()
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"metrics": [{ "type": "logs", "id": "1", "settings": { "limit": 1000 }}]
}`, from, to, 15*time.Second)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
require.Equal(t, sr.Size, 1000)
})
t.Run("With log query should return highlight properties", func(t *testing.T) {
c := newFakeClient()
_, err := executeTsdbQuery(c, `{
"timeField": "@timestamp",
"metrics": [{ "type": "logs", "id": "1" }]
}`, from, to, 15*time.Second)
require.NoError(t, err)
sr := c.multisearchRequests[0].Requests[0]
require.Equal(t, sr.CustomProps["highlight"], map[string]interface{}{
"fields": map[string]interface{}{
"*": map[string]interface{}{},
},
"fragment_size": 2147483647,
"post_tags": []string{"@/HIGHLIGHT@"},
"pre_tags": []string{"@HIGHLIGHT@"},
})
})
})
}