mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
* WIP * Refactor, plus update source of error in response_parser * Adjust test * Use methods and httpclient from errorsource * Update pkg/tsdb/elasticsearch/data_query.go Co-authored-by: Scott Lepper <scott.lepper@gmail.com> * Return nil error * Fix test * Fix integration test --------- Co-authored-by: Scott Lepper <scott.lepper@gmail.com>
1868 lines
59 KiB
Go
1868 lines
59 KiB
Go
package elasticsearch
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
"github.com/grafana/grafana/pkg/infra/tracing"
|
|
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
|
)
|
|
|
|
func TestExecuteElasticsearchDataQuery(t *testing.T) {
|
|
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
|
|
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
|
|
fromMs := from.UnixNano() / int64(time.Millisecond)
|
|
toMs := to.UnixNano() / int64(time.Millisecond)
|
|
|
|
t.Run("Test execute time series query", func(t *testing.T) {
|
|
t.Run("With defaults", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "2" }],
|
|
"metrics": [{"type": "count", "id": "0" }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter)
|
|
require.Equal(t, rangeFilter.Key, c.configuredFields.TimeField)
|
|
require.Equal(t, rangeFilter.Lte, toMs)
|
|
require.Equal(t, rangeFilter.Gte, fromMs)
|
|
require.Equal(t, rangeFilter.Format, es.DateFormatEpochMS)
|
|
require.Equal(t, sr.Aggs[0].Key, "2")
|
|
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
|
|
require.Equal(t, dateHistogramAgg.Field, "@timestamp")
|
|
require.Equal(t, dateHistogramAgg.ExtendedBounds.Min, fromMs)
|
|
require.Equal(t, dateHistogramAgg.ExtendedBounds.Max, toMs)
|
|
})
|
|
t.Run("Should clean settings from null values (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [{ "type": "date_histogram", "field": "@timestamp", "id": "1" }],
|
|
"metrics": [{"type": "avg", "id": "0", "settings": {"missing": "null", "script": "1" } }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
firstLevel := sr.Aggs[0]
|
|
secondLevel := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, secondLevel.Aggregation.Aggregation.(*es.MetricAggregation).Settings["script"], "1")
|
|
require.NotContains(t, secondLevel.Aggregation.Aggregation.(*es.MetricAggregation).Settings, "missing")
|
|
})
|
|
|
|
t.Run("With multiple bucket aggs", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "terms", "field": "@host", "id": "2", "settings": { "size": "0", "order": "asc" } },
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
|
|
],
|
|
"metrics": [{"type": "count", "id": "1" }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "2")
|
|
termsAgg := firstLevel.Aggregation.Aggregation.(*es.TermsAggregation)
|
|
require.Equal(t, termsAgg.Field, "@host")
|
|
require.Equal(t, termsAgg.Size, defaultSize)
|
|
secondLevel := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, secondLevel.Key, "3")
|
|
require.Equal(t, secondLevel.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, "@timestamp")
|
|
})
|
|
|
|
t.Run("With select field", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "2" }
|
|
],
|
|
"metrics": [{"type": "avg", "field": "@value", "id": "1" }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "2")
|
|
require.Equal(t, firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, "@timestamp")
|
|
secondLevel := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, secondLevel.Key, "1")
|
|
require.Equal(t, secondLevel.Aggregation.Type, "avg")
|
|
require.Equal(t, secondLevel.Aggregation.Aggregation.(*es.MetricAggregation).Field, "@value")
|
|
})
|
|
|
|
t.Run("With term agg and order by term (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"type": "terms",
|
|
"field": "@host",
|
|
"id": "2",
|
|
"settings": { "size": "5", "order": "asc", "orderBy": "_term" }
|
|
},
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
|
|
],
|
|
"metrics": [
|
|
{"type": "count", "id": "1" },
|
|
{"type": "avg", "field": "@value", "id": "5" }
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Aggregation.Aggregation.(*es.TermsAggregation).Order["_key"], "asc")
|
|
})
|
|
|
|
t.Run("With term agg and order by metric agg", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"type": "terms",
|
|
"field": "@host",
|
|
"id": "2",
|
|
"settings": { "size": "5", "order": "asc", "orderBy": "5" }
|
|
},
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
|
|
],
|
|
"metrics": [
|
|
{"type": "count", "id": "1" },
|
|
{"type": "avg", "field": "@value", "id": "5" }
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
avgAggOrderBy := sr.Aggs[0].Aggregation.Aggs[0]
|
|
require.Equal(t, avgAggOrderBy.Key, "5")
|
|
require.Equal(t, avgAggOrderBy.Aggregation.Type, "avg")
|
|
require.Equal(t, avgAggOrderBy.Aggregation.Aggregation.(*es.MetricAggregation).Field, "@value")
|
|
|
|
avgAgg := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggs[0]
|
|
require.Equal(t, avgAgg.Key, "5")
|
|
require.Equal(t, avgAgg.Aggregation.Type, "avg")
|
|
require.Equal(t, avgAgg.Aggregation.Aggregation.(*es.MetricAggregation).Field, "@value")
|
|
})
|
|
|
|
t.Run("With term agg and order by count metric agg", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"type": "terms",
|
|
"field": "@host",
|
|
"id": "2",
|
|
"settings": { "size": "5", "order": "asc", "orderBy": "1" }
|
|
},
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
|
|
],
|
|
"metrics": [
|
|
{"type": "count", "id": "1" }
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
termsAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.TermsAggregation)
|
|
require.Equal(t, termsAgg.Order["_count"], "asc")
|
|
})
|
|
|
|
t.Run("With term agg and order by count agg (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"metrics": [
|
|
{"type": "count", "id": "1" },
|
|
{"type": "avg", "field": "@value", "id": "5" }
|
|
],
|
|
"bucketAggs": [
|
|
{
|
|
"type": "terms",
|
|
"field": "@host",
|
|
"id": "2",
|
|
"settings": { "size": "5", "order": "asc", "orderBy": "1" }
|
|
},
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
firstLevel := sr.Aggs[0]
|
|
termsAgg := firstLevel.Aggregation.Aggregation.(*es.TermsAggregation)
|
|
|
|
require.Equal(t, termsAgg.Order["_count"], "asc")
|
|
require.NotEqual(t, firstLevel.Aggregation.Aggs[0].Key, "1")
|
|
})
|
|
|
|
t.Run("With term agg and order by percentiles agg", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"type": "terms",
|
|
"field": "@host",
|
|
"id": "2",
|
|
"settings": { "size": "5", "order": "asc", "orderBy": "1[95.0]" }
|
|
},
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
|
|
],
|
|
"metrics": [
|
|
{"type": "percentiles", "field": "@value", "id": "1", "settings": { "percents": ["95","99"] } }
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
orderByAgg := sr.Aggs[0].Aggregation.Aggs[0]
|
|
secondLevel := orderByAgg.Aggregation.Aggregation
|
|
|
|
require.Equal(t, orderByAgg.Key, "1")
|
|
require.Equal(t, orderByAgg.Aggregation.Type, "percentiles")
|
|
require.Equal(t, orderByAgg.Aggregation.Aggregation.(*es.MetricAggregation).Field, "@value")
|
|
require.Equal(t, secondLevel.(*es.MetricAggregation).Field, "@value")
|
|
})
|
|
|
|
t.Run("With term agg and order by extended stats agg", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"type": "terms",
|
|
"field": "@host",
|
|
"id": "2",
|
|
"settings": { "size": "5", "order": "asc", "orderBy": "1[std_deviation]" }
|
|
},
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
|
|
],
|
|
"metrics": [
|
|
{"type": "extended_stats", "field": "@value", "id": "1", "meta": { "std_deviation": true } }
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
orderByAgg := firstLevel.Aggregation.Aggs[0]
|
|
secondLevel := orderByAgg.Aggregation.Aggregation
|
|
|
|
require.Equal(t, orderByAgg.Key, "1")
|
|
require.Equal(t, orderByAgg.Aggregation.Type, "extended_stats")
|
|
require.Equal(t, orderByAgg.Aggregation.Aggregation.(*es.MetricAggregation).Field, "@value")
|
|
require.Equal(t, secondLevel.(*es.MetricAggregation).Field, "@value")
|
|
})
|
|
|
|
t.Run("With term agg and order by term", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"type": "terms",
|
|
"field": "@host",
|
|
"id": "2",
|
|
"settings": { "size": "5", "order": "asc", "orderBy": "_term" }
|
|
},
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
|
|
],
|
|
"metrics": [
|
|
{"type": "count", "id": "1" },
|
|
{"type": "avg", "field": "@value", "id": "5" }
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "2")
|
|
termsAgg := firstLevel.Aggregation.Aggregation.(*es.TermsAggregation)
|
|
require.Equal(t, termsAgg.Order["_key"], "asc")
|
|
})
|
|
|
|
t.Run("With term agg and valid min_doc_count (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"type": "terms",
|
|
"field": "@host",
|
|
"id": "2",
|
|
"settings": { "min_doc_count": "1" }
|
|
},
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
|
|
],
|
|
"metrics": [
|
|
{"type": "count", "id": "1" }
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "2")
|
|
termsAgg := firstLevel.Aggregation.Aggregation.(*es.TermsAggregation)
|
|
expectedMinDocCount := 1
|
|
require.Equal(t, termsAgg.MinDocCount, &expectedMinDocCount)
|
|
})
|
|
|
|
t.Run("With metric percentiles", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
|
|
],
|
|
"metrics": [
|
|
{
|
|
"id": "1",
|
|
"type": "percentiles",
|
|
"field": "@load_time",
|
|
"settings": {
|
|
"percents": [ "1", "2", "3", "4" ]
|
|
}
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
percentilesAgg := sr.Aggs[0].Aggregation.Aggs[0]
|
|
require.Equal(t, percentilesAgg.Key, "1")
|
|
require.Equal(t, percentilesAgg.Aggregation.Type, "percentiles")
|
|
metricAgg := percentilesAgg.Aggregation.Aggregation.(*es.MetricAggregation)
|
|
require.Equal(t, metricAgg.Field, "@load_time")
|
|
percents := metricAgg.Settings["percents"].([]any)
|
|
require.Len(t, percents, 4)
|
|
require.Equal(t, percents[0], "1")
|
|
require.Equal(t, percents[1], "2")
|
|
require.Equal(t, percents[2], "3")
|
|
require.Equal(t, percents[3], "4")
|
|
})
|
|
|
|
t.Run("With filters aggs", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"id": "2",
|
|
"type": "filters",
|
|
"settings": {
|
|
"filters": [ { "query": "@metric:cpu" }, { "query": "@metric:logins.count" } ]
|
|
}
|
|
},
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
|
],
|
|
"metrics": [{"type": "count", "id": "1" }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
filtersAgg := sr.Aggs[0]
|
|
require.Equal(t, filtersAgg.Key, "2")
|
|
require.Equal(t, filtersAgg.Aggregation.Type, "filters")
|
|
fAgg := filtersAgg.Aggregation.Aggregation.(*es.FiltersAggregation)
|
|
require.Equal(t, fAgg.Filters["@metric:cpu"].(*es.QueryStringFilter).Query, "@metric:cpu")
|
|
require.Equal(t, fAgg.Filters["@metric:logins.count"].(*es.QueryStringFilter).Query, "@metric:logins.count")
|
|
|
|
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggs[0]
|
|
require.Equal(t, dateHistogramAgg.Key, "4")
|
|
require.Equal(t, dateHistogramAgg.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, "@timestamp")
|
|
})
|
|
|
|
t.Run("With filters aggs and empty label (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"id": "2",
|
|
"type": "filters",
|
|
"settings": {
|
|
"filters": [ { "query": "@metric:cpu", "label": "" }, { "query": "@metric:logins.count", "label": "" } ]
|
|
}
|
|
},
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
|
],
|
|
"metrics": [{"type": "count", "id": "1" }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
filtersAgg := sr.Aggs[0]
|
|
require.Equal(t, filtersAgg.Key, "2")
|
|
require.Equal(t, filtersAgg.Aggregation.Type, "filters")
|
|
fAgg := filtersAgg.Aggregation.Aggregation.(*es.FiltersAggregation)
|
|
require.Equal(t, fAgg.Filters["@metric:cpu"].(*es.QueryStringFilter).Query, "@metric:cpu")
|
|
require.Equal(t, fAgg.Filters["@metric:logins.count"].(*es.QueryStringFilter).Query, "@metric:logins.count")
|
|
|
|
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggs[0]
|
|
require.Equal(t, dateHistogramAgg.Key, "4")
|
|
require.Equal(t, dateHistogramAgg.Aggregation.Aggregation.(*es.DateHistogramAgg).Field, "@timestamp")
|
|
})
|
|
|
|
t.Run("With raw document metric size", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [],
|
|
"metrics": [{ "id": "1", "type": "raw_document", "settings": {} }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
require.Equal(t, sr.Size, defaultSize)
|
|
})
|
|
|
|
t.Run("With raw document metric query (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [],
|
|
"metrics": [{ "id": "1", "type": "raw_document", "settings": {} }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter)
|
|
require.Equal(t, rangeFilter.Key, c.configuredFields.TimeField)
|
|
require.Equal(t, rangeFilter.Lte, toMs)
|
|
require.Equal(t, rangeFilter.Gte, fromMs)
|
|
require.Equal(t, rangeFilter.Format, es.DateFormatEpochMS)
|
|
|
|
require.Equal(t, sr.Size, defaultSize)
|
|
require.Equal(t, sr.Sort["@timestamp"], map[string]string{"order": "desc", "unmapped_type": "boolean"})
|
|
require.Equal(t, sr.Sort["_doc"], map[string]string{"order": "desc"})
|
|
require.Equal(t, sr.CustomProps["script_fields"], map[string]any{})
|
|
})
|
|
|
|
t.Run("With raw data metric query (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [],
|
|
"metrics": [{ "id": "1", "type": "raw_data", "settings": {} }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter)
|
|
require.Equal(t, rangeFilter.Key, c.configuredFields.TimeField)
|
|
require.Equal(t, rangeFilter.Lte, toMs)
|
|
require.Equal(t, rangeFilter.Gte, fromMs)
|
|
require.Equal(t, rangeFilter.Format, es.DateFormatEpochMS)
|
|
|
|
require.Equal(t, sr.Size, defaultSize)
|
|
require.Equal(t, sr.Sort["@timestamp"], map[string]string{"order": "desc", "unmapped_type": "boolean"})
|
|
require.Equal(t, sr.Sort["_doc"], map[string]string{"order": "desc"})
|
|
require.Equal(t, sr.CustomProps["script_fields"], map[string]any{})
|
|
})
|
|
|
|
t.Run("With raw document metric size set", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [],
|
|
"metrics": [{ "id": "1", "type": "raw_document", "settings": { "size": "1337" } }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
require.Equal(t, sr.Size, 1337)
|
|
})
|
|
|
|
t.Run("With date histogram agg", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"id": "2",
|
|
"type": "date_histogram",
|
|
"field": "@timestamp",
|
|
"settings": { "interval": "auto", "min_doc_count": 2 }
|
|
}
|
|
],
|
|
"metrics": [{"type": "count", "id": "1" }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "2")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
hAgg := firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg)
|
|
require.Equal(t, hAgg.Field, "@timestamp")
|
|
require.Equal(t, hAgg.FixedInterval, "$__interval_msms")
|
|
require.Equal(t, hAgg.MinDocCount, 2)
|
|
|
|
t.Run("Should not include time_zone if not present in the query model (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"id": "2",
|
|
"type": "date_histogram",
|
|
"field": "@timestamp",
|
|
"settings": {
|
|
"min_doc_count": "1"
|
|
}
|
|
}
|
|
],
|
|
"metrics": [{"type": "count", "id": "1" }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
dateHistogram := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
|
|
require.Empty(t, dateHistogram.TimeZone)
|
|
})
|
|
|
|
t.Run("Should not include time_zone when timeZone is utc", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"id": "2",
|
|
"type": "date_histogram",
|
|
"field": "@timestamp",
|
|
"settings": {
|
|
"timeZone": "utc"
|
|
}
|
|
}
|
|
],
|
|
"metrics": [{"type": "count", "id": "1" }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
dateHistogram := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
|
|
require.Empty(t, dateHistogram.TimeZone)
|
|
})
|
|
|
|
t.Run("Should include time_zone when timeZone is not utc", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"id": "2",
|
|
"type": "date_histogram",
|
|
"field": "@timestamp",
|
|
"settings": {
|
|
"timeZone": "America/Los_Angeles"
|
|
}
|
|
}
|
|
],
|
|
"metrics": [{"type": "count", "id": "1" }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
dateHistogram := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
|
|
require.Equal(t, dateHistogram.TimeZone, "America/Los_Angeles")
|
|
})
|
|
})
|
|
|
|
t.Run("With histogram agg", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"id": "3",
|
|
"type": "histogram",
|
|
"field": "bytes",
|
|
"settings": { "interval": "10", "min_doc_count": 2, "missing": 5 }
|
|
}
|
|
],
|
|
"metrics": [{"type": "count", "id": "1" }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "3")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "histogram")
|
|
hAgg := firstLevel.Aggregation.Aggregation.(*es.HistogramAgg)
|
|
require.Equal(t, hAgg.Field, "bytes")
|
|
require.Equal(t, hAgg.Interval, 10)
|
|
require.Equal(t, hAgg.MinDocCount, 2)
|
|
require.Equal(t, *hAgg.Missing, 5)
|
|
})
|
|
|
|
t.Run("With histogram (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"id": "3",
|
|
"type": "histogram",
|
|
"field": "bytes",
|
|
"settings": { "interval": "10", "min_doc_count": 2 }
|
|
}
|
|
],
|
|
"metrics": [{"type": "count", "id": "1" }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "3")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "histogram")
|
|
hAgg := firstLevel.Aggregation.Aggregation.(*es.HistogramAgg)
|
|
require.Equal(t, hAgg.Field, "bytes")
|
|
require.Equal(t, hAgg.Interval, 10)
|
|
require.Equal(t, hAgg.MinDocCount, 2)
|
|
})
|
|
|
|
t.Run("With geo hash grid agg", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"id": "3",
|
|
"type": "geohash_grid",
|
|
"field": "@location",
|
|
"settings": { "precision": "6" }
|
|
}
|
|
],
|
|
"metrics": [{"type": "count", "id": "1" }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "3")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "geohash_grid")
|
|
ghGridAgg := firstLevel.Aggregation.Aggregation.(*es.GeoHashGridAggregation)
|
|
require.Equal(t, ghGridAgg.Field, "@location")
|
|
require.Equal(t, ghGridAgg.Precision, 6)
|
|
})
|
|
|
|
t.Run("With geo hash grid agg with invalid int precision", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"id": "3",
|
|
"type": "geohash_grid",
|
|
"field": "@location",
|
|
"settings": { "precision": 7 }
|
|
}
|
|
],
|
|
"metrics": [{"type": "count", "id": "1" }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "3")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "geohash_grid")
|
|
ghGridAgg := firstLevel.Aggregation.Aggregation.(*es.GeoHashGridAggregation)
|
|
require.Equal(t, ghGridAgg.Field, "@location")
|
|
// It should default to 3
|
|
require.Equal(t, ghGridAgg.Precision, 3)
|
|
})
|
|
|
|
t.Run("With geo hash grid agg with no precision", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"id": "3",
|
|
"type": "geohash_grid",
|
|
"field": "@location",
|
|
"settings": {}
|
|
}
|
|
],
|
|
"metrics": [{"type": "count", "id": "1" }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "3")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "geohash_grid")
|
|
ghGridAgg := firstLevel.Aggregation.Aggregation.(*es.GeoHashGridAggregation)
|
|
require.Equal(t, ghGridAgg.Field, "@location")
|
|
// It should default to 3
|
|
require.Equal(t, ghGridAgg.Precision, 3)
|
|
})
|
|
|
|
t.Run("With moving average (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "sum", "field": "@value" },
|
|
{
|
|
"id": "2",
|
|
"type": "moving_avg",
|
|
"field": "3"
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "4")
|
|
require.Equal(t, len(firstLevel.Aggregation.Aggs), 2)
|
|
|
|
sumAgg := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, sumAgg.Key, "3")
|
|
require.Equal(t, sumAgg.Aggregation.Type, "sum")
|
|
mAgg := sumAgg.Aggregation.Aggregation.(*es.MetricAggregation)
|
|
require.Equal(t, mAgg.Field, "@value")
|
|
|
|
movingAvgAgg := firstLevel.Aggregation.Aggs[1]
|
|
require.Equal(t, movingAvgAgg.Key, "2")
|
|
require.Equal(t, movingAvgAgg.Aggregation.Type, "moving_avg")
|
|
pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, pl.BucketPath, "3")
|
|
})
|
|
|
|
t.Run("With moving average", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "sum", "field": "@value" },
|
|
{
|
|
"id": "2",
|
|
"type": "moving_avg",
|
|
"field": "3",
|
|
"pipelineAgg": "3"
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "4")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
require.Len(t, firstLevel.Aggregation.Aggs, 2)
|
|
|
|
sumAgg := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, sumAgg.Key, "3")
|
|
require.Equal(t, sumAgg.Aggregation.Type, "sum")
|
|
mAgg := sumAgg.Aggregation.Aggregation.(*es.MetricAggregation)
|
|
require.Equal(t, mAgg.Field, "@value")
|
|
|
|
movingAvgAgg := firstLevel.Aggregation.Aggs[1]
|
|
require.Equal(t, movingAvgAgg.Key, "2")
|
|
require.Equal(t, movingAvgAgg.Aggregation.Type, "moving_avg")
|
|
pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, pl.BucketPath, "3")
|
|
})
|
|
|
|
t.Run("With moving average doc count (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "count"},
|
|
{
|
|
"id": "2",
|
|
"type": "moving_avg",
|
|
"field": "3"
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "4")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
require.Len(t, firstLevel.Aggregation.Aggs, 1)
|
|
|
|
movingAvgAgg := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, movingAvgAgg.Key, "2")
|
|
require.Equal(t, movingAvgAgg.Aggregation.Type, "moving_avg")
|
|
pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, pl.BucketPath, "_count")
|
|
})
|
|
|
|
t.Run("With moving average doc count", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "count", "field": "select field" },
|
|
{
|
|
"id": "2",
|
|
"type": "moving_avg",
|
|
"field": "3",
|
|
"pipelineAgg": "3"
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "4")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
require.Len(t, firstLevel.Aggregation.Aggs, 1)
|
|
|
|
movingAvgAgg := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, movingAvgAgg.Key, "2")
|
|
require.Equal(t, movingAvgAgg.Aggregation.Type, "moving_avg")
|
|
pl := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, pl.BucketPath, "_count")
|
|
})
|
|
|
|
t.Run("With broken moving average (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "sum", "field": "@value" },
|
|
{
|
|
"id": "2",
|
|
"type": "moving_avg",
|
|
"field": "3"
|
|
},
|
|
{
|
|
"id": "4",
|
|
"type": "moving_avg"
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "3")
|
|
require.Len(t, firstLevel.Aggregation.Aggs, 2)
|
|
|
|
sumAgg := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, sumAgg.Key, "3")
|
|
|
|
movingAvgAgg := firstLevel.Aggregation.Aggs[1]
|
|
require.Equal(t, movingAvgAgg.Key, "2")
|
|
plAgg := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, plAgg.BucketPath, "3")
|
|
})
|
|
|
|
t.Run("With broken moving average", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "5" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "sum", "field": "@value" },
|
|
{
|
|
"id": "2",
|
|
"type": "moving_avg",
|
|
"pipelineAgg": "3"
|
|
},
|
|
{
|
|
"id": "4",
|
|
"type": "moving_avg",
|
|
"pipelineAgg": "Metric to apply moving average"
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "5")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
|
|
require.Len(t, firstLevel.Aggregation.Aggs, 2)
|
|
|
|
movingAvgAgg := firstLevel.Aggregation.Aggs[1]
|
|
require.Equal(t, movingAvgAgg.Key, "2")
|
|
plAgg := movingAvgAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, plAgg.BucketPath, "3")
|
|
})
|
|
|
|
t.Run("With top_metrics (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "2", "type": "top_metrics", "settings": { "order": "desc", "orderBy": "@timestamp", "metrics": ["@value"]} }
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "3")
|
|
|
|
secondLevel := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, secondLevel.Key, "2")
|
|
require.Equal(t, secondLevel.Aggregation.Type, "top_metrics")
|
|
|
|
topMetricsBytes, _ := json.Marshal(firstLevel.Aggregation.Aggs[0].Aggregation.Aggregation)
|
|
require.Equal(t, string(topMetricsBytes), `{"metrics":[{"field":"@value"}],"size":"1","sort":[{"@timestamp":"desc"}]}`)
|
|
})
|
|
|
|
t.Run("With cumulative sum", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "sum", "field": "@value" },
|
|
{
|
|
"id": "2",
|
|
"type": "cumulative_sum",
|
|
"field": "3",
|
|
"pipelineAgg": "3"
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "4")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
require.Len(t, firstLevel.Aggregation.Aggs, 2)
|
|
|
|
sumAgg := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, sumAgg.Key, "3")
|
|
require.Equal(t, sumAgg.Aggregation.Type, "sum")
|
|
mAgg := sumAgg.Aggregation.Aggregation.(*es.MetricAggregation)
|
|
require.Equal(t, mAgg.Field, "@value")
|
|
|
|
cumulativeSumAgg := firstLevel.Aggregation.Aggs[1]
|
|
require.Equal(t, cumulativeSumAgg.Key, "2")
|
|
require.Equal(t, cumulativeSumAgg.Aggregation.Type, "cumulative_sum")
|
|
pl := cumulativeSumAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, pl.BucketPath, "3")
|
|
})
|
|
|
|
t.Run("With cumulative sum doc count", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "count", "field": "select field" },
|
|
{
|
|
"id": "2",
|
|
"type": "cumulative_sum",
|
|
"field": "3",
|
|
"pipelineAgg": "3"
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "4")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
require.Len(t, firstLevel.Aggregation.Aggs, 1)
|
|
|
|
cumulativeSumAgg := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, cumulativeSumAgg.Key, "2")
|
|
require.Equal(t, cumulativeSumAgg.Aggregation.Type, "cumulative_sum")
|
|
pl := cumulativeSumAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, pl.BucketPath, "_count")
|
|
})
|
|
|
|
t.Run("With broken cumulative sum", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "5" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "sum", "field": "@value" },
|
|
{
|
|
"id": "2",
|
|
"type": "cumulative_sum",
|
|
"pipelineAgg": "3"
|
|
},
|
|
{
|
|
"id": "4",
|
|
"type": "cumulative_sum",
|
|
"pipelineAgg": "Metric to apply cumulative sum"
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "5")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
|
|
require.Len(t, firstLevel.Aggregation.Aggs, 2)
|
|
|
|
cumulativeSumAgg := firstLevel.Aggregation.Aggs[1]
|
|
require.Equal(t, cumulativeSumAgg.Key, "2")
|
|
plAgg := cumulativeSumAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, plAgg.BucketPath, "3")
|
|
})
|
|
|
|
t.Run("With derivative", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "sum", "field": "@value" },
|
|
{
|
|
"id": "2",
|
|
"type": "derivative",
|
|
"pipelineAgg": "3"
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "4")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
|
|
derivativeAgg := firstLevel.Aggregation.Aggs[1]
|
|
require.Equal(t, derivativeAgg.Key, "2")
|
|
plAgg := derivativeAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, plAgg.BucketPath, "3")
|
|
})
|
|
|
|
t.Run("With derivative doc count", func(t *testing.T) {
|
|
// This test is with pipelineAgg and is passing. Same test without pipelineAgg is failing.
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "count", "field": "select field" },
|
|
{
|
|
"id": "2",
|
|
"type": "derivative",
|
|
"pipelineAgg": "3"
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "4")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
|
|
derivativeAgg := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, derivativeAgg.Key, "2")
|
|
plAgg := derivativeAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, plAgg.BucketPath, "_count")
|
|
})
|
|
|
|
t.Run("With derivative doc count (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "count" },
|
|
{
|
|
"id": "2",
|
|
"type": "derivative",
|
|
"field": "3"
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "4")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
|
|
derivativeAgg := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, derivativeAgg.Key, "2")
|
|
plAgg := derivativeAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, plAgg.BucketPath, "_count")
|
|
})
|
|
|
|
t.Run("With serial_diff", func(t *testing.T) {
|
|
// This test is with pipelineAgg and is passing. Same test without pipelineAgg is failing.
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "sum", "field": "@value" },
|
|
{
|
|
"id": "2",
|
|
"type": "serial_diff",
|
|
"pipelineAgg": "3",
|
|
"settings": { "lag": "5" }
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "4")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
|
|
serialDiffAgg := firstLevel.Aggregation.Aggs[1]
|
|
require.Equal(t, serialDiffAgg.Key, "2")
|
|
plAgg := serialDiffAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, plAgg.BucketPath, "3")
|
|
require.Equal(t, plAgg.Settings["lag"], 5.)
|
|
})
|
|
|
|
t.Run("With serial_diff (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "max", "field": "@value" },
|
|
{
|
|
"id": "2",
|
|
"type": "serial_diff",
|
|
"field": "3",
|
|
"settings": { "lag": "5" }
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "3")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
serialDiffAgg := firstLevel.Aggregation.Aggs[1]
|
|
require.Equal(t, serialDiffAgg.Key, "2")
|
|
plAgg := serialDiffAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, plAgg.BucketPath, "3")
|
|
require.Equal(t, plAgg.Settings["lag"], 5.0)
|
|
})
|
|
|
|
t.Run("With serial_diff doc count", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "count", "field": "select field" },
|
|
{
|
|
"id": "2",
|
|
"type": "serial_diff",
|
|
"pipelineAgg": "3"
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "4")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
|
|
serialDiffAgg := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, serialDiffAgg.Key, "2")
|
|
plAgg := serialDiffAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, plAgg.BucketPath, "_count")
|
|
})
|
|
|
|
t.Run("With bucket_script", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "2" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "1", "type": "sum", "field": "@value" },
|
|
{ "id": "3", "type": "max", "field": "@value" },
|
|
{
|
|
"id": "4",
|
|
"type": "bucket_script",
|
|
"pipelineVariables": [
|
|
{ "name": "var1", "pipelineAgg": "1" },
|
|
{ "name": "var2", "pipelineAgg": "3" }
|
|
],
|
|
"settings": { "script": "params.var1 * params.var2" }
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "2")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
|
|
bucketScriptAgg := firstLevel.Aggregation.Aggs[2]
|
|
require.Equal(t, bucketScriptAgg.Key, "4")
|
|
plAgg := bucketScriptAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, plAgg.BucketPath.(map[string]any), map[string]any{
|
|
"var1": "1",
|
|
"var2": "3",
|
|
})
|
|
})
|
|
|
|
t.Run("With bucket_script (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "sum", "field": "@value" },
|
|
{ "id": "5", "type": "max", "field": "@value" },
|
|
{
|
|
"id": "2",
|
|
"type": "bucket_script",
|
|
"pipelineVariables": [
|
|
{ "name": "var1", "pipelineAgg": "3" },
|
|
{ "name": "var2", "pipelineAgg": "5" }
|
|
],
|
|
"settings": { "script": "params.var1 * params.var2" }
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "4")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
|
|
bucketScriptAgg := firstLevel.Aggregation.Aggs[2]
|
|
require.Equal(t, bucketScriptAgg.Key, "2")
|
|
plAgg := bucketScriptAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, plAgg.BucketPath.(map[string]any), map[string]any{
|
|
"var1": "3",
|
|
"var2": "5",
|
|
})
|
|
})
|
|
|
|
t.Run("With bucket_script doc count", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "4" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "count", "field": "select field" },
|
|
{
|
|
"id": "2",
|
|
"type": "bucket_script",
|
|
"pipelineVariables": [
|
|
{ "name": "var1", "pipelineAgg": "3" }
|
|
],
|
|
"settings": { "script": "params.var1 * 1000" }
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "4")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
|
|
bucketScriptAgg := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, bucketScriptAgg.Key, "2")
|
|
plAgg := bucketScriptAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, plAgg.BucketPath.(map[string]any), map[string]any{
|
|
"var1": "_count",
|
|
})
|
|
})
|
|
|
|
t.Run("With bucket_script doc count (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "2" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "3", "type": "count"},
|
|
{
|
|
"id": "4",
|
|
"type": "bucket_script",
|
|
"pipelineVariables": [
|
|
{ "name": "var1", "pipelineAgg": "3" }
|
|
],
|
|
"settings": { "script": "params.var1 * 1000" }
|
|
}
|
|
]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "2")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
|
|
bucketScriptAgg := firstLevel.Aggregation.Aggs[0]
|
|
require.Equal(t, bucketScriptAgg.Key, "4")
|
|
plAgg := bucketScriptAgg.Aggregation.Aggregation.(*es.PipelineAggregation)
|
|
require.Equal(t, plAgg.BucketPath.(map[string]any), map[string]any{
|
|
"var1": "_count",
|
|
})
|
|
})
|
|
|
|
t.Run("With lucene query should add query_string filter when query is not empty (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"query": "foo",
|
|
"bucketAggs": [],
|
|
"metrics": [{ "id": "1", "type": "raw_data", "settings": {} }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
filter := sr.Query.Bool.Filters[1].(*es.QueryStringFilter)
|
|
require.Equal(t, filter.Query, "foo")
|
|
require.Equal(t, filter.AnalyzeWildcard, true)
|
|
})
|
|
|
|
t.Run("With lucene query should add query_string filter when query is not empty (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"query": "foo",
|
|
"bucketAggs": [],
|
|
"metrics": [{ "id": "1", "type": "raw_data", "settings": {} }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
filter := sr.Query.Bool.Filters[1].(*es.QueryStringFilter)
|
|
require.Equal(t, filter.Query, "foo")
|
|
require.Equal(t, filter.AnalyzeWildcard, true)
|
|
})
|
|
|
|
t.Run("With log query should return query with defaults (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"metrics": [{ "type": "logs", "id": "1"}]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
require.Equal(t, sr.Size, defaultSize)
|
|
|
|
rangeFilter := sr.Query.Bool.Filters[0].(*es.RangeFilter)
|
|
require.Equal(t, rangeFilter.Key, c.configuredFields.TimeField)
|
|
require.Equal(t, rangeFilter.Lte, toMs)
|
|
require.Equal(t, rangeFilter.Gte, fromMs)
|
|
require.Equal(t, rangeFilter.Format, es.DateFormatEpochMS)
|
|
|
|
require.Equal(t, sr.Sort["@timestamp"], map[string]string{"order": "desc", "unmapped_type": "boolean"})
|
|
require.Equal(t, sr.Sort["_doc"], map[string]string{"order": "desc"})
|
|
require.Equal(t, sr.CustomProps["script_fields"], map[string]any{})
|
|
|
|
firstLevel := sr.Aggs[0]
|
|
require.Equal(t, firstLevel.Key, "1")
|
|
require.Equal(t, firstLevel.Aggregation.Type, "date_histogram")
|
|
|
|
hAgg := firstLevel.Aggregation.Aggregation.(*es.DateHistogramAgg)
|
|
require.Equal(t, hAgg.ExtendedBounds.Max, toMs)
|
|
require.Equal(t, hAgg.ExtendedBounds.Min, fromMs)
|
|
require.Equal(t, hAgg.Field, "@timestamp")
|
|
require.Equal(t, hAgg.Format, es.DateFormatEpochMS)
|
|
require.Equal(t, hAgg.FixedInterval, "$__interval_msms")
|
|
require.Equal(t, hAgg.MinDocCount, 0)
|
|
})
|
|
|
|
t.Run("With log query with limit should return query with correct size", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"metrics": [{ "type": "logs", "id": "1", "settings": { "limit": "1000" }}]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
require.Equal(t, sr.Size, 1000)
|
|
})
|
|
|
|
t.Run("With log query should return highlight properties", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"metrics": [{ "type": "logs", "id": "1" }]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
require.Equal(t, sr.CustomProps["highlight"], map[string]any{
|
|
"fields": map[string]any{
|
|
"*": map[string]any{},
|
|
},
|
|
"fragment_size": 2147483647,
|
|
"post_tags": []string{"@/HIGHLIGHT@"},
|
|
"pre_tags": []string{"@HIGHLIGHT@"},
|
|
})
|
|
})
|
|
|
|
t.Run("With log context query with sortDirection and searchAfter should return correct query", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"metrics": [{ "type": "logs", "id": "1", "settings": { "limit": "1000", "sortDirection": "asc", "searchAfter": [1, "2"] }}]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
require.Equal(t, sr.Sort["@timestamp"], map[string]string{"order": "asc", "unmapped_type": "boolean"})
|
|
require.Equal(t, sr.Sort["_doc"], map[string]string{"order": "asc"})
|
|
|
|
searchAfter := sr.CustomProps["search_after"].([]any)
|
|
firstSearchAfter, err := searchAfter[0].(json.Number).Int64()
|
|
require.NoError(t, err)
|
|
require.Equal(t, firstSearchAfter, int64(1))
|
|
secondSearchAfter := searchAfter[1].(string)
|
|
require.NoError(t, err)
|
|
require.Equal(t, secondSearchAfter, "2")
|
|
})
|
|
|
|
t.Run("With invalid query should return error", (func(t *testing.T) {
|
|
c := newFakeClient()
|
|
res, err := executeElasticsearchDataQuery(c, `{
|
|
"query": "foo",
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
require.Equal(t, res.Responses["A"].ErrorSource, backend.ErrorSourcePlugin)
|
|
require.Equal(t, res.Responses["A"].Error.Error(), "invalid character '}' looking for beginning of object key string")
|
|
}))
|
|
})
|
|
}
|
|
|
|
func TestSettingsCasting(t *testing.T) {
|
|
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
|
|
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
|
|
|
|
t.Run("Correctly casts values in moving_avg (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"metrics": [
|
|
{ "type": "avg", "id" : "2" },
|
|
{
|
|
"type": "moving_avg",
|
|
"id" : "3",
|
|
"pipelineAgg": "2",
|
|
"settings": {
|
|
"window": "5",
|
|
"model": "holt_winters",
|
|
"predict": "10",
|
|
"settings": {
|
|
"alpha": "1",
|
|
"beta": "2",
|
|
"gamma": "3",
|
|
"period": "4"
|
|
}
|
|
}
|
|
}
|
|
],
|
|
"bucketAggs": [{"type": "date_histogram", "field": "@timestamp", "id": "1"}]
|
|
}`, from, to)
|
|
require.NoError(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
movingAvgSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings
|
|
|
|
assert.Equal(t, movingAvgSettings["window"], 5.0)
|
|
assert.Equal(t, movingAvgSettings["predict"], 10.0)
|
|
|
|
modelSettings := movingAvgSettings["settings"].(map[string]any)
|
|
|
|
assert.Equal(t, modelSettings["alpha"], 1.0)
|
|
assert.Equal(t, modelSettings["beta"], 2.0)
|
|
assert.Equal(t, modelSettings["gamma"], 3.0)
|
|
assert.Equal(t, modelSettings["period"], 4.0)
|
|
})
|
|
|
|
t.Run("Correctly transforms moving_average settings", func(t *testing.T) {
|
|
// This test is with pipelineAgg and is passing. Same test without pipelineAgg is failing.
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "2" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "1", "type": "average", "field": "@value" },
|
|
{
|
|
"id": "3",
|
|
"type": "moving_avg",
|
|
"field": "1",
|
|
"pipelineAgg": "1",
|
|
"settings": {
|
|
"model": "holt_winters",
|
|
"window": "10",
|
|
"predict": "5",
|
|
"settings": {
|
|
"alpha": "0.5",
|
|
"beta": "0.7",
|
|
"gamma": "SHOULD NOT CHANGE",
|
|
"period": "4"
|
|
}
|
|
}
|
|
}
|
|
]
|
|
}`, from, to)
|
|
assert.Nil(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
movingAvgSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings
|
|
|
|
assert.Equal(t, 10., movingAvgSettings["window"])
|
|
assert.Equal(t, 5., movingAvgSettings["predict"])
|
|
|
|
modelSettings := movingAvgSettings["settings"].(map[string]any)
|
|
|
|
assert.Equal(t, .5, modelSettings["alpha"])
|
|
assert.Equal(t, .7, modelSettings["beta"])
|
|
assert.Equal(t, "SHOULD NOT CHANGE", modelSettings["gamma"])
|
|
assert.Equal(t, 4., modelSettings["period"])
|
|
})
|
|
|
|
t.Run("Correctly transforms serial_diff settings (from frontend tests)", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "1" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "2", "type": "avg" },
|
|
{
|
|
"id": "3",
|
|
"type": "serial_diff",
|
|
"field": "2",
|
|
"settings": {
|
|
"lag": "1"
|
|
}
|
|
}
|
|
]
|
|
}`, from, to)
|
|
assert.Nil(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
serialDiffSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings
|
|
assert.Equal(t, serialDiffSettings["lag"], 1.)
|
|
})
|
|
|
|
t.Run("Correctly transforms serial_diff settings", func(t *testing.T) {
|
|
// This test is with pipelineAgg and is passing. Same test without pipelineAgg is failing.
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "2" }
|
|
],
|
|
"metrics": [
|
|
{ "id": "1", "type": "average", "field": "@value" },
|
|
{
|
|
"id": "3",
|
|
"type": "serial_diff",
|
|
"field": "1",
|
|
"pipelineAgg": "1",
|
|
"settings": {
|
|
"lag": "1"
|
|
}
|
|
}
|
|
]
|
|
}`, from, to)
|
|
assert.Nil(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
serialDiffSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.PipelineAggregation).Settings
|
|
|
|
assert.Equal(t, 1., serialDiffSettings["lag"])
|
|
})
|
|
|
|
t.Run("Date Histogram Settings", func(t *testing.T) {
|
|
t.Run("Correctly transforms date_histogram settings", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"type": "date_histogram",
|
|
"field": "@timestamp",
|
|
"id": "2",
|
|
"settings": {
|
|
"min_doc_count": "1"
|
|
}
|
|
}
|
|
],
|
|
"metrics": [
|
|
{ "id": "1", "type": "average", "field": "@value" },
|
|
{
|
|
"id": "3",
|
|
"type": "serial_diff",
|
|
"field": "1",
|
|
"pipelineAgg": "1",
|
|
"settings": {
|
|
"lag": "1"
|
|
}
|
|
}
|
|
]
|
|
}`, from, to)
|
|
assert.Nil(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
|
|
|
|
assert.Equal(t, 1, dateHistogramAgg.MinDocCount)
|
|
})
|
|
|
|
t.Run("Correctly uses already int min_doc_count", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"type": "date_histogram",
|
|
"field": "@timestamp",
|
|
"id": "2",
|
|
"settings": {
|
|
"min_doc_count": 10
|
|
}
|
|
}
|
|
],
|
|
"metrics": [
|
|
{ "id": "1", "type": "average", "field": "@value" },
|
|
{
|
|
"id": "3",
|
|
"type": "serial_diff",
|
|
"field": "1",
|
|
"pipelineAgg": "1",
|
|
"settings": {
|
|
"lag": "1"
|
|
}
|
|
}
|
|
]
|
|
}`, from, to)
|
|
assert.Nil(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
|
|
|
|
assert.Equal(t, 10, dateHistogramAgg.MinDocCount)
|
|
})
|
|
|
|
t.Run("interval parameter", func(t *testing.T) {
|
|
t.Run("Uses fixed_interval", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"type": "date_histogram",
|
|
"field": "@timestamp",
|
|
"id": "2",
|
|
"settings": {
|
|
"interval": "1d"
|
|
}
|
|
}
|
|
],
|
|
"metrics": [
|
|
{ "id": "1", "type": "average", "field": "@value" }
|
|
]
|
|
}`, from, to)
|
|
assert.Nil(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
|
|
|
|
assert.NotZero(t, dateHistogramAgg.FixedInterval)
|
|
})
|
|
|
|
t.Run("Uses calendar_interval", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{
|
|
"type": "date_histogram",
|
|
"field": "@timestamp",
|
|
"id": "2",
|
|
"settings": {
|
|
"interval": "1M"
|
|
}
|
|
}
|
|
],
|
|
"metrics": [
|
|
{ "id": "1", "type": "average", "field": "@value" }
|
|
]
|
|
}`, from, to)
|
|
assert.Nil(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
|
|
|
|
assert.NotZero(t, dateHistogramAgg.CalendarInterval)
|
|
})
|
|
})
|
|
})
|
|
|
|
t.Run("Inline Script", func(t *testing.T) {
|
|
t.Run("Correctly handles scripts", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "field": "@timestamp", "id": "2" }
|
|
],
|
|
"metrics": [
|
|
{
|
|
"id": "1",
|
|
"type": "avg",
|
|
"settings": {
|
|
"script": "my_script"
|
|
}
|
|
},
|
|
{
|
|
"id": "3",
|
|
"type": "avg",
|
|
"settings": {
|
|
"script": {
|
|
"inline": "my_script"
|
|
}
|
|
}
|
|
}
|
|
]
|
|
}`, from, to)
|
|
|
|
assert.Nil(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
|
|
newFormatAggSettings := sr.Aggs[0].Aggregation.Aggs[0].Aggregation.Aggregation.(*es.MetricAggregation).Settings
|
|
oldFormatAggSettings := sr.Aggs[0].Aggregation.Aggs[1].Aggregation.Aggregation.(*es.MetricAggregation).Settings
|
|
|
|
assert.Equal(t, "my_script", newFormatAggSettings["script"])
|
|
assert.Equal(t, "my_script", oldFormatAggSettings["script"])
|
|
})
|
|
})
|
|
|
|
t.Run("Field property (from frontend tests)", func(t *testing.T) {
|
|
t.Run("Should use timeField from datasource when not specified", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"metrics": [{ "type": "count", "id": "1" }],
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "id": "2", "settings": { "min_doc_count": "1" } }
|
|
]
|
|
}`, from, to)
|
|
|
|
assert.Nil(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
|
|
assert.Equal(t, dateHistogramAgg.Field, "@timestamp")
|
|
})
|
|
|
|
t.Run("Should use field from bucket agg when specified", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"metrics": [{ "type": "count", "id": "1" }],
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "id": "2", "field": "@time", "settings": { "min_doc_count": "1" } }
|
|
]
|
|
}`, from, to)
|
|
|
|
assert.Nil(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
|
|
assert.Equal(t, dateHistogramAgg.Field, "@time")
|
|
})
|
|
|
|
t.Run("Should use fixed_interval", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"metrics": [{ "type": "count", "id": "1" }],
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "id": "2", "field": "@time", "settings": { "min_doc_count": "1", "interval": "1d" } }
|
|
]
|
|
}`, from, to)
|
|
|
|
assert.Nil(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
|
|
assert.Equal(t, dateHistogramAgg.FixedInterval, "1d")
|
|
})
|
|
|
|
t.Run("Should use calendar_interval", func(t *testing.T) {
|
|
c := newFakeClient()
|
|
_, err := executeElasticsearchDataQuery(c, `{
|
|
"metrics": [{ "type": "count", "id": "1" }],
|
|
"bucketAggs": [
|
|
{ "type": "date_histogram", "id": "2", "field": "@time", "settings": { "min_doc_count": "1", "interval": "1w" } }
|
|
]
|
|
}`, from, to)
|
|
|
|
assert.Nil(t, err)
|
|
sr := c.multisearchRequests[0].Requests[0]
|
|
dateHistogramAgg := sr.Aggs[0].Aggregation.Aggregation.(*es.DateHistogramAgg)
|
|
assert.Equal(t, dateHistogramAgg.CalendarInterval, "1w")
|
|
})
|
|
})
|
|
}
|
|
|
|
type fakeClient struct {
|
|
configuredFields es.ConfiguredFields
|
|
multiSearchResponse *es.MultiSearchResponse
|
|
multiSearchError error
|
|
builder *es.MultiSearchRequestBuilder
|
|
multisearchRequests []*es.MultiSearchRequest
|
|
}
|
|
|
|
func newFakeClient() *fakeClient {
|
|
configuredFields := es.ConfiguredFields{
|
|
TimeField: "@timestamp",
|
|
LogMessageField: "line",
|
|
LogLevelField: "lvl",
|
|
}
|
|
|
|
return &fakeClient{
|
|
configuredFields: configuredFields,
|
|
multisearchRequests: make([]*es.MultiSearchRequest, 0),
|
|
multiSearchResponse: &es.MultiSearchResponse{},
|
|
}
|
|
}
|
|
|
|
func (c *fakeClient) GetConfiguredFields() es.ConfiguredFields {
|
|
return c.configuredFields
|
|
}
|
|
|
|
func (c *fakeClient) ExecuteMultisearch(r *es.MultiSearchRequest) (*es.MultiSearchResponse, error) {
|
|
c.multisearchRequests = append(c.multisearchRequests, r)
|
|
return c.multiSearchResponse, c.multiSearchError
|
|
}
|
|
|
|
func (c *fakeClient) MultiSearch() *es.MultiSearchRequestBuilder {
|
|
c.builder = es.NewMultiSearchRequestBuilder()
|
|
return c.builder
|
|
}
|
|
|
|
func newDataQuery(body string) (backend.QueryDataRequest, error) {
|
|
return backend.QueryDataRequest{
|
|
Queries: []backend.DataQuery{
|
|
{
|
|
JSON: json.RawMessage(body),
|
|
Interval: 10 * time.Second,
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func executeElasticsearchDataQuery(c es.Client, body string, from, to time.Time) (
|
|
*backend.QueryDataResponse, error) {
|
|
timeRange := backend.TimeRange{
|
|
From: from,
|
|
To: to,
|
|
}
|
|
dataRequest := backend.QueryDataRequest{
|
|
Queries: []backend.DataQuery{
|
|
{
|
|
JSON: json.RawMessage(body),
|
|
TimeRange: timeRange,
|
|
RefID: "A",
|
|
},
|
|
},
|
|
}
|
|
query := newElasticsearchDataQuery(context.Background(), c, dataRequest.Queries, log.New("test.logger"), tracing.InitializeTracerForTest())
|
|
return query.execute()
|
|
}
|