diff --git a/pkg/tsdb/elasticsearch/client/search_request.go b/pkg/tsdb/elasticsearch/client/search_request.go
index 939e2ea542a..f12638eef78 100644
--- a/pkg/tsdb/elasticsearch/client/search_request.go
+++ b/pkg/tsdb/elasticsearch/client/search_request.go
@@ -92,6 +92,19 @@ func (b *SearchRequestBuilder) AddDocValueField(field string) *SearchRequestBuil
 	return b
 }
 
+// AddHighlight adds highlighting to the search request, used by log queries
+func (b *SearchRequestBuilder) AddHighlight() *SearchRequestBuilder {
+	b.customProps["highlight"] = map[string]interface{}{
+		"fields": map[string]interface{}{
+			"*": map[string]interface{}{},
+		},
+		"pre_tags":      []string{"@HIGHLIGHT@"},
+		"post_tags":     []string{"@/HIGHLIGHT@"},
+		"fragment_size": 2147483647,
+	}
+	return b
+}
+
 // Query creates and return a query builder
 func (b *SearchRequestBuilder) Query() *QueryBuilder {
 	if b.queryBuilder == nil {
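
Note that `customProps` is serialized verbatim into the request body, so the method above amounts to attaching a fixed `highlight` clause to every log query. As a quick illustration, here is a standalone sketch (not part of the patch; the `main` wrapper is ours) that marshals the same map and prints the JSON Elasticsearch will receive:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Sketch: build the same "highlight" object that AddHighlight stores
// under customProps and print the clause as it appears on the wire.
func main() {
	highlight := map[string]interface{}{
		"fields": map[string]interface{}{
			"*": map[string]interface{}{}, // highlight matches in every field
		},
		"pre_tags":      []string{"@HIGHLIGHT@"},
		"post_tags":     []string{"@/HIGHLIGHT@"},
		"fragment_size": 2147483647, // math.MaxInt32: return whole values rather than fragments
	}
	out, _ := json.MarshalIndent(map[string]interface{}{"highlight": highlight}, "", "  ")
	fmt.Println(string(out))
}
```
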
"pre_tags": [ + "@HIGHLIGHT@" + ], + "post_tags": [ + "@/HIGHLIGHT@" + ], + "fragment_size": 2147483647, + "fields": { + "*": {} + } + + } +} \ No newline at end of file diff --git a/pkg/tsdb/elasticsearch/time_series_query.go b/pkg/tsdb/elasticsearch/time_series_query.go index 41a1d833a30..33437452946 100644 --- a/pkg/tsdb/elasticsearch/time_series_query.go +++ b/pkg/tsdb/elasticsearch/time_series_query.go @@ -72,23 +72,45 @@ func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilde b.Size(0) filters := b.Query().Bool().Filter() filters.AddDateRangeFilter(e.client.GetTimeField(), to, from, es.DateFormatEpochMS) - - if q.RawQuery != "" { - filters.AddQueryStringFilter(q.RawQuery, true) - } + filters.AddQueryStringFilter(q.RawQuery, true) if len(q.BucketAggs) == 0 { - if len(q.Metrics) == 0 || !(q.Metrics[0].Type == "raw_document" || q.Metrics[0].Type == "raw_data") { + // If no aggregations, only document and logs queries are valid + if len(q.Metrics) == 0 || !(q.Metrics[0].Type == "raw_data" || q.Metrics[0].Type == "raw_document" || q.Metrics[0].Type == "logs") { result.Responses[q.RefID] = backend.DataResponse{ Error: fmt.Errorf("invalid query, missing metrics and aggregations"), } return nil } + + // Defaults for log and document queries metric := q.Metrics[0] - b.Size(metric.Settings.Get("size").MustInt(500)) b.SortDesc(e.client.GetTimeField(), "boolean") b.SortDesc("_doc", "") b.AddDocValueField(e.client.GetTimeField()) + b.Size(metric.Settings.Get("size").MustInt(500)) + + if metric.Type == "logs" { + // Add additional defaults for log query + b.Size(metric.Settings.Get("limit").MustInt(500)) + b.AddHighlight() + + // For log query, we add a date histogram aggregation + aggBuilder := b.Agg() + q.BucketAggs = append(q.BucketAggs, &BucketAgg{ + Type: dateHistType, + Field: e.client.GetTimeField(), + ID: "1", + Settings: simplejson.NewFromAny(map[string]interface{}{ + "interval": "auto", + }), + }) + bucketAgg := q.BucketAggs[0] + bucketAgg.Settings = simplejson.NewFromAny( + bucketAgg.generateSettingsForDSL(), + ) + _ = addDateHistogramAgg(aggBuilder, bucketAgg, from, to) + } return nil } @@ -115,6 +137,7 @@ func (e *timeSeriesQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilde for _, m := range q.Metrics { m := m + if m.Type == countType { continue } diff --git a/pkg/tsdb/elasticsearch/time_series_query_test.go b/pkg/tsdb/elasticsearch/time_series_query_test.go index 9125248cb61..c361fb2fab2 100644 --- a/pkg/tsdb/elasticsearch/time_series_query_test.go +++ b/pkg/tsdb/elasticsearch/time_series_query_test.go @@ -1353,39 +1353,67 @@ func TestExecuteTimeSeriesQuery(t *testing.T) { require.Equal(t, filter.AnalyzeWildcard, true) }) - // FIXME - // Log query is not implemented with defaults - // t.Run("With log query should return query with defaults (from frontend tests)", func(t *testing.T) { - // c := newFakeClient() - // _, err := executeTsdbQuery(c, `{ - // "timeField": "@timestamp", - // "metrics": { "type": "logs", "id": "1"} - // }`, from, to, 15*time.Second) - // require.NoError(t, err) - // sr := c.multisearchRequests[0].Requests[0] - // require.Equal(t, sr.Size, 500) + t.Run("With log query should return query with defaults (from frontend tests)", func(t *testing.T) { + c := newFakeClient() + _, err := executeTsdbQuery(c, `{ + "timeField": "@timestamp", + "metrics": [{ "type": "logs", "id": "1"}] + }`, from, to, 15*time.Second) + require.NoError(t, err) + sr := c.multisearchRequests[0].Requests[0] + require.Equal(t, sr.Size, 500) - // rangeFilter 
diff --git a/pkg/tsdb/elasticsearch/time_series_query_test.go b/pkg/tsdb/elasticsearch/time_series_query_test.go
index 9125248cb61..c361fb2fab2 100644
--- a/pkg/tsdb/elasticsearch/time_series_query_test.go
+++ b/pkg/tsdb/elasticsearch/time_series_query_test.go
@@ -1353,39 +1353,67 @@ func TestExecuteTimeSeriesQuery(t *testing.T) {
 			require.Equal(t, filter.AnalyzeWildcard, true)
 		})
 
-		// FIXME
-		// Log query is not implemented with defaults
-		// t.Run("With log query should return query with defaults (from frontend tests)", func(t *testing.T) {
-		// 	c := newFakeClient()
-		// 	_, err := executeTsdbQuery(c, `{
-		// 		"timeField": "@timestamp",
-		// 		"metrics": { "type": "logs", "id": "1"}
-		// 	}`, from, to, 15*time.Second)
-		// 	require.NoError(t, err)
-		// 	sr := c.multisearchRequests[0].Requests[0]
-		// 	require.Equal(t, sr.Size, 500)
+		t.Run("With log query should return query with defaults (from frontend tests)", func(t *testing.T) {
+			c := newFakeClient()
+			_, err := executeTsdbQuery(c, `{
+				"timeField": "@timestamp",
+				"metrics": [{ "type": "logs", "id": "1"}]
+			}`, from, to, 15*time.Second)
+			require.NoError(t, err)
+			sr := c.multisearchRequests[0].Requests[0]
+			require.Equal(t, sr.Size, 500)
 
-		// 	rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter)
-		// 	require.Equal(t, rangeFilter.Key, c.timeField)
-		// 	require.Equal(t, rangeFilter.Lte, toMs)
-		// 	require.Equal(t, rangeFilter.Gte, fromMs)
-		// 	require.Equal(t, rangeFilter.Format, es.DateFormatEpochMS)
+			rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter)
+			require.Equal(t, rangeFilter.Key, c.timeField)
+			require.Equal(t, rangeFilter.Lte, toMs)
+			require.Equal(t, rangeFilter.Gte, fromMs)
+			require.Equal(t, rangeFilter.Format, es.DateFormatEpochMS)
 
-		// 	sort, _ := json.Marshal(sr.Sort)
-		// 	require.Equal(t, string(sort), `"sort":[{"@timestamp":{"order":"desc","unmapped_type":"boolean"}},{"_doc":{"order":"desc"}}]`)
+			require.Equal(t, sr.Sort["@timestamp"], map[string]string{"order": "desc", "unmapped_type": "boolean"})
+			require.Equal(t, sr.Sort["_doc"], map[string]string{"order": "desc"})
+			require.Equal(t, sr.CustomProps["script_fields"], map[string]interface{}{})
 
-		// 	firstLevel := sr.Aggs[0]
-		// 	require.Equal(t, firstLevel.Key, "1")
-		// 	require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
+			firstLevel := sr.Aggs[0]
+			require.Equal(t, firstLevel.Key, "1")
+			require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
 
-		// 	hAgg := firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg)
-		// 	require.Equal(t, hAgg.ExtendedBounds.Max, toMs)
-		// 	require.Equal(t, hAgg.ExtendedBounds.Min, fromMs)
-		// 	require.Equal(t, hAgg.Field, "@timestamp")
-		// 	require.Equal(t, hAgg.Format, es.DateFormatEpochMS)
-		// 	require.Equal(t, hAgg.FixedInterval, "$__interval_msms")
-		// 	require.Equal(t, hAgg.MinDocCount, 0)
-		// })
+			hAgg := firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg)
+			require.Equal(t, hAgg.ExtendedBounds.Max, toMs)
+			require.Equal(t, hAgg.ExtendedBounds.Min, fromMs)
+			require.Equal(t, hAgg.Field, "@timestamp")
+			require.Equal(t, hAgg.Format, es.DateFormatEpochMS)
+			require.Equal(t, hAgg.FixedInterval, "$__interval_msms")
+			require.Equal(t, hAgg.MinDocCount, 0)
+		})
+
+		t.Run("With log query with limit should return query with correct size", func(t *testing.T) {
+			c := newFakeClient()
+			_, err := executeTsdbQuery(c, `{
+				"timeField": "@timestamp",
+				"metrics": [{ "type": "logs", "id": "1", "settings": { "limit": 1000 }}]
+			}`, from, to, 15*time.Second)
+			require.NoError(t, err)
+			sr := c.multisearchRequests[0].Requests[0]
+			require.Equal(t, sr.Size, 1000)
+		})
+
+		t.Run("With log query should return highlight properties", func(t *testing.T) {
+			c := newFakeClient()
+			_, err := executeTsdbQuery(c, `{
+				"timeField": "@timestamp",
+				"metrics": [{ "type": "logs", "id": "1" }]
+			}`, from, to, 15*time.Second)
+			require.NoError(t, err)
+			sr := c.multisearchRequests[0].Requests[0]
+			require.Equal(t, sr.CustomProps["highlight"], map[string]interface{}{
+				"fields": map[string]interface{}{
+					"*": map[string]interface{}{},
+				},
+				"fragment_size": 2147483647,
+				"post_tags":     []string{"@/HIGHLIGHT@"},
+				"pre_tags":      []string{"@HIGHLIGHT@"},
+			})
+		})
 	})
 }
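
For context on the sentinel tags asserted above: Elasticsearch wraps each matched term in the configured `pre_tags`/`post_tags`, so a consumer can recover the matched words by scanning for the markers. A self-contained, illustrative sketch of that extraction (the fragment string is made up, and in Grafana the real extraction happens client-side, not in this patch):

```go
package main

import (
	"fmt"
	"regexp"
)

// Sketch: pull highlighted terms out of a highlight fragment returned
// by Elasticsearch, using the markers configured in AddHighlight.
func main() {
	fragment := "level=error msg=@HIGHLIGHT@timeout@/HIGHLIGHT@ while dialing"
	re := regexp.MustCompile(`@HIGHLIGHT@(.*?)@/HIGHLIGHT@`)
	for _, m := range re.FindAllStringSubmatch(fragment, -1) {
		fmt.Println("matched term:", m[1]) // prints: matched term: timeout
	}
}
```
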