diff --git a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md index 3be0262b871..2d5f339c091 100644 --- a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md +++ b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md @@ -78,6 +78,7 @@ Some features are enabled by default. You can disable these feature by setting t | `awsAsyncQueryCaching` | Enable caching for async queries for Redshift and Athena. Requires that the `useCachingService` feature toggle is enabled and the datasource has caching and async query support enabled | | `splitScopes` | Support faster dashboard and folder search by splitting permission scopes into parts | | `reportingRetries` | Enables rendering retries for the reporting feature | +| `cloudWatchBatchQueries` | Runs CloudWatch metrics queries as separate batches | ## Experimental feature toggles diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts index e48bb5d8ff7..f471b2d9b9a 100644 --- a/packages/grafana-data/src/types/featureToggles.gen.ts +++ b/packages/grafana-data/src/types/featureToggles.gen.ts @@ -139,6 +139,7 @@ export interface FeatureToggles { formatString?: boolean; transformationsVariableSupport?: boolean; kubernetesPlaylists?: boolean; + cloudWatchBatchQueries?: boolean; navAdminSubsections?: boolean; recoveryThreshold?: boolean; teamHttpHeaders?: boolean; diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go index cd8e800e4a2..69fc59fad0f 100644 --- a/pkg/services/featuremgmt/registry.go +++ b/pkg/services/featuremgmt/registry.go @@ -848,6 +848,12 @@ var ( Stage: FeatureStageExperimental, Owner: grafanaAppPlatformSquad, }, + { + Name: "cloudWatchBatchQueries", + Description: "Runs CloudWatch metrics queries as separate batches", + Stage: FeatureStagePublicPreview, + Owner: awsDatasourcesSquad, + }, { 
Name: "navAdminSubsections", Description: "Splits the administration section of the nav tree into subsections", diff --git a/pkg/services/featuremgmt/toggles_gen.csv b/pkg/services/featuremgmt/toggles_gen.csv index 45ad58008bf..dabfbe49e4d 100644 --- a/pkg/services/featuremgmt/toggles_gen.csv +++ b/pkg/services/featuremgmt/toggles_gen.csv @@ -120,6 +120,7 @@ enableNativeHTTPHistogram,experimental,@grafana/hosted-grafana-team,false,false, formatString,experimental,@grafana/grafana-bi-squad,false,false,false,true transformationsVariableSupport,experimental,@grafana/grafana-bi-squad,false,false,false,true kubernetesPlaylists,experimental,@grafana/grafana-app-platform-squad,false,false,false,true +cloudWatchBatchQueries,preview,@grafana/aws-datasources,false,false,false,false navAdminSubsections,experimental,@grafana/grafana-frontend-platform,false,false,false,false recoveryThreshold,experimental,@grafana/alerting-squad,false,false,true,false teamHttpHeaders,experimental,@grafana/grafana-authnz-team,false,false,false,false diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go index 7cb380e9015..299df484a21 100644 --- a/pkg/services/featuremgmt/toggles_gen.go +++ b/pkg/services/featuremgmt/toggles_gen.go @@ -491,6 +491,10 @@ const ( // Use the kubernetes API in the frontend for playlists FlagKubernetesPlaylists = "kubernetesPlaylists" + // FlagCloudWatchBatchQueries + // Runs CloudWatch metrics queries as separate batches + FlagCloudWatchBatchQueries = "cloudWatchBatchQueries" + // FlagNavAdminSubsections // Splits the administration section of the nav tree into subsections FlagNavAdminSubsections = "navAdminSubsections" diff --git a/pkg/tsdb/cloudwatch/get_metric_query_batches.go b/pkg/tsdb/cloudwatch/get_metric_query_batches.go new file mode 100644 index 00000000000..924966bc119 --- /dev/null +++ b/pkg/tsdb/cloudwatch/get_metric_query_batches.go @@ -0,0 +1,84 @@ +package cloudwatch + +import ( + "regexp" + + 
"github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/tsdb/cloudwatch/models" +) + +// nonWordRegex is for splitting the expressions to just functions and ids +var nonWordRegex = regexp.MustCompile(`\W+`) + +// getMetricQueryBatches separates queries into batches if necessary. Metric Insight queries cannot run together, and math expressions must be run +// with all the queries they reference. +func getMetricQueryBatches(queries []*models.CloudWatchQuery, logger log.Logger) [][]*models.CloudWatchQuery { + metricInsightIndices := []int{} + mathIndices := []int{} + for i, query := range queries { + switch query.GetGetMetricDataAPIMode() { + case models.GMDApiModeSQLExpression: + metricInsightIndices = append(metricInsightIndices, i) + case models.GMDApiModeMathExpression: + mathIndices = append(mathIndices, i) + default: + } + } + // We only need multiple batches if there are multiple metrics insight queries + if len(metricInsightIndices) <= 1 { + return [][]*models.CloudWatchQuery{queries} + } + + logger.Debug("Separating queries into batches") + // Map non-empty query ids to their index in queries + idToIndex := map[string]int{} + for i, query := range queries { + if query.Id != "" { + idToIndex[query.Id] = i + } + } + + // Find and track which queries are referenced by math queries + queryReferences := make([][]int, len(queries)) + isReferenced := make([]bool, len(queries)) + for _, idx := range mathIndices { + tokens := nonWordRegex.Split(queries[idx].Expression, -1) + references := []int{} + for _, token := range tokens { + ref, found := idToIndex[token] + if found { + references = append(references, ref) + isReferenced[ref] = true + } + } + queryReferences[idx] = references + } + + // Create a new batch for every query not used in another query + batches := [][]*models.CloudWatchQuery{} + for i, used := range isReferenced { + if !used { + batches = append(batches, getReferencedQueries(queries, queryReferences, i)) + } + } + return batches +} + +// 
getReferencedQueries returns the startQuery query followed by every query it references, directly or through other referenced queries, each included once +func getReferencedQueries(queries []*models.CloudWatchQuery, queryReferences [][]int, startQuery int) []*models.CloudWatchQuery { + usedQueries := make([]bool, len(queries)) + batch := []*models.CloudWatchQuery{} + + queriesToAdd := []int{startQuery} + usedQueries[startQuery] = true + for i := 0; i < len(queriesToAdd); i++ { + batch = append(batch, queries[queriesToAdd[i]]) + for _, queryIdx := range queryReferences[queriesToAdd[i]] { + if !usedQueries[queryIdx] { + usedQueries[queryIdx] = true + queriesToAdd = append(queriesToAdd, queryIdx) + } + } + } + return batch +} diff --git a/pkg/tsdb/cloudwatch/get_metric_query_batches_test.go b/pkg/tsdb/cloudwatch/get_metric_query_batches_test.go new file mode 100644 index 00000000000..59023ba2843 --- /dev/null +++ b/pkg/tsdb/cloudwatch/get_metric_query_batches_test.go @@ -0,0 +1,147 @@ +package cloudwatch + +import ( + "testing" + + "github.com/grafana/grafana/pkg/infra/log/logtest" + "github.com/grafana/grafana/pkg/tsdb/cloudwatch/models" + "github.com/stretchr/testify/assert" +) + +func TestGetMetricQueryBatches(t *testing.T) { + logger := &logtest.Fake{} + insight1 := models.CloudWatchQuery{ + MetricQueryType: models.MetricQueryTypeQuery, + Id: "i1", + } + insight2 := models.CloudWatchQuery{ + MetricQueryType: models.MetricQueryTypeQuery, + Id: "i2", + } + insight3 := models.CloudWatchQuery{ + MetricQueryType: models.MetricQueryTypeQuery, + Id: "i3", + } + metricStat := models.CloudWatchQuery{ + MetricQueryType: models.MetricQueryTypeSearch, + MetricEditorMode: models.MetricEditorModeBuilder, + Id: "s1", + } + m1_ref_i1 := models.CloudWatchQuery{ + MetricQueryType: models.MetricQueryTypeSearch, + MetricEditorMode: models.MetricEditorModeRaw, + Expression: "PERIOD(i1)", + Id: "m1", + } + m2_ref_i1 := models.CloudWatchQuery{ + MetricQueryType: models.MetricQueryTypeSearch, + MetricEditorMode: models.MetricEditorModeRaw, + 
Expression: "RATE(i1)", + Id: "m2", + } + m3_ref_m1_m2 := models.CloudWatchQuery{ + MetricQueryType: models.MetricQueryTypeSearch, + MetricEditorMode: models.MetricEditorModeRaw, + Expression: "m1 * m2", + Id: "m3", + } + m4_ref_s1 := models.CloudWatchQuery{ + MetricQueryType: models.MetricQueryTypeSearch, + MetricEditorMode: models.MetricEditorModeRaw, + Expression: "SUM(s1)", + Id: "m4", + } + m4_ref_i1_i3 := models.CloudWatchQuery{ + MetricQueryType: models.MetricQueryTypeSearch, + MetricEditorMode: models.MetricEditorModeRaw, + Expression: "PERIOD(i1) * RATE(i3)", + Id: "m5", + } + + t.Run("zero insight queries should not separate into batches", func(t *testing.T) { + batch := []*models.CloudWatchQuery{ + &metricStat, + &m1_ref_i1, + &m2_ref_i1, + &m3_ref_m1_m2, + &m4_ref_s1, + } + + result := getMetricQueryBatches(batch, logger) + assert.Len(t, result, 1) + assert.Equal(t, batch, result[0]) + }) + + t.Run("one insight query should not separate into batches", func(t *testing.T) { + batch := []*models.CloudWatchQuery{ + &insight1, + &metricStat, + } + + result := getMetricQueryBatches(batch, logger) + assert.Len(t, result, 1) + assert.ElementsMatch(t, batch, result[0]) + }) + + t.Run("multiple insight queries should separate into batches", func(t *testing.T) { + batch := []*models.CloudWatchQuery{ + &insight1, + &metricStat, + &insight2, + } + + result := getMetricQueryBatches(batch, logger) + assert.Len(t, result, 3) + assert.ElementsMatch(t, []*models.CloudWatchQuery{&insight1}, result[0]) + assert.ElementsMatch(t, []*models.CloudWatchQuery{&metricStat}, result[1]) + assert.ElementsMatch(t, []*models.CloudWatchQuery{&insight2}, result[2]) + }) + + t.Run("math queries with one insight query should not separate into batches", func(t *testing.T) { + batch := []*models.CloudWatchQuery{ + &insight1, + &metricStat, + &m1_ref_i1, + &m2_ref_i1, + &m3_ref_m1_m2, + &m4_ref_s1, + } + + result := getMetricQueryBatches(batch, logger) + assert.Len(t, result, 1) + 
assert.ElementsMatch(t, batch, result[0]) + }) + t.Run("math queries with multiple insight queries should separate into batches", func(t *testing.T) { + batch := []*models.CloudWatchQuery{ + &insight1, + &insight2, + &metricStat, + &m1_ref_i1, + &m2_ref_i1, + &m3_ref_m1_m2, + &m4_ref_s1, + } + + result := getMetricQueryBatches(batch, logger) + assert.Len(t, result, 3) + assert.ElementsMatch(t, []*models.CloudWatchQuery{&insight2}, result[0]) + assert.ElementsMatch(t, []*models.CloudWatchQuery{&insight1, &m1_ref_i1, &m2_ref_i1, &m3_ref_m1_m2}, result[1]) + assert.ElementsMatch(t, []*models.CloudWatchQuery{&metricStat, &m4_ref_s1}, result[2]) + }) + t.Run("a math query with multiple insight queries should batch them together", func(t *testing.T) { + batch := []*models.CloudWatchQuery{ + &insight1, + &insight2, + &insight3, + &m1_ref_i1, + &m4_ref_i1_i3, + } + + result := getMetricQueryBatches(batch, logger) + assert.Len(t, result, 3) + assert.ElementsMatch(t, []*models.CloudWatchQuery{&insight2}, result[0]) + assert.ElementsMatch(t, []*models.CloudWatchQuery{&insight1, &m1_ref_i1}, result[1]) + // This batch is expected to get an error from AWS, which does not allow multiple insight queries in a batch + assert.ElementsMatch(t, []*models.CloudWatchQuery{&insight1, &insight3, &m4_ref_i1_i3}, result[2]) + }) +} diff --git a/pkg/tsdb/cloudwatch/time_series_query.go b/pkg/tsdb/cloudwatch/time_series_query.go index cf08f817029..12a27205e53 100644 --- a/pkg/tsdb/cloudwatch/time_series_query.go +++ b/pkg/tsdb/cloudwatch/time_series_query.go @@ -56,56 +56,64 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, logger resultChan := make(chan *responseWrapper, len(req.Queries)) eg, ectx := errgroup.WithContext(ctx) - for r, q := range requestQueriesByRegion { - requestQueries := q + for r, regionQueries := range requestQueriesByRegion { region := r - eg.Go(func() error { - defer func() { - if err := recover(); err != nil { - logger.Error("Execute Get 
Metric Data Query Panic", "error", err, "stack", log.Stack(1)) - if theErr, ok := err.(error); ok { - resultChan <- &responseWrapper{ - DataResponse: &backend.DataResponse{ - Error: theErr, - }, + + batches := [][]*models.CloudWatchQuery{regionQueries} + if e.features.IsEnabled(featuremgmt.FlagCloudWatchBatchQueries) { + batches = getMetricQueryBatches(regionQueries, logger) + } + + for _, batch := range batches { + requestQueries := batch + eg.Go(func() error { + defer func() { + if err := recover(); err != nil { + logger.Error("Execute Get Metric Data Query Panic", "error", err, "stack", log.Stack(1)) + if theErr, ok := err.(error); ok { + resultChan <- &responseWrapper{ + DataResponse: &backend.DataResponse{ + Error: theErr, + }, + } } } - } - }() + }() - client, err := e.getCWClient(ctx, req.PluginContext, region) - if err != nil { - return err - } - - metricDataInput, err := e.buildMetricDataInput(logger, startTime, endTime, requestQueries) - if err != nil { - return err - } - - mdo, err := e.executeRequest(ectx, client, metricDataInput) - if err != nil { - return err - } - - if e.features.IsEnabled(featuremgmt.FlagCloudWatchWildCardDimensionValues) { - requestQueries, err = e.getDimensionValuesForWildcards(req.PluginContext, region, client, requestQueries, instance.tagValueCache, logger) + client, err := e.getCWClient(ctx, req.PluginContext, region) if err != nil { return err } - } - res, err := e.parseResponse(startTime, endTime, mdo, requestQueries) - if err != nil { - return err - } + metricDataInput, err := e.buildMetricDataInput(logger, startTime, endTime, requestQueries) + if err != nil { + return err + } - for _, responseWrapper := range res { - resultChan <- responseWrapper - } + mdo, err := e.executeRequest(ectx, client, metricDataInput) + if err != nil { + return err + } - return nil - }) + if e.features.IsEnabled(featuremgmt.FlagCloudWatchWildCardDimensionValues) { + requestQueries, err = e.getDimensionValuesForWildcards(req.PluginContext, region, 
client, requestQueries, instance.tagValueCache, logger) + if err != nil { + return err + } + } + + res, err := e.parseResponse(startTime, endTime, mdo, requestQueries) + if err != nil { + return err + } + + for _, responseWrapper := range res { + resultChan <- responseWrapper + } + + return nil + }) + } } if err := eg.Wait(); err != nil {