diff --git a/pkg/tsdb/cloudwatch/models/cloudwatch_query.go b/pkg/tsdb/cloudwatch/models/cloudwatch_query.go index d073fdb76cd..a44b00c17af 100644 --- a/pkg/tsdb/cloudwatch/models/cloudwatch_query.go +++ b/pkg/tsdb/cloudwatch/models/cloudwatch_query.go @@ -62,6 +62,8 @@ type sqlExpression struct { type CloudWatchQuery struct { logger log.Logger + StartTime time.Time + EndTime time.Time RefId string Region string Id string @@ -251,6 +253,8 @@ func ParseMetricDataQueries(dataQueries []backend.DataQuery, startTime time.Time for refId, mdq := range metricDataQueries { cwQuery := &CloudWatchQuery{ logger: logger, + StartTime: startTime, + EndTime: endTime, RefId: refId, Id: utils.Depointerizer(mdq.Id), Region: utils.Depointerizer(mdq.Region), diff --git a/pkg/tsdb/cloudwatch/models/cloudwatch_query_test.go b/pkg/tsdb/cloudwatch/models/cloudwatch_query_test.go index 0f4d7d3ed1b..1459c923555 100644 --- a/pkg/tsdb/cloudwatch/models/cloudwatch_query_test.go +++ b/pkg/tsdb/cloudwatch/models/cloudwatch_query_test.go @@ -272,6 +272,10 @@ func TestRequestParser(t *testing.T) { QueryType: "timeSeriesQuery", Interval: 0, RefID: "A", + TimeRange: backend.TimeRange{ + From: time.Now(), + To: time.Now(), + }, JSON: json.RawMessage(`{ "region":"us-east-1", "namespace":"ec2", @@ -303,6 +307,10 @@ func TestRequestParser(t *testing.T) { QueryType: "timeSeriesQuery", Interval: 0, RefID: "A", + TimeRange: backend.TimeRange{ + From: time.Now(), + To: time.Now(), + }, JSON: json.RawMessage(`{ "region":"us-east-1", "namespace":"ec2", diff --git a/pkg/tsdb/cloudwatch/response_parser.go b/pkg/tsdb/cloudwatch/response_parser.go index 0dd89476300..85b69a00a8b 100644 --- a/pkg/tsdb/cloudwatch/response_parser.go +++ b/pkg/tsdb/cloudwatch/response_parser.go @@ -18,7 +18,7 @@ import ( // matches a dynamic label var dynamicLabel = regexp.MustCompile(`\$\{.+\}`) -func (e *cloudWatchExecutor) parseResponse(ctx context.Context, startTime time.Time, endTime time.Time, metricDataOutputs []*cloudwatch.GetMetricDataOutput, +func (e *cloudWatchExecutor) parseResponse(ctx context.Context, metricDataOutputs []*cloudwatch.GetMetricDataOutput, queries []*models.CloudWatchQuery) ([]*responseWrapper, error) { aggregatedResponse := aggregateResponse(metricDataOutputs) queriesById := map[string]*models.CloudWatchQuery{} @@ -40,7 +40,7 @@ func (e *cloudWatchExecutor) parseResponse(ctx context.Context, startTime time.T } var err error - dataRes.Frames, err = buildDataFrames(ctx, startTime, endTime, response, queryRow) + dataRes.Frames, err = buildDataFrames(ctx, response, queryRow) if err != nil { return nil, err } @@ -164,7 +164,7 @@ func getLabels(cloudwatchLabel string, query *models.CloudWatchQuery, addSeriesL return labels } -func buildDataFrames(ctx context.Context, startTime time.Time, endTime time.Time, aggregatedResponse models.QueryRowResponse, +func buildDataFrames(ctx context.Context, aggregatedResponse models.QueryRowResponse, query *models.CloudWatchQuery) (data.Frames, error) { frames := data.Frames{} hasStaticLabel := query.Label != "" && !dynamicLabel.MatchString(query.Label) @@ -172,7 +172,7 @@ func buildDataFrames(ctx context.Context, startTime time.Time, endTime time.Time for _, metric := range aggregatedResponse.Metrics { label := *metric.Label - deepLink, err := query.BuildDeepLink(startTime, endTime) + deepLink, err := query.BuildDeepLink(query.StartTime, query.EndTime) if err != nil { return nil, err } diff --git a/pkg/tsdb/cloudwatch/response_parser_test.go b/pkg/tsdb/cloudwatch/response_parser_test.go index 39e09c8d724..d1c8229fa3e 100644 --- a/pkg/tsdb/cloudwatch/response_parser_test.go +++ b/pkg/tsdb/cloudwatch/response_parser_test.go @@ -188,6 +188,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { } query := &models.CloudWatchQuery{ + StartTime: startTime, + EndTime: endTime, RefId: "refId1", Region: "us-east-1", Namespace: "AWS/ApplicationELB", @@ -202,7 +204,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { MetricEditorMode: models.MetricEditorModeBuilder, MatchExact: true, } - frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query) + frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query) require.NoError(t, err) frame1 := frames[0] @@ -256,6 +258,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { } query := &models.CloudWatchQuery{ + StartTime: startTime, + EndTime: endTime, RefId: "refId1", Region: "us-east-1", Namespace: "AWS/ApplicationELB", @@ -270,7 +274,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { MetricQueryType: models.MetricQueryTypeSearch, MetricEditorMode: models.MetricEditorModeBuilder, } - frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query) + frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query) require.NoError(t, err) assert.Equal(t, "some label lb3", frames[0].Name) @@ -305,6 +309,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { }, } query := &models.CloudWatchQuery{ + StartTime: startTime, + EndTime: endTime, RefId: "refId1", Region: "us-east-1", Namespace: "AWS/ApplicationELB", @@ -317,7 +323,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { MetricQueryType: models.MetricQueryTypeSearch, MetricEditorMode: models.MetricEditorModeBuilder, } - frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query) + frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query) require.NoError(t, err) assert.Len(t, frames, 2) @@ -347,6 +353,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { } query := &models.CloudWatchQuery{ + StartTime: startTime, + EndTime: endTime, RefId: "refId1", Region: "us-east-1", Namespace: "AWS/ApplicationELB", @@ -361,7 +369,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { MetricQueryType: models.MetricQueryTypeSearch, MetricEditorMode: models.MetricEditorModeBuilder, } - frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query) + frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query) require.NoError(t, err) assert.Len(t, frames, 2) @@ -393,6 +401,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { } query := &models.CloudWatchQuery{ + StartTime: startTime, + EndTime: endTime, RefId: "refId1", Region: "us-east-1", Namespace: "AWS/ApplicationELB", @@ -407,7 +417,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { MetricQueryType: models.MetricQueryTypeSearch, MetricEditorMode: models.MetricEditorModeBuilder, } - frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query) + frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query) require.NoError(t, err) assert.Equal(t, "some label", frames[0].Name) @@ -433,6 +443,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { } query := &models.CloudWatchQuery{ + StartTime: startTime, + EndTime: endTime, RefId: "refId1", Region: "us-east-1", Namespace: "AWS/ApplicationELB", @@ -448,7 +460,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { MetricEditorMode: models.MetricEditorModeBuilder, Label: "set ${AVG} label", } - frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query) + frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query) require.NoError(t, err) assert.Equal(t, "some label", frames[0].Name) @@ -474,6 +486,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { } query := &models.CloudWatchQuery{ + StartTime: startTime, + EndTime: endTime, RefId: "refId1", Region: "us-east-1", Namespace: "AWS/ApplicationELB", @@ -489,7 +503,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { MetricEditorMode: models.MetricEditorModeBuilder, Label: "actual", } - frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query) + frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query) require.NoError(t, err) assert.Equal(t, "actual", frames[0].Name) @@ -515,6 +529,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { } query := &models.CloudWatchQuery{ + StartTime: startTime, + EndTime: endTime, RefId: "refId1", Region: "us-east-1", Namespace: "", @@ -527,7 +543,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { MetricEditorMode: models.MetricEditorModeRaw, Label: "actual", } - frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query) + frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query) require.NoError(t, err) assert.Equal(t, "actual", frames[0].Name) @@ -560,6 +576,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { } query := &models.CloudWatchQuery{ + StartTime: startTime, + EndTime: endTime, RefId: "refId1", Region: "us-east-1", Statistic: "Average", @@ -569,7 +587,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { Dimensions: map[string][]string{"Service": {"EC2", "Elastic Loading Balancing"}, "Resource": {"vCPU", "ApplicationLoadBalancersPerRegion"}}, SqlExpression: "SELECT AVG(ResourceCount) FROM SCHEMA(\"AWS/Usage\", Class, Resource, Service, Type) GROUP BY Service, Resource", } - frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query) + frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query) require.NoError(t, err) assert.Equal(t, "EC2 vCPU", frames[0].Name) @@ -597,6 +615,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { } query := &models.CloudWatchQuery{ + StartTime: startTime, + EndTime: endTime, RefId: "refId1", Region: "us-east-1", Statistic: "Average", @@ -605,7 +625,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { MetricEditorMode: models.MetricEditorModeBuilder, SqlExpression: "SELECT AVG(ResourceCount) FROM SCHEMA(\"AWS/Usage\", Class, Resource, Service, Type)", } - frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query) + frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query) require.NoError(t, err) assert.Equal(t, "cloudwatch-default-label", frames[0].Name) @@ -629,6 +649,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { } query := &models.CloudWatchQuery{ + StartTime: startTime, + EndTime: endTime, RefId: "refId1", Region: "us-east-1", Namespace: "AWS/ApplicationELB", @@ -642,7 +664,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { MetricQueryType: models.MetricQueryTypeSearch, MetricEditorMode: models.MetricEditorModeRaw, } - frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query) + frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query) require.NoError(t, err) assert.Equal(t, "some label", frames[0].Name) @@ -673,6 +695,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { } query := &models.CloudWatchQuery{ + StartTime: startTime, + EndTime: endTime, RefId: "refId1", Region: "us-east-1", Namespace: "AWS/ApplicationELB", @@ -686,7 +710,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) { MetricQueryType: models.MetricQueryTypeSearch, MetricEditorMode: models.MetricEditorModeBuilder, } - frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query) + frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query) require.NoError(t, err) frame := frames[0] diff --git a/pkg/tsdb/cloudwatch/time_series_query.go b/pkg/tsdb/cloudwatch/time_series_query.go index 42f9e6da7e5..97a14db8daf 100644 --- a/pkg/tsdb/cloudwatch/time_series_query.go +++ b/pkg/tsdb/cloudwatch/time_series_query.go @@ -25,12 +25,6 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba if len(req.Queries) == 0 { return nil, backend.DownstreamError(fmt.Errorf("request contains no queries")) } - // startTime and endTime are always the same for all queries - startTime := req.Queries[0].TimeRange.From - endTime := req.Queries[0].TimeRange.To - if !startTime.Before(endTime) { - return nil, backend.DownstreamError(fmt.Errorf("invalid time range: start time must be before end time")) - } instance, err := e.getInstance(ctx, req.PluginContext) if err != nil { @@ -38,34 +32,45 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba return resp, nil } - requestQueries, err := models.ParseMetricDataQueries(req.Queries, startTime, endTime, instance.Settings.Region, e.logger.FromContext(ctx), - features.IsEnabled(ctx, features.FlagCloudWatchCrossAccountQuerying)) - if err != nil { - return nil, err - } - - if len(requestQueries) == 0 { - return backend.NewQueryDataResponse(), nil - } - - requestQueriesByRegion := make(map[string][]*models.CloudWatchQuery) - for _, query := range requestQueries { - if _, exist := requestQueriesByRegion[query.Region]; !exist { - requestQueriesByRegion[query.Region] = []*models.CloudWatchQuery{} + timeBatches := utils.BatchDataQueriesByTimeRange(req.Queries) + requestQueriesByTimeAndRegion := make(map[string][]*models.CloudWatchQuery) + for i, timeBatch := range timeBatches { + startTime := timeBatch[0].TimeRange.From + endTime := timeBatch[0].TimeRange.To + if !startTime.Before(endTime) { + return nil, backend.DownstreamError(fmt.Errorf("invalid time range: start time must be before end time")) } - requestQueriesByRegion[query.Region] = append(requestQueriesByRegion[query.Region], query) + requestQueries, err := models.ParseMetricDataQueries(timeBatch, startTime, endTime, instance.Settings.Region, e.logger.FromContext(ctx), + features.IsEnabled(ctx, features.FlagCloudWatchCrossAccountQuerying)) + if err != nil { + return nil, err + } + + for _, query := range requestQueries { + key := fmt.Sprintf("%d %s", i, query.Region) + if _, exist := requestQueriesByTimeAndRegion[key]; !exist { + requestQueriesByTimeAndRegion[key] = []*models.CloudWatchQuery{} + } + requestQueriesByTimeAndRegion[key] = append(requestQueriesByTimeAndRegion[key], query) + } + } + if len(requestQueriesByTimeAndRegion) == 0 { + return backend.NewQueryDataResponse(), nil } resultChan := make(chan *responseWrapper, len(req.Queries)) eg, ectx := errgroup.WithContext(ctx) - for r, regionQueries := range requestQueriesByRegion { - region := r - - batches := [][]*models.CloudWatchQuery{regionQueries} + for _, timeAndRegionQueries := range requestQueriesByTimeAndRegion { + batches := [][]*models.CloudWatchQuery{timeAndRegionQueries} if features.IsEnabled(ctx, features.FlagCloudWatchBatchQueries) { - batches = getMetricQueryBatches(regionQueries, e.logger.FromContext(ctx)) + batches = getMetricQueryBatches(timeAndRegionQueries, e.logger.FromContext(ctx)) } + // region, startTime, and endTime are the same for the set of queries + region := timeAndRegionQueries[0].Region + startTime := timeAndRegionQueries[0].StartTime + endTime := timeAndRegionQueries[0].EndTime + for _, batch := range batches { requestQueries := batch eg.Go(func() error { @@ -113,7 +118,7 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba return err } - res, err := e.parseResponse(ctx, startTime, endTime, mdo, requestQueries) + res, err := e.parseResponse(ctx, mdo, requestQueries) if err != nil { return err } @@ -130,7 +135,7 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba if err := eg.Wait(); err != nil { dataResponse := backend.ErrorResponseWithErrorSource(fmt.Errorf("metric request error: %w", err)) resultChan <- &responseWrapper{ - RefId: getQueryRefIdFromErrorString(err.Error(), requestQueries), + RefId: getQueryRefIdFromErrorString(err.Error(), requestQueriesByTimeAndRegion), DataResponse: &dataResponse, } } @@ -143,14 +148,16 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba return resp, nil } -func getQueryRefIdFromErrorString(err string, queries []*models.CloudWatchQuery) string { +func getQueryRefIdFromErrorString(err string, queriesByRegion map[string][]*models.CloudWatchQuery) string { // error can be in format "Error in expression 'test': Invalid syntax" // so we can find the query id or ref id between the quotations erroredRefId := "" - for _, query := range queries { - if regexp.MustCompile(`'`+query.RefId+`':`).MatchString(err) || regexp.MustCompile(`'`+query.Id+`':`).MatchString(err) { - erroredRefId = query.RefId + for _, queries := range queriesByRegion { + for _, query := range queries { + if regexp.MustCompile(`'`+query.RefId+`':`).MatchString(err) || regexp.MustCompile(`'`+query.Id+`':`).MatchString(err) { + erroredRefId = query.RefId + } } } // if errorRefId is empty, it means the error concerns all queries (error metric limit exceeded, for example) diff --git a/pkg/tsdb/cloudwatch/time_series_query_test.go b/pkg/tsdb/cloudwatch/time_series_query_test.go index dcc6ba0cfb1..c2e65bb5434 100644 --- a/pkg/tsdb/cloudwatch/time_series_query_test.go +++ b/pkg/tsdb/cloudwatch/time_series_query_test.go @@ -118,18 +118,26 @@ func TestTimeSeriesQuery(t *testing.T) { }) t.Run("End time before start time should result in error", func(t *testing.T) { - _, err := executor.executeTimeSeriesQuery(context.Background(), &backend.QueryDataRequest{Queries: []backend.DataQuery{{TimeRange: backend.TimeRange{ - From: now.Add(time.Hour * -1), - To: now.Add(time.Hour * -2), - }}}}) + _, err := executor.executeTimeSeriesQuery(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{{TimeRange: backend.TimeRange{ + From: now.Add(time.Hour * -1), + To: now.Add(time.Hour * -2), + }}}}) assert.EqualError(t, err, "invalid time range: start time must be before end time") }) t.Run("End time equals start time should result in error", func(t *testing.T) { - _, err := executor.executeTimeSeriesQuery(context.Background(), &backend.QueryDataRequest{Queries: []backend.DataQuery{{TimeRange: backend.TimeRange{ - From: now.Add(time.Hour * -1), - To: now.Add(time.Hour * -1), - }}}}) + _, err := executor.executeTimeSeriesQuery(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{{TimeRange: backend.TimeRange{ + From: now.Add(time.Hour * -1), + To: now.Add(time.Hour * -1), + }}}}) assert.EqualError(t, err, "invalid time range: start time must be before end time") }) } @@ -271,6 +279,72 @@ func Test_executeTimeSeriesQuery_getCWClient_is_called_once_per_region_and_GetMe mockMetricClient.AssertNumberOfCalls(t, "GetMetricDataWithContext", 2) // GetMetricData is asserted to have been called 2 times, presumably once for each group of regions (2 regions total) }) + + t.Run("3 queries with 2 time ranges calls GetSessionWithAuthSettings 2 times and calls GetMetricDataWithContext 2 times", func(t *testing.T) { + sessionCache := &mockSessionCache{} + sessionCache.On("GetSessionWithAuthSettings", mock.MatchedBy( + func(config awsds.GetSessionConfig) bool { + return config.Settings.Region == "us-east-2" + })). + Return(&session.Session{Config: &aws.Config{}}, nil).Times(2) + + im := datasource.NewInstanceManager(func(ctx context.Context, s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return DataSource{Settings: models.CloudWatchSettings{}, sessions: sessionCache}, nil + }) + + mockMetricClient = mocks.MetricsAPI{} + mockMetricClient.On("GetMetricDataWithContext", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + + executor := newExecutor(im, log.NewNullLogger()) + _, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}, + }, + Queries: []backend.DataQuery{ + { + RefID: "A", + TimeRange: backend.TimeRange{From: time.Now().Add(time.Hour * -2), To: time.Now()}, + JSON: json.RawMessage(`{ + "type": "timeSeriesQuery", + "namespace": "AWS/EC2", + "metricName": "NetworkOut", + "region": "us-east-2", + "statistic": "Maximum", + "period": "300" + }`), + }, + { + RefID: "A2", + TimeRange: backend.TimeRange{From: time.Now().Add(time.Hour * -2), To: time.Now()}, + JSON: json.RawMessage(`{ + "type": "timeSeriesQuery", + "namespace": "AWS/EC2", + "metricName": "NetworkOut", + "region": "us-east-2", + "statistic": "Maximum", + "period": "300" + }`), + }, + { + RefID: "B", + TimeRange: backend.TimeRange{From: time.Now().Add(time.Hour * -2), To: time.Now().Add(time.Hour * -1)}, + JSON: json.RawMessage(`{ + "type": "timeSeriesQuery", + "namespace": "AWS/EC2", + "metricName": "NetworkIn", + "region": "us-east-2", + "statistic": "Maximum", + "period": "300" + }`), + }, + }, + }) + + require.NoError(t, err) + sessionCache.AssertExpectations(t) // method is defined to return twice (once for each batch) + mockMetricClient.AssertNumberOfCalls(t, "GetMetricDataWithContext", 2) + // GetMetricData is asserted to have been called 2 times, presumably once for each time range (2 time ranges total) + }) } type queryDimensions struct { diff --git a/pkg/tsdb/cloudwatch/utils/metrics.go b/pkg/tsdb/cloudwatch/utils/metrics.go index 07d361ff2ef..e68e18865a5 100644 --- a/pkg/tsdb/cloudwatch/utils/metrics.go +++ b/pkg/tsdb/cloudwatch/utils/metrics.go @@ -1,6 +1,7 @@ package utils import ( + "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -20,3 +21,19 @@ var QueriesTotalCounter = promauto.NewCounterVec( }, []string{"query_type"}, ) + +// BatchDataQueriesByTimeRange separates the passed in queries into batches based on time ranges +func BatchDataQueriesByTimeRange(queries []backend.DataQuery) [][]backend.DataQuery { + timeToBatch := make(map[backend.TimeRange][]backend.DataQuery) + + for _, query := range queries { + key := backend.TimeRange{From: query.TimeRange.From.UTC(), To: query.TimeRange.To.UTC()} + timeToBatch[key] = append(timeToBatch[key], query) + } + + finalBatches := [][]backend.DataQuery{} + for _, batch := range timeToBatch { + finalBatches = append(finalBatches, batch) + } + return finalBatches +} diff --git a/pkg/tsdb/cloudwatch/utils/metrics_test.go b/pkg/tsdb/cloudwatch/utils/metrics_test.go new file mode 100644 index 00000000000..77a44623302 --- /dev/null +++ b/pkg/tsdb/cloudwatch/utils/metrics_test.go @@ -0,0 +1,47 @@ +package utils + +import ( + "testing" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/stretchr/testify/require" +) + +func TestBatchDataQueriesByTimeRange(t *testing.T) { + start := time.Date(2024, time.November, 29, 0, 42, 34, 0, time.UTC) + FiveMin := time.Date(2024, time.November, 29, 0, 47, 34, 0, time.UTC) + TenMin := time.Date(2024, time.November, 29, 0, 52, 34, 0, time.UTC) + loc := time.FixedZone("UTC+1", 1*60*60) + FiveMinDifferentZone := time.Date(2024, time.November, 29, 1, 47, 34, 0, loc) + testQueries := []backend.DataQuery{ + { + RefID: "A", + TimeRange: backend.TimeRange{From: start, To: FiveMin}, + }, + { + RefID: "B", + TimeRange: backend.TimeRange{From: start, To: TenMin}, + }, + { + RefID: "C", + TimeRange: backend.TimeRange{From: start, To: FiveMinDifferentZone}, + }, + } + result := BatchDataQueriesByTimeRange(testQueries) + require.Equal(t, 2, len(result)) + var FiveMinQueries = result[0] + var TenMinQueries = result[1] + // Since BatchDataQueriesByTimeRange uses a map, we don't known the return indices for the batches + if len(result[0]) == 1 { + TenMinQueries = result[0] + FiveMinQueries = result[1] + } + + require.Equal(t, 2, len(FiveMinQueries)) + require.Equal(t, "A", FiveMinQueries[0].RefID) + require.Equal(t, "C", FiveMinQueries[1].RefID) + + require.Equal(t, 1, len(TenMinQueries)) + require.Equal(t, "B", TenMinQueries[0].RefID) +}