From 51391a762b77c0055f0d592cde93e56abd68f4a4 Mon Sep 17 00:00:00 2001 From: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com> Date: Thu, 7 Sep 2023 13:54:16 +0200 Subject: [PATCH] Elasticsearch: Improve backend instrumentation of `QueryData` calls (#74172) * Elasticsearch: Improve backend instrumentation of QueryData calls * Add fromAlert * Fix tests, move logger and context to struct * Add instrumentation for processing response * Move log for sending request closer to sending request * Update * Fix logging, improved messages, fix printing of queries * Update log text * Fix tests * Fix lint * Update logging to follow our guidelines * Remove key-value pairs from logs that are going to be passed from instrumentation * Update pkg/tsdb/elasticsearch/elasticsearch.go --- pkg/tsdb/elasticsearch/client/client.go | 44 +++++++--------- pkg/tsdb/elasticsearch/client/client_test.go | 5 +- .../elasticsearch/client/index_pattern.go | 1 + pkg/tsdb/elasticsearch/data_query.go | 23 +++++++-- pkg/tsdb/elasticsearch/data_query_test.go | 4 +- pkg/tsdb/elasticsearch/elasticsearch.go | 17 ++++--- pkg/tsdb/elasticsearch/parse_query.go | 5 +- pkg/tsdb/elasticsearch/parse_query_test.go | 3 +- pkg/tsdb/elasticsearch/querydata_test.go | 3 +- pkg/tsdb/elasticsearch/response_parser.go | 50 +++++++++++++------ .../elasticsearch/response_parser_test.go | 6 ++- 11 files changed, 102 insertions(+), 59 deletions(-) diff --git a/pkg/tsdb/elasticsearch/client/client.go b/pkg/tsdb/elasticsearch/client/client.go index 71e9dc328b1..70ed3946add 100644 --- a/pkg/tsdb/elasticsearch/client/client.go +++ b/pkg/tsdb/elasticsearch/client/client.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "net/http" "net/url" @@ -36,8 +37,6 @@ type ConfiguredFields struct { LogLevelField string } -const loggerName = "tsdb.elasticsearch.client" - // Client represents a client which can interact with elasticsearch api type Client interface { GetConfiguredFields() ConfiguredFields @@ -46,9 +45,12 @@ type Client interface { } // NewClient creates a new elasticsearch client -var NewClient = func(ctx context.Context, ds *DatasourceInfo, timeRange backend.TimeRange) (Client, error) { +var NewClient = func(ctx context.Context, ds *DatasourceInfo, timeRange backend.TimeRange, logger log.Logger) (Client, error) { + logger = logger.New("entity", "client") + ip, err := newIndexPattern(ds.Interval, ds.Database) if err != nil { + logger.Error("Failed creating index pattern", "error", err, "interval", ds.Interval, "index", ds.Database) return nil, err } @@ -56,9 +58,7 @@ var NewClient = func(ctx context.Context, ds *DatasourceInfo, timeRange backend. if err != nil { return nil, err } - - logger := log.New(loggerName).FromContext(ctx) - logger.Debug("Creating new client", "configuredFields", fmt.Sprintf("%#v", ds.ConfiguredFields), "indices", strings.Join(indices, ", ")) + logger.Debug("Creating new client", "configuredFields", fmt.Sprintf("%#v", ds.ConfiguredFields), "indices", strings.Join(indices, ", "), "interval", ds.Interval, "index", ds.Database) return &baseClientImpl{ logger: logger, @@ -98,7 +98,6 @@ func (c *baseClientImpl) executeBatchRequest(uriPath, uriQuery string, requests } func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte, error) { - c.logger.Debug("Encoding batch requests to json", "batch requests", len(requests)) start := time.Now() payload := bytes.Buffer{} @@ -122,12 +121,13 @@ func (c *baseClientImpl) encodeBatchRequests(requests []*multiRequest) ([]byte, } elapsed := time.Since(start) - c.logger.Debug("Encoded batch requests to json", "took", elapsed) + c.logger.Debug("Completed encoding of batch requests to json", "duration", elapsed) return payload.Bytes(), nil } func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body []byte) (*http.Response, error) { + c.logger.Debug("Sending request to Elasticsearch", "url", c.ds.URL) u, err := url.Parse(c.ds.URL) if err != nil { return nil, err @@ -145,54 +145,48 @@ func (c *baseClientImpl) executeRequest(method, uriPath, uriQuery string, body [ return nil, err } - c.logger.Debug("Executing request", "url", req.URL.String(), "method", method) - req.Header.Set("Content-Type", "application/x-ndjson") - start := time.Now() - defer func() { - elapsed := time.Since(start) - c.logger.Debug("Executed request", "took", elapsed) - }() //nolint:bodyclose resp, err := c.ds.HTTPClient.Do(req) if err != nil { return nil, err } - return resp, nil } func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearchResponse, error) { - c.logger.Debug("Executing multisearch", "search requests", len(r.Requests)) - multiRequests := c.createMultiSearchRequests(r.Requests) queryParams := c.getMultiSearchQueryParameters() + start := time.Now() clientRes, err := c.executeBatchRequest("_msearch", queryParams, multiRequests) if err != nil { + status := "error" + if errors.Is(err, context.Canceled) { + status = "cancelled" + } + c.logger.Error("Error received from Elasticsearch", "error", err, "status", status, "statusCode", clientRes.StatusCode, "duration", time.Since(start), "action", "databaseRequest") return nil, err } res := clientRes defer func() { if err := res.Body.Close(); err != nil { - c.logger.Warn("Failed to close response body", "err", err) + c.logger.Warn("Failed to close response body", "error", err) } }() - c.logger.Debug("Received multisearch response", "code", res.StatusCode, "status", res.Status, "content-length", res.ContentLength) - - start := time.Now() - c.logger.Debug("Decoding multisearch json response") + c.logger.Info("Response received from Elasticsearch", "status", "ok", "statusCode", res.StatusCode, "contentLength", res.ContentLength, "duration", time.Since(start), "action", "databaseRequest") + start = time.Now() var msr MultiSearchResponse dec := json.NewDecoder(res.Body) err = dec.Decode(&msr) if err != nil { + c.logger.Error("Failed to decode response from Elasticsearch", "error", err, "duration", time.Since(start)) return nil, err } - elapsed := time.Since(start) - c.logger.Debug("Decoded multisearch json response", "took", elapsed) + c.logger.Debug("Completed decoding of response from Elasticsearch", "duration", time.Since(start)) msr.Status = res.StatusCode diff --git a/pkg/tsdb/elasticsearch/client/client_test.go b/pkg/tsdb/elasticsearch/client/client_test.go index 530965fb4bf..04f0e6c90c0 100644 --- a/pkg/tsdb/elasticsearch/client/client_test.go +++ b/pkg/tsdb/elasticsearch/client/client_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/infra/log" ) func TestClient_ExecuteMultisearch(t *testing.T) { @@ -66,7 +67,7 @@ func TestClient_ExecuteMultisearch(t *testing.T) { To: to, } - c, err := NewClient(context.Background(), &ds, timeRange) + c, err := NewClient(context.Background(), &ds, timeRange, log.New("test", "test")) require.NoError(t, err) require.NotNil(t, c) @@ -188,7 +189,7 @@ func TestClient_Index(t *testing.T) { To: to, } - c, err := NewClient(context.Background(), &ds, timeRange) + c, err := NewClient(context.Background(), &ds, timeRange, log.New("test", "test")) require.NoError(t, err) require.NotNil(t, c) diff --git a/pkg/tsdb/elasticsearch/client/index_pattern.go b/pkg/tsdb/elasticsearch/client/index_pattern.go index ff2e1205068..0f0b2b3cee2 100644 --- a/pkg/tsdb/elasticsearch/client/index_pattern.go +++ b/pkg/tsdb/elasticsearch/client/index_pattern.go @@ -34,6 +34,7 @@ type staticIndexPattern struct { indexName string } +// TODO: This never returns an error, we should refactor and remove it func (ip *staticIndexPattern) GetIndices(timeRange backend.TimeRange) ([]string, error) { if ip.indexName != "" { return []string{ip.indexName}, nil diff --git a/pkg/tsdb/elasticsearch/data_query.go b/pkg/tsdb/elasticsearch/data_query.go index 6f7cc57c234..a9f3d37f814 100644 --- a/pkg/tsdb/elasticsearch/data_query.go +++ b/pkg/tsdb/elasticsearch/data_query.go @@ -1,6 +1,8 @@ package elasticsearch import ( + "context" + "encoding/json" "fmt" "regexp" "strconv" @@ -9,6 +11,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/infra/log" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" ) @@ -19,18 +22,26 @@ const ( type elasticsearchDataQuery struct { client es.Client dataQueries []backend.DataQuery + logger log.Logger + ctx context.Context } -var newElasticsearchDataQuery = func(client es.Client, dataQuery []backend.DataQuery) *elasticsearchDataQuery { +var newElasticsearchDataQuery = func(ctx context.Context, client es.Client, dataQuery []backend.DataQuery, logger log.Logger) *elasticsearchDataQuery { return &elasticsearchDataQuery{ client: client, dataQueries: dataQuery, + logger: logger, + ctx: ctx, } } func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) { - queries, err := parseQuery(e.dataQueries) + start := time.Now() + e.logger.Debug("Parsing queries", "queriesLength", len(e.dataQueries)) + queries, err := parseQuery(e.dataQueries, e.logger) if err != nil { + mq, _ := json.Marshal(e.dataQueries) + e.logger.Error("Failed to parse queries", "error", err, "queries", string(mq), "queriesLength", len(queries), "duration", time.Since(start), "action", "prepareRequest") return &backend.QueryDataResponse{}, err } @@ -40,26 +51,32 @@ func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) { to := e.dataQueries[0].TimeRange.To.UnixNano() / int64(time.Millisecond) for _, q := range queries { if err := e.processQuery(q, ms, from, to); err != nil { + mq, _ := json.Marshal(q) + e.logger.Error("Failed to process query to multisearch request builder", "error", err, "query", string(mq), "queriesLength", len(queries), "duration", time.Since(start), "action", "prepareRequest") return &backend.QueryDataResponse{}, err } } req, err := ms.Build() if err != nil { + mqs, _ := json.Marshal(e.dataQueries) + e.logger.Error("Failed to build multisearch request", "error", err, "queriesLength", len(queries), "queries", string(mqs), "duration", time.Since(start), "action", "prepareRequest") return &backend.QueryDataResponse{}, err } + e.logger.Info("Prepared request", "queriesLength", len(queries), "duration", time.Since(start), "action", "prepareRequest") res, err := e.client.ExecuteMultisearch(req) if err != nil { return &backend.QueryDataResponse{}, err } - return parseResponse(res.Responses, queries, e.client.GetConfiguredFields()) + return parseResponse(e.ctx, res.Responses, queries, e.client.GetConfiguredFields(), e.logger) } func (e *elasticsearchDataQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64) error { err := isQueryWithError(q) if err != nil { + err = fmt.Errorf("received invalid query. %w", err) return err } diff --git a/pkg/tsdb/elasticsearch/data_query_test.go b/pkg/tsdb/elasticsearch/data_query_test.go index 863d0717c13..b51aa65f908 100644 --- a/pkg/tsdb/elasticsearch/data_query_test.go +++ b/pkg/tsdb/elasticsearch/data_query_test.go @@ -1,6 +1,7 @@ package elasticsearch import ( + "context" "encoding/json" "testing" "time" @@ -9,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/grafana/grafana/pkg/infra/log" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" ) @@ -1816,6 +1818,6 @@ func executeElasticsearchDataQuery(c es.Client, body string, from, to time.Time) }, }, } - query := newElasticsearchDataQuery(c, dataRequest.Queries) + query := newElasticsearchDataQuery(context.Background(), c, dataRequest.Queries, log.New("test.logger")) return query.execute() } diff --git a/pkg/tsdb/elasticsearch/elasticsearch.go b/pkg/tsdb/elasticsearch/elasticsearch.go index 75ce0219e7a..d20928af8b4 100644 --- a/pkg/tsdb/elasticsearch/elasticsearch.go +++ b/pkg/tsdb/elasticsearch/elasticsearch.go @@ -19,6 +19,7 @@ import ( "github.com/grafana/grafana/pkg/infra/httpclient" "github.com/grafana/grafana/pkg/infra/log" + ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" ) @@ -30,8 +31,6 @@ type Service struct { } func ProvideService(httpClientProvider httpclient.Provider) *Service { - eslog.Debug("Initializing") - return &Service{ im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)), httpClientProvider: httpClientProvider, @@ -40,24 +39,28 @@ func ProvideService(httpClientProvider httpclient.Provider) *Service { func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { dsInfo, err := s.getDSInfo(ctx, req.PluginContext) + _, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName] + logger := eslog.FromContext(ctx).New("fromAlert", fromAlert) + if err != nil { + logger.Error("Failed to get data source info", "error", err) return &backend.QueryDataResponse{}, err } - return queryData(ctx, req.Queries, dsInfo) + return queryData(ctx, req.Queries, dsInfo, logger) } // separate function to allow testing the whole transformation and query flow -func queryData(ctx context.Context, queries []backend.DataQuery, dsInfo *es.DatasourceInfo) (*backend.QueryDataResponse, error) { +func queryData(ctx context.Context, queries []backend.DataQuery, dsInfo *es.DatasourceInfo, logger log.Logger) (*backend.QueryDataResponse, error) { if len(queries) == 0 { return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries") } - client, err := es.NewClient(ctx, dsInfo, queries[0].TimeRange) + client, err := es.NewClient(ctx, dsInfo, queries[0].TimeRange, logger) if err != nil { return &backend.QueryDataResponse{}, err } - query := newElasticsearchDataQuery(client, queries) + query := newElasticsearchDataQuery(ctx, client, queries, logger) return query.execute() } @@ -220,7 +223,7 @@ func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceReq defer func() { if err := response.Body.Close(); err != nil { - logger.Warn("Failed to close response body", "err", err) + logger.Warn("Failed to close response body", "error", err) } }() diff --git a/pkg/tsdb/elasticsearch/parse_query.go b/pkg/tsdb/elasticsearch/parse_query.go index b1199d312b2..e3ebf5aa27c 100644 --- a/pkg/tsdb/elasticsearch/parse_query.go +++ b/pkg/tsdb/elasticsearch/parse_query.go @@ -4,9 +4,10 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/infra/log" ) -func parseQuery(tsdbQuery []backend.DataQuery) ([]*Query, error) { +func parseQuery(tsdbQuery []backend.DataQuery, logger log.Logger) ([]*Query, error) { queries := make([]*Query, 0) for _, q := range tsdbQuery { model, err := simplejson.NewJson(q.JSON) @@ -20,10 +21,12 @@ func parseQuery(tsdbQuery []backend.DataQuery) ([]*Query, error) { rawQuery := model.Get("query").MustString() bucketAggs, err := parseBucketAggs(model) if err != nil { + logger.Error("Failed to parse bucket aggs in query", "error", err, "model", string(q.JSON)) return nil, err } metrics, err := parseMetrics(model) if err != nil { + logger.Error("Failed to parse metrics in query", "error", err, "model", string(q.JSON)) return nil, err } alias := model.Get("alias").MustString("") diff --git a/pkg/tsdb/elasticsearch/parse_query_test.go b/pkg/tsdb/elasticsearch/parse_query_test.go index af40f767d1d..c2f7512f3b7 100644 --- a/pkg/tsdb/elasticsearch/parse_query_test.go +++ b/pkg/tsdb/elasticsearch/parse_query_test.go @@ -3,6 +3,7 @@ package elasticsearch import ( "testing" + "github.com/grafana/grafana/pkg/infra/log" "github.com/stretchr/testify/require" ) @@ -60,7 +61,7 @@ func TestParseQuery(t *testing.T) { }` dataQuery, err := newDataQuery(body) require.NoError(t, err) - queries, err := parseQuery(dataQuery.Queries) + queries, err := parseQuery(dataQuery.Queries, log.New("test.logger")) require.NoError(t, err) require.Len(t, queries, 1) diff --git a/pkg/tsdb/elasticsearch/querydata_test.go b/pkg/tsdb/elasticsearch/querydata_test.go index 5e20a7b5ae5..e42f0b3b5ce 100644 --- a/pkg/tsdb/elasticsearch/querydata_test.go +++ b/pkg/tsdb/elasticsearch/querydata_test.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" ) @@ -137,7 +138,7 @@ func queryDataTestWithResponseCode(queriesBytes []byte, responseStatusCode int, return nil }) - result, err := queryData(context.Background(), queries, dsInfo) + result, err := queryData(context.Background(), queries, dsInfo, log.New("test.logger")) if err != nil { return queryDataTestResult{}, err } diff --git a/pkg/tsdb/elasticsearch/response_parser.go b/pkg/tsdb/elasticsearch/response_parser.go index 0402816016a..2566aecf331 100644 --- a/pkg/tsdb/elasticsearch/response_parser.go +++ b/pkg/tsdb/elasticsearch/response_parser.go @@ -1,6 +1,7 @@ package elasticsearch import ( + "context" "encoding/json" "errors" "fmt" @@ -14,6 +15,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/infra/log" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" ) @@ -39,7 +41,8 @@ const ( var searchWordsRegex = regexp.MustCompile(regexp.QuoteMeta(es.HighlightPreTagsString) + `(.*?)` + regexp.QuoteMeta(es.HighlightPostTagsString)) -func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields) (*backend.QueryDataResponse, error) { +func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields, logger log.Logger) (*backend.QueryDataResponse, error) { + start := time.Now() result := backend.QueryDataResponse{ Responses: backend.Responses{}, } @@ -51,6 +54,9 @@ func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredF target := targets[i] if res.Error != nil { + mt, _ := json.Marshal(target) + me, _ := json.Marshal(res.Error) + logger.Error("Error response from Elasticsearch", "error", string(me), "query", string(mt)) errResult := getErrorFromElasticResponse(res) result.Responses[target.RefID] = backend.DataResponse{ Error: errors.New(errResult), @@ -61,20 +67,23 @@ func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredF queryRes := backend.DataResponse{} if isRawDataQuery(target) { - err := processRawDataResponse(res, target, configuredFields, &queryRes) + err := processRawDataResponse(res, target, configuredFields, &queryRes, logger) if err != nil { + // TODO: This error never happens so we should remove it return &backend.QueryDataResponse{}, err } result.Responses[target.RefID] = queryRes } else if isRawDocumentQuery(target) { - err := processRawDocumentResponse(res, target, &queryRes) + err := processRawDocumentResponse(res, target, &queryRes, logger) if err != nil { + // TODO: This error never happens so we should remove it return &backend.QueryDataResponse{}, err } result.Responses[target.RefID] = queryRes } else if isLogsQuery(target) { - err := processLogsResponse(res, target, configuredFields, &queryRes) + err := processLogsResponse(res, target, configuredFields, &queryRes, logger) if err != nil { + // TODO: This error never happens so we should remove it return &backend.QueryDataResponse{}, err } result.Responses[target.RefID] = queryRes @@ -82,7 +91,10 @@ func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredF // Process as metric query result props := make(map[string]string) err := processBuckets(res.Aggregations, target, &queryRes, props, 0) + logger.Debug("Processed metric query response") if err != nil { + mt, _ := json.Marshal(target) + logger.Error("Error processing buckets", "error", err, "query", string(mt), "aggregationsLength", len(res.Aggregations)) return &backend.QueryDataResponse{}, err } nameFields(queryRes, target) @@ -91,10 +103,12 @@ func parseResponse(responses []*es.SearchResponse, targets []*Query, configuredF result.Responses[target.RefID] = queryRes } } + + logger.Info("Finished processing responses", "duration", time.Since(start), "responsesLength", len(result.Responses), "queriesLength", len(targets), "action", "parseResponse") return &result, nil } -func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse) error { +func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse, logger log.Logger) error { propNames := make(map[string]bool) docs := make([]map[string]interface{}, len(res.Hits.Hits)) searchWords := make(map[string]bool) @@ -168,12 +182,13 @@ func processLogsResponse(res *es.SearchResponse, target *Query, configuredFields setPreferredVisType(frame, data.VisTypeLogs) setLogsCustomMeta(frame, searchWords, stringToIntWithDefaultValue(target.Metrics[0].Settings.Get("limit").MustString(), defaultSize)) frames = append(frames, frame) - queryRes.Frames = frames + + logger.Debug("Processed log query response", "fieldsLength", len(frame.Fields)) return nil } -func processRawDataResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse) error { +func processRawDataResponse(res *es.SearchResponse, target *Query, configuredFields es.ConfiguredFields, queryRes *backend.DataResponse, logger log.Logger) error { propNames := make(map[string]bool) docs := make([]map[string]interface{}, len(res.Hits.Hits)) @@ -207,13 +222,15 @@ func processRawDataResponse(res *es.SearchResponse, target *Query, configuredFie frames := data.Frames{} frame := data.NewFrame("", fields...) - frames = append(frames, frame) + frames = append(frames, frame) queryRes.Frames = frames + + logger.Debug("Processed raw data query response", "fieldsLength", len(frame.Fields)) return nil } -func processRawDocumentResponse(res *es.SearchResponse, target *Query, queryRes *backend.DataResponse) error { +func processRawDocumentResponse(res *es.SearchResponse, target *Query, queryRes *backend.DataResponse, logger log.Logger) error { docs := make([]map[string]interface{}, len(res.Hits.Hits)) for hitIdx, hit := range res.Hits.Hits { doc := map[string]interface{}{ @@ -266,6 +283,7 @@ func processRawDocumentResponse(res *es.SearchResponse, target *Query, queryRes frames = append(frames, frame) queryRes.Frames = frames + logger.Debug("Processed raw document query response", "fieldsLength", len(frame.Fields)) return nil } @@ -650,32 +668,32 @@ func processMetrics(esAgg *simplejson.Json, target *Query, query *backend.DataRe case countType: countFrames, err := processCountMetric(jsonBuckets, props) if err != nil { - return err + return fmt.Errorf("error processing count metric: %w", err) } frames = append(frames, countFrames...) case percentilesType: percentileFrames, err := processPercentilesMetric(metric, jsonBuckets, props) if err != nil { - return err + return fmt.Errorf("error processing percentiles metric: %w", err) } frames = append(frames, percentileFrames...) case topMetricsType: topMetricsFrames, err := processTopMetricsMetric(metric, jsonBuckets, props) if err != nil { - return err + return fmt.Errorf("error processing top metrics metric: %w", err) } frames = append(frames, topMetricsFrames...) case extendedStatsType: extendedStatsFrames, err := processExtendedStatsMetric(metric, jsonBuckets, props) if err != nil { - return err + return fmt.Errorf("error processing extended stats metric: %w", err) } frames = append(frames, extendedStatsFrames...) default: defaultFrames, err := processDefaultMetric(metric, jsonBuckets, props) if err != nil { - return err + return fmt.Errorf("error processing default metric: %w", err) } frames = append(frames, defaultFrames...) } @@ -713,7 +731,7 @@ func processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Q } else { f, err := bucket.Get("key").Float64() if err != nil { - return err + return fmt.Errorf("error appending bucket key to existing field with name %s: %w", field.Name, err) } field.Append(&f) } @@ -728,7 +746,7 @@ func processAggregationDocs(esAgg *simplejson.Json, aggDef *BucketAgg, target *Q } else { f, err := bucket.Get("key").Float64() if err != nil { - return err + return fmt.Errorf("error appending bucket key to new field with name %s: %w", aggDef.Field, err) } aggDefField = extractDataField(aggDef.Field, &f) aggDefField.Append(&f) diff --git a/pkg/tsdb/elasticsearch/response_parser_test.go b/pkg/tsdb/elasticsearch/response_parser_test.go index 8dbe9529005..f65a11b946c 100644 --- a/pkg/tsdb/elasticsearch/response_parser_test.go +++ b/pkg/tsdb/elasticsearch/response_parser_test.go @@ -1,6 +1,7 @@ package elasticsearch import ( + "context" "encoding/json" "flag" "fmt" @@ -13,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/grafana/grafana/pkg/infra/log" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" ) @@ -3526,12 +3528,12 @@ func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*bac return nil, err } - queries, err := parseQuery(tsdbQuery.Queries) + queries, err := parseQuery(tsdbQuery.Queries, log.New("test.logger")) if err != nil { return nil, err } - return parseResponse(response.Responses, queries, configuredFields) + return parseResponse(context.Background(), response.Responses, queries, configuredFields, log.New("test.logger")) } func requireTimeValue(t *testing.T, expected int64, frame *data.Frame, index int) {