From eff2410baea156a791971e145916655068b940f3 Mon Sep 17 00:00:00 2001 From: Erik Sundell Date: Thu, 10 Jun 2021 10:23:17 +0200 Subject: [PATCH] Cloudwatch: Fix duplicated time series (#35433) * make sure queries are only ran once * test aliases * use correct dates --- pkg/tsdb/cloudwatch/request_parser.go | 4 +- pkg/tsdb/cloudwatch/test_utils.go | 5 + pkg/tsdb/cloudwatch/time_series_query.go | 175 ++++++++---------- pkg/tsdb/cloudwatch/time_series_query_test.go | 106 +++++++++++ 4 files changed, 193 insertions(+), 97 deletions(-) diff --git a/pkg/tsdb/cloudwatch/request_parser.go b/pkg/tsdb/cloudwatch/request_parser.go index 29a78522a29..8a9f10a368e 100644 --- a/pkg/tsdb/cloudwatch/request_parser.go +++ b/pkg/tsdb/cloudwatch/request_parser.go @@ -15,9 +15,9 @@ import ( ) // Parses the json queries and returns a requestQuery. The requestQuery has a 1 to 1 mapping to a query editor row -func (e *cloudWatchExecutor) parseQueries(req *backend.QueryDataRequest, startTime time.Time, endTime time.Time) (map[string][]*requestQuery, error) { +func (e *cloudWatchExecutor) parseQueries(queries []backend.DataQuery, startTime time.Time, endTime time.Time) (map[string][]*requestQuery, error) { requestQueries := make(map[string][]*requestQuery) - for _, query := range req.Queries { + for _, query := range queries { model, err := simplejson.NewJson(query.JSON) if err != nil { return nil, &queryError{err: err, RefID: query.RefID} diff --git a/pkg/tsdb/cloudwatch/test_utils.go b/pkg/tsdb/cloudwatch/test_utils.go index 72206fdf5f5..9633606cbe8 100644 --- a/pkg/tsdb/cloudwatch/test_utils.go +++ b/pkg/tsdb/cloudwatch/test_utils.go @@ -51,12 +51,17 @@ func (m FakeCWLogsClient) GetLogGroupFieldsWithContext(ctx context.Context, inpu type FakeCWClient struct { cloudwatchiface.CloudWatchAPI + cloudwatch.GetMetricDataOutput Metrics []*cloudwatch.Metric MetricsPerPage int } +func (c FakeCWClient) GetMetricDataWithContext(aws.Context, *cloudwatch.GetMetricDataInput, ...request.Option) (*cloudwatch.GetMetricDataOutput, error) { + return &c.GetMetricDataOutput, nil +} + func (c FakeCWClient) ListMetricsPages(input *cloudwatch.ListMetricsInput, fn func(*cloudwatch.ListMetricsOutput, bool) bool) error { if c.MetricsPerPage == 0 { c.MetricsPerPage = 1000 diff --git a/pkg/tsdb/cloudwatch/time_series_query.go b/pkg/tsdb/cloudwatch/time_series_query.go index e2eed22706c..c0a379c27db 100644 --- a/pkg/tsdb/cloudwatch/time_series_query.go +++ b/pkg/tsdb/cloudwatch/time_series_query.go @@ -5,120 +5,105 @@ import ( "fmt" "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/infra/log" "golang.org/x/sync/errgroup" ) +type responseWrapper struct { + DataResponse *backend.DataResponse + RefId string +} + func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { plog.Debug("Executing time series query") - resp := backend.NewQueryDataResponse() - for _, q := range req.Queries { - startTime := q.TimeRange.From - endTime := q.TimeRange.To - if !startTime.Before(endTime) { - return nil, fmt.Errorf("invalid time range: start time must be before end time") - } + if len(req.Queries) == 0 { + return nil, fmt.Errorf("request contains no queries") + } - requestQueriesByRegion, err := e.parseQueries(req, startTime, endTime) - if err != nil { - return nil, err - } + // startTime and endTime are always the same for all queries + startTime := req.Queries[0].TimeRange.From + endTime := req.Queries[0].TimeRange.To + if !startTime.Before(endTime) { + return nil, fmt.Errorf("invalid time range: start time must be before end time") + } - if len(requestQueriesByRegion) == 0 { - return backend.NewQueryDataResponse(), nil - } + requestQueriesByRegion, err := e.parseQueries(req.Queries, startTime, endTime) + if err != nil { + return nil, err + } - resultChan := make(chan *backend.DataResponse, len(req.Queries)) - eg, ectx := errgroup.WithContext(ctx) - for r, q := range requestQueriesByRegion { - requestQueries := q - region := r - eg.Go(func() error { - defer func() { - if err := recover(); err != nil { - plog.Error("Execute Get Metric Data Query Panic", "error", err, "stack", log.Stack(1)) - if theErr, ok := err.(error); ok { - resultChan <- &backend.DataResponse{ + if len(requestQueriesByRegion) == 0 { + return backend.NewQueryDataResponse(), nil + } + + resultChan := make(chan *responseWrapper, len(req.Queries)) + eg, ectx := errgroup.WithContext(ctx) + for r, q := range requestQueriesByRegion { + requestQueries := q + region := r + eg.Go(func() error { + defer func() { + if err := recover(); err != nil { + plog.Error("Execute Get Metric Data Query Panic", "error", err, "stack", log.Stack(1)) + if theErr, ok := err.(error); ok { + resultChan <- &responseWrapper{ + DataResponse: &backend.DataResponse{ Error: theErr, - } + }, } } - }() - - client, err := e.getCWClient(region, req.PluginContext) - if err != nil { - return err } + }() - queries, err := e.transformRequestQueriesToCloudWatchQueries(requestQueries) - if err != nil { - for _, query := range requestQueries { - resultChan <- &backend.DataResponse{ - Frames: data.Frames{data.NewFrame(query.RefId)}, - Error: err, - } - } - return nil + client, err := e.getCWClient(region, req.PluginContext) + if err != nil { + return err + } + + queries, err := e.transformRequestQueriesToCloudWatchQueries(requestQueries) + if err != nil { + return err + } + + metricDataInput, err := e.buildMetricDataInput(startTime, endTime, queries) + if err != nil { + return err + } + + mdo, err := e.executeRequest(ectx, client, metricDataInput) + if err != nil { + return err + } + + responses, err := e.parseResponse(mdo, queries) + if err != nil { + return err + } + + res, err := e.transformQueryResponsesToQueryResult(responses, requestQueries, startTime, endTime) + if err != nil { + return err + } + + for refID, queryRes := range res { + resultChan <- &responseWrapper{ + DataResponse: queryRes, + RefId: refID, } + } + return nil + }) + } - metricDataInput, err := e.buildMetricDataInput(startTime, endTime, queries) - if err != nil { - return err - } + if err := eg.Wait(); err != nil { + return nil, err + } + close(resultChan) - cloudwatchResponses := make([]*cloudwatchResponse, 0) - mdo, err := e.executeRequest(ectx, client, metricDataInput) - if err != nil { - for _, query := range requestQueries { - resultChan <- &backend.DataResponse{ - Frames: data.Frames{data.NewFrame(query.RefId)}, - Error: err, - } - } - return nil - } - - responses, err := e.parseResponse(mdo, queries) - if err != nil { - for _, query := range requestQueries { - resultChan <- &backend.DataResponse{ - Frames: data.Frames{data.NewFrame(query.RefId)}, - Error: err, - } - } - return nil - } - - cloudwatchResponses = append(cloudwatchResponses, responses...) - res, err := e.transformQueryResponsesToQueryResult(cloudwatchResponses, requestQueries, startTime, endTime) - if err != nil { - for _, query := range requestQueries { - resultChan <- &backend.DataResponse{ - Frames: data.Frames{data.NewFrame(query.RefId)}, - Error: err, - } - } - return nil - } - - for _, queryRes := range res { - resultChan <- queryRes - } - return nil - }) - } - - if err := eg.Wait(); err != nil { - return nil, err - } - close(resultChan) - - for result := range resultChan { - resp.Responses[q.RefID] = *result - } + for result := range resultChan { + resp.Responses[result.RefId] = *result.DataResponse } return resp, nil diff --git a/pkg/tsdb/cloudwatch/time_series_query_test.go b/pkg/tsdb/cloudwatch/time_series_query_test.go index a4d7349ed9e..006aca5db72 100644 --- a/pkg/tsdb/cloudwatch/time_series_query_test.go +++ b/pkg/tsdb/cloudwatch/time_series_query_test.go @@ -2,18 +2,124 @@ package cloudwatch import ( "context" + "encoding/json" "testing" "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/cloudwatch" + "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" + "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestTimeSeriesQuery(t *testing.T) { executor := newExecutor(nil, nil, newTestConfig(), fakeSessionCache{}) now := time.Now() + origNewCWClient := NewCWClient + t.Cleanup(func() { + NewCWClient = origNewCWClient + }) + + var cwClient FakeCWClient + + NewCWClient = func(sess *session.Session) cloudwatchiface.CloudWatchAPI { + return cwClient + } + + t.Run("Custom metrics", func(t *testing.T) { + cwClient = FakeCWClient{ + CloudWatchAPI: nil, + GetMetricDataOutput: cloudwatch.GetMetricDataOutput{ + NextToken: nil, + Messages: []*cloudwatch.MessageData{}, + MetricDataResults: []*cloudwatch.MetricDataResult{ + { + StatusCode: aws.String("Complete"), Id: aws.String("a"), Label: aws.String("NetworkOut"), Values: []*float64{aws.Float64(1.0)}, Timestamps: []*time.Time{&now}, + }, + { + StatusCode: aws.String("Complete"), Id: aws.String("b"), Label: aws.String("NetworkIn"), Values: []*float64{aws.Float64(1.0)}, Timestamps: []*time.Time{&now}, + }, + }, + }, + } + + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil + }) + + executor := newExecutor(nil, im, newTestConfig(), fakeSessionCache{}) + resp, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{ + { + RefID: "A", + TimeRange: backend.TimeRange{ + From: now.Add(time.Hour * -2), + To: now.Add(time.Hour * -1), + }, + JSON: json.RawMessage(`{ + "type": "timeSeriesQuery", + "subtype": "metrics", + "namespace": "AWS/EC2", + "metricName": "NetworkOut", + "expression": "", + "dimensions": { + "InstanceId": "i-00645d91ed77d87ac" + }, + "region": "us-east-2", + "id": "a", + "alias": "NetworkOut", + "statistics": [ + "Maximum" + ], + "period": "300", + "hide": false, + "matchExact": true, + "refId": "A" + }`), + }, + { + RefID: "B", + TimeRange: backend.TimeRange{ + From: now.Add(time.Hour * -2), + To: now.Add(time.Hour * -1), + }, + JSON: json.RawMessage(`{ + "type": "timeSeriesQuery", + "subtype": "metrics", + "namespace": "AWS/EC2", + "metricName": "NetworkIn", + "expression": "", + "dimensions": { + "InstanceId": "i-00645d91ed77d87ac" + }, + "region": "us-east-2", + "id": "b", + "alias": "NetworkIn", + "statistics": [ + "Maximum" + ], + "period": "300", + "matchExact": true, + "refId": "B" + }`), + }, + }, + }) + require.NoError(t, err) + assert.Equal(t, "NetworkOut", resp.Responses["A"].Frames[0].Name) + assert.Equal(t, "NetworkIn", resp.Responses["B"].Frames[0].Name) + }) + t.Run("End time before start time should result in error", func(t *testing.T) { _, err := executor.executeTimeSeriesQuery(context.TODO(), &backend.QueryDataRequest{Queries: []backend.DataQuery{{TimeRange: backend.TimeRange{ From: now.Add(time.Hour * -1),