mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Elasticsearch: Fix legend for alerting, expressions and previously frontend queries (#84485)
* Elasticsearch: Fix legend for alerting, expressions and previously frontend queries * Add comment * Update comment
This commit is contained in:
parent
296f4219f8
commit
494d169980
@ -23,20 +23,27 @@ const (
|
||||
)
|
||||
|
||||
type elasticsearchDataQuery struct {
|
||||
client es.Client
|
||||
dataQueries []backend.DataQuery
|
||||
logger log.Logger
|
||||
ctx context.Context
|
||||
tracer tracing.Tracer
|
||||
client es.Client
|
||||
dataQueries []backend.DataQuery
|
||||
logger log.Logger
|
||||
ctx context.Context
|
||||
tracer tracing.Tracer
|
||||
keepLabelsInResponse bool
|
||||
}
|
||||
|
||||
var newElasticsearchDataQuery = func(ctx context.Context, client es.Client, dataQuery []backend.DataQuery, logger log.Logger, tracer tracing.Tracer) *elasticsearchDataQuery {
|
||||
var newElasticsearchDataQuery = func(ctx context.Context, client es.Client, req *backend.QueryDataRequest, logger log.Logger, tracer tracing.Tracer) *elasticsearchDataQuery {
|
||||
_, fromAlert := req.Headers[headerFromAlert]
|
||||
fromExpression := req.GetHTTPHeader(headerFromExpression) != ""
|
||||
|
||||
return &elasticsearchDataQuery{
|
||||
client: client,
|
||||
dataQueries: dataQuery,
|
||||
dataQueries: req.Queries,
|
||||
logger: logger,
|
||||
ctx: ctx,
|
||||
tracer: tracer,
|
||||
// To maintain backward compatibility, it is necessary to keep labels in responses for alerting and expressions queries.
|
||||
// Historically, these labels have been used in alerting rules and transformations.
|
||||
keepLabelsInResponse: fromAlert || fromExpression,
|
||||
}
|
||||
}
|
||||
|
||||
@ -77,7 +84,7 @@ func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) {
|
||||
return errorsource.AddErrorToResponse(e.dataQueries[0].RefID, response, err), nil
|
||||
}
|
||||
|
||||
return parseResponse(e.ctx, res.Responses, queries, e.client.GetConfiguredFields(), e.logger, e.tracer)
|
||||
return parseResponse(e.ctx, res.Responses, queries, e.client.GetConfiguredFields(), e.keepLabelsInResponse, e.logger, e.tracer)
|
||||
}
|
||||
|
||||
func (e *elasticsearchDataQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64) error {
|
||||
|
@ -1862,6 +1862,6 @@ func executeElasticsearchDataQuery(c es.Client, body string, from, to time.Time)
|
||||
},
|
||||
},
|
||||
}
|
||||
query := newElasticsearchDataQuery(context.Background(), c, dataRequest.Queries, log.New("test.logger"), tracing.InitializeTracerForTest())
|
||||
query := newElasticsearchDataQuery(context.Background(), c, &dataRequest, log.New("test.logger"), tracing.InitializeTracerForTest())
|
||||
return query.execute()
|
||||
}
|
||||
|
@ -24,12 +24,18 @@ import (
|
||||
"github.com/grafana/grafana/pkg/infra/httpclient"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
||||
)
|
||||
|
||||
var eslog = log.New("tsdb.elasticsearch")
|
||||
|
||||
const (
|
||||
// headerFromExpression is used by data sources to identify expression queries
|
||||
headerFromExpression = "X-Grafana-From-Expr"
|
||||
// headerFromAlert is used by datasources to identify alert queries
|
||||
headerFromAlert = "FromAlert"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
httpClientProvider httpclient.Provider
|
||||
im instancemgmt.InstanceManager
|
||||
@ -48,7 +54,7 @@ func ProvideService(httpClientProvider httpclient.Provider, tracer tracing.Trace
|
||||
|
||||
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
|
||||
_, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName]
|
||||
_, fromAlert := req.Headers[headerFromAlert]
|
||||
logger := s.logger.FromContext(ctx).New("fromAlert", fromAlert)
|
||||
|
||||
if err != nil {
|
||||
@ -56,12 +62,12 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
||||
return &backend.QueryDataResponse{}, err
|
||||
}
|
||||
|
||||
return queryData(ctx, req.Queries, dsInfo, logger, s.tracer)
|
||||
return queryData(ctx, req, dsInfo, logger, s.tracer)
|
||||
}
|
||||
|
||||
// separate function to allow testing the whole transformation and query flow
|
||||
func queryData(ctx context.Context, queries []backend.DataQuery, dsInfo *es.DatasourceInfo, logger log.Logger, tracer tracing.Tracer) (*backend.QueryDataResponse, error) {
|
||||
if len(queries) == 0 {
|
||||
func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *es.DatasourceInfo, logger log.Logger, tracer tracing.Tracer) (*backend.QueryDataResponse, error) {
|
||||
if len(req.Queries) == 0 {
|
||||
return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries")
|
||||
}
|
||||
|
||||
@ -69,7 +75,7 @@ func queryData(ctx context.Context, queries []backend.DataQuery, dsInfo *es.Data
|
||||
if err != nil {
|
||||
return &backend.QueryDataResponse{}, err
|
||||
}
|
||||
query := newElasticsearchDataQuery(ctx, client, queries, logger, tracer)
|
||||
query := newElasticsearchDataQuery(ctx, client, req, logger, tracer)
|
||||
return query.execute()
|
||||
}
|
||||
|
||||
|
@ -114,6 +114,9 @@ type queryDataTestResult struct {
|
||||
|
||||
func queryDataTestWithResponseCode(queriesBytes []byte, responseStatusCode int, responseBytes []byte) (queryDataTestResult, error) {
|
||||
queries, err := newFlowTestQueries(queriesBytes)
|
||||
req := backend.QueryDataRequest{
|
||||
Queries: queries,
|
||||
}
|
||||
if err != nil {
|
||||
return queryDataTestResult{}, err
|
||||
}
|
||||
@ -138,7 +141,7 @@ func queryDataTestWithResponseCode(queriesBytes []byte, responseStatusCode int,
|
||||
return nil
|
||||
})
|
||||
|
||||
result, err := queryData(context.Background(), queries, dsInfo, log.New("test.logger"), tracing.InitializeTracerForTest())
|
||||
result, err := queryData(context.Background(), &req, dsInfo, log.New("test.logger"), tracing.InitializeTracerForTest())
|
||||
if err != nil {
|
||||
return queryDataTestResult{}, err
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ const (
|
||||
|
||||
var searchWordsRegex = regexp.MustCompile(regexp.QuoteMeta(es.HighlightPreTagsString) + `(.*?)` + regexp.QuoteMeta(es.HighlightPostTagsString))
|
||||
|
||||
func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields, logger log.Logger, tracer tracing.Tracer) (*backend.QueryDataResponse, error) {
|
||||
func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields, keepLabelsInResponse bool, logger log.Logger, tracer tracing.Tracer) (*backend.QueryDataResponse, error) {
|
||||
result := backend.QueryDataResponse{
|
||||
Responses: backend.Responses{},
|
||||
}
|
||||
@ -117,7 +117,7 @@ func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets
|
||||
resSpan.End()
|
||||
return &backend.QueryDataResponse{}, err
|
||||
}
|
||||
nameFields(queryRes, target)
|
||||
nameFields(queryRes, target, keepLabelsInResponse)
|
||||
trimDatapoints(queryRes, target)
|
||||
|
||||
result.Responses[target.RefID] = queryRes
|
||||
@ -888,7 +888,7 @@ func getSortedLabelValues(labels data.Labels) []string {
|
||||
return values
|
||||
}
|
||||
|
||||
func nameFields(queryResult backend.DataResponse, target *Query) {
|
||||
func nameFields(queryResult backend.DataResponse, target *Query, keepLabelsInResponse bool) {
|
||||
set := make(map[string]struct{})
|
||||
frames := queryResult.Frames
|
||||
for _, v := range frames {
|
||||
@ -907,10 +907,18 @@ func nameFields(queryResult backend.DataResponse, target *Query) {
|
||||
// another is "number"
|
||||
valueField := frame.Fields[1]
|
||||
fieldName := getFieldName(*valueField, target, metricTypeCount)
|
||||
// We need to remove labels so they are not added to legend as duplicates
|
||||
// ensures backward compatibility with "frontend" version of the plugin
|
||||
valueField.Labels = nil
|
||||
frame.Name = fieldName
|
||||
// If we need to keep the labels in the response, to prevent duplication in names and to keep
|
||||
// backward compatibility with alerting and expressions we use DisplayNameFromDS
|
||||
if keepLabelsInResponse {
|
||||
if valueField.Config == nil {
|
||||
valueField.Config = &data.FieldConfig{}
|
||||
}
|
||||
valueField.Config.DisplayNameFromDS = fieldName
|
||||
// If we don't need to keep labels (how frontend mode worked), we use frame.Name and remove labels
|
||||
} else {
|
||||
valueField.Labels = nil
|
||||
frame.Name = fieldName
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -330,7 +330,7 @@ func TestProcessLogsResponse(t *testing.T) {
|
||||
]
|
||||
}`
|
||||
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -417,7 +417,7 @@ func TestProcessLogsResponse(t *testing.T) {
|
||||
]
|
||||
}`
|
||||
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -525,7 +525,7 @@ func TestProcessRawDataResponse(t *testing.T) {
|
||||
]
|
||||
}`
|
||||
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -814,7 +814,7 @@ func TestProcessRawDocumentResponse(t *testing.T) {
|
||||
]
|
||||
}`
|
||||
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -995,7 +995,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -1097,7 +1097,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -1160,7 +1160,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -1233,7 +1233,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}]
|
||||
}`
|
||||
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
assert.Nil(t, err)
|
||||
assert.Len(t, result.Responses, 1)
|
||||
frames := result.Responses["A"].Frames
|
||||
@ -1464,7 +1464,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
}]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
assert.Nil(t, err)
|
||||
assert.Len(t, result.Responses, 1)
|
||||
|
||||
@ -1551,7 +1551,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}]
|
||||
}`
|
||||
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
assert.Nil(t, err)
|
||||
assert.Len(t, result.Responses, 1)
|
||||
frames := result.Responses["A"].Frames
|
||||
@ -1749,7 +1749,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
queryRes := result.Responses["A"]
|
||||
@ -1775,6 +1775,70 @@ func TestProcessBuckets(t *testing.T) {
|
||||
assert.Equal(t, frame.Name, "server2")
|
||||
})
|
||||
|
||||
t.Run("Single group by query one metric with true keepLabelsInResponse", func(t *testing.T) {
|
||||
targets := map[string]string{
|
||||
"A": `{
|
||||
"metrics": [{ "type": "count", "id": "1" }],
|
||||
"bucketAggs": [
|
||||
{ "type": "terms", "field": "host", "id": "2" },
|
||||
{ "type": "date_histogram", "field": "@timestamp", "id": "3" }
|
||||
]
|
||||
}`,
|
||||
}
|
||||
response := `{
|
||||
"responses": [
|
||||
{
|
||||
"aggregations": {
|
||||
"2": {
|
||||
"buckets": [
|
||||
{
|
||||
"3": {
|
||||
"buckets": [{ "doc_count": 1, "key": 1000 }, { "doc_count": 3, "key": 2000 }]
|
||||
},
|
||||
"doc_count": 4,
|
||||
"key": "server1"
|
||||
},
|
||||
{
|
||||
"3": {
|
||||
"buckets": [{ "doc_count": 2, "key": 1000 }, { "doc_count": 8, "key": 2000 }]
|
||||
},
|
||||
"doc_count": 10,
|
||||
"key": "server2"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
queryRes := result.Responses["A"]
|
||||
require.NotNil(t, queryRes)
|
||||
dataframes := queryRes.Frames
|
||||
require.NoError(t, err)
|
||||
require.Len(t, dataframes, 2)
|
||||
|
||||
frame := dataframes[0]
|
||||
require.Len(t, frame.Fields, 2)
|
||||
require.Equal(t, frame.Fields[0].Name, data.TimeSeriesTimeFieldName)
|
||||
require.Equal(t, frame.Fields[0].Len(), 2)
|
||||
require.Equal(t, frame.Fields[1].Name, data.TimeSeriesValueFieldName)
|
||||
require.Equal(t, frame.Fields[1].Len(), 2)
|
||||
require.Equal(t, frame.Fields[1].Labels, data.Labels{"host": "server1"})
|
||||
assert.Equal(t, frame.Fields[1].Config.DisplayNameFromDS, "server1")
|
||||
|
||||
frame = dataframes[1]
|
||||
require.Len(t, frame.Fields, 2)
|
||||
require.Equal(t, frame.Fields[0].Name, data.TimeSeriesTimeFieldName)
|
||||
require.Equal(t, frame.Fields[0].Len(), 2)
|
||||
require.Equal(t, frame.Fields[1].Name, data.TimeSeriesValueFieldName)
|
||||
require.Equal(t, frame.Fields[1].Len(), 2)
|
||||
require.Equal(t, frame.Fields[1].Labels, data.Labels{"host": "server2"})
|
||||
assert.Equal(t, frame.Fields[1].Config.DisplayNameFromDS, "server2")
|
||||
})
|
||||
|
||||
t.Run("Single group by query two metrics", func(t *testing.T) {
|
||||
targets := map[string]string{
|
||||
"A": `{
|
||||
@ -1817,7 +1881,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -1969,7 +2033,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -2142,7 +2206,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -2274,7 +2338,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -2322,7 +2386,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -2384,7 +2448,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -2506,7 +2570,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -2609,7 +2673,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -2659,7 +2723,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -2721,7 +2785,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -2789,7 +2853,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -2853,7 +2917,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
@ -2907,7 +2971,7 @@ func TestProcessBuckets(t *testing.T) {
|
||||
}
|
||||
]
|
||||
}`
|
||||
result, err := parseTestResponse(targets, response)
|
||||
result, err := parseTestResponse(targets, response, false)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, result.Responses, 1)
|
||||
|
||||
@ -3583,7 +3647,7 @@ func TestTrimEdges(t *testing.T) {
|
||||
requireFrameLength(t, frames[0], 1)
|
||||
}
|
||||
|
||||
func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*backend.QueryDataResponse, error) {
|
||||
func parseTestResponse(tsdbQueries map[string]string, responseBody string, keepLabelsInResponse bool) (*backend.QueryDataResponse, error) {
|
||||
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
|
||||
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
|
||||
configuredFields := es.ConfiguredFields{
|
||||
@ -3618,7 +3682,7 @@ func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*bac
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return parseResponse(context.Background(), response.Responses, queries, configuredFields, log.New("test.logger"), tracing.InitializeTracerForTest())
|
||||
return parseResponse(context.Background(), response.Responses, queries, configuredFields, keepLabelsInResponse, log.New("test.logger"), tracing.InitializeTracerForTest())
|
||||
}
|
||||
|
||||
func requireTimeValue(t *testing.T, expected int64, frame *data.Frame, index int) {
|
||||
|
Loading…
Reference in New Issue
Block a user