CloudWatch: Update query batching logic (#76075)

This commit is contained in:
Isabella Siu
2023-10-20 15:09:41 -04:00
committed by GitHub
parent 0ed0e0397c
commit ecbc52f515
8 changed files with 292 additions and 40 deletions

View File

@@ -78,6 +78,7 @@ Some features are enabled by default. You can disable these feature by setting t
| `awsAsyncQueryCaching` | Enable caching for async queries for Redshift and Athena. Requires that the `useCachingService` feature toggle is enabled and the datasource has caching and async query support enabled |
| `splitScopes` | Support faster dashboard and folder search by splitting permission scopes into parts |
| `reportingRetries` | Enables rendering retries for the reporting feature |
| `cloudWatchBatchQueries` | Runs CloudWatch metrics queries as separate batches |
## Experimental feature toggles

View File

@@ -139,6 +139,7 @@ export interface FeatureToggles {
formatString?: boolean;
transformationsVariableSupport?: boolean;
kubernetesPlaylists?: boolean;
cloudWatchBatchQueries?: boolean;
navAdminSubsections?: boolean;
recoveryThreshold?: boolean;
teamHttpHeaders?: boolean;

View File

@@ -848,6 +848,12 @@ var (
Stage: FeatureStageExperimental,
Owner: grafanaAppPlatformSquad,
},
{
Name: "cloudWatchBatchQueries",
Description: "Runs CloudWatch metrics queries as separate batches",
Stage: FeatureStagePublicPreview,
Owner: awsDatasourcesSquad,
},
{
Name: "navAdminSubsections",
Description: "Splits the administration section of the nav tree into subsections",

View File

@@ -120,6 +120,7 @@ enableNativeHTTPHistogram,experimental,@grafana/hosted-grafana-team,false,false,
formatString,experimental,@grafana/grafana-bi-squad,false,false,false,true
transformationsVariableSupport,experimental,@grafana/grafana-bi-squad,false,false,false,true
kubernetesPlaylists,experimental,@grafana/grafana-app-platform-squad,false,false,false,true
cloudWatchBatchQueries,preview,@grafana/aws-datasources,false,false,false,false
navAdminSubsections,experimental,@grafana/grafana-frontend-platform,false,false,false,false
recoveryThreshold,experimental,@grafana/alerting-squad,false,false,true,false
teamHttpHeaders,experimental,@grafana/grafana-authnz-team,false,false,false,false
1 Name Stage Owner requiresDevMode RequiresLicense RequiresRestart FrontendOnly
120 formatString experimental @grafana/grafana-bi-squad false false false true
121 transformationsVariableSupport experimental @grafana/grafana-bi-squad false false false true
122 kubernetesPlaylists experimental @grafana/grafana-app-platform-squad false false false true
123 cloudWatchBatchQueries preview @grafana/aws-datasources false false false false
124 navAdminSubsections experimental @grafana/grafana-frontend-platform false false false false
125 recoveryThreshold experimental @grafana/alerting-squad false false true false
126 teamHttpHeaders experimental @grafana/grafana-authnz-team false false false false

View File

@@ -491,6 +491,10 @@ const (
// Use the kubernetes API in the frontend for playlists
FlagKubernetesPlaylists = "kubernetesPlaylists"
// FlagCloudWatchBatchQueries
// Runs CloudWatch metrics queries as separate batches
FlagCloudWatchBatchQueries = "cloudWatchBatchQueries"
// FlagNavAdminSubsections
// Splits the administration section of the nav tree into subsections
FlagNavAdminSubsections = "navAdminSubsections"

View File

@@ -0,0 +1,84 @@
package cloudwatch
import (
"regexp"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
)
// nonWordRegex is for spliting the expressions to just functions and ids
var nonWordRegex = regexp.MustCompile(`\W+`)
// getMetricQueryBatches separates queries into batches if necessary. Metric Insight queries cannot run together, and math expressions must be run
// with all the queries they reference.
func getMetricQueryBatches(queries []*models.CloudWatchQuery, logger log.Logger) [][]*models.CloudWatchQuery {
metricInsightIndices := []int{}
mathIndices := []int{}
for i, query := range queries {
switch query.GetGetMetricDataAPIMode() {
case models.GMDApiModeSQLExpression:
metricInsightIndices = append(metricInsightIndices, i)
case models.GMDApiModeMathExpression:
mathIndices = append(mathIndices, i)
default:
}
}
// We only need multiple batches if there are multiple metrics insight queries
if len(metricInsightIndices) <= 1 {
return [][]*models.CloudWatchQuery{queries}
}
logger.Debug("Separating queries into batches")
// Map ids to their queries
idToIndex := map[string]int{}
for i, query := range queries {
if query.Id != "" {
idToIndex[query.Id] = i
}
}
// Find and track which queries are referenced by math queries
queryReferences := make([][]int, len(queries))
isReferenced := make([]bool, len(queries))
for _, idx := range mathIndices {
tokens := nonWordRegex.Split(queries[idx].Expression, -1)
references := []int{}
for _, token := range tokens {
ref, found := idToIndex[token]
if found {
references = append(references, ref)
isReferenced[ref] = true
}
}
queryReferences[idx] = references
}
// Create a new batch for every query not used in another query
batches := [][]*models.CloudWatchQuery{}
for i, used := range isReferenced {
if !used {
batches = append(batches, getReferencedQueries(queries, queryReferences, i))
}
}
return batches
}
// getReferencedQueries gets all the queries referenced by startQuery and its referenced queries
func getReferencedQueries(queries []*models.CloudWatchQuery, queryReferences [][]int, startQuery int) []*models.CloudWatchQuery {
usedQueries := make([]bool, len(queries))
batch := []*models.CloudWatchQuery{}
queriesToAdd := []int{startQuery}
usedQueries[startQuery] = true
for i := 0; i < len(queriesToAdd); i++ {
batch = append(batch, queries[queriesToAdd[i]])
for _, queryIdx := range queryReferences[queriesToAdd[i]] {
if !usedQueries[queryIdx] {
usedQueries[queryIdx] = true
queriesToAdd = append(queriesToAdd, queryIdx)
}
}
}
return batch
}

View File

@@ -0,0 +1,147 @@
package cloudwatch
import (
"testing"
"github.com/grafana/grafana/pkg/infra/log/logtest"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
"github.com/stretchr/testify/assert"
)
func TestGetMetricQueryBatches(t *testing.T) {
logger := &logtest.Fake{}
insight1 := models.CloudWatchQuery{
MetricQueryType: models.MetricQueryTypeQuery,
Id: "i1",
}
insight2 := models.CloudWatchQuery{
MetricQueryType: models.MetricQueryTypeQuery,
Id: "i2",
}
insight3 := models.CloudWatchQuery{
MetricQueryType: models.MetricQueryTypeQuery,
Id: "i3",
}
metricStat := models.CloudWatchQuery{
MetricQueryType: models.MetricQueryTypeSearch,
MetricEditorMode: models.MetricEditorModeBuilder,
Id: "s1",
}
m1_ref_i1 := models.CloudWatchQuery{
MetricQueryType: models.MetricQueryTypeSearch,
MetricEditorMode: models.MetricEditorModeRaw,
Expression: "PERIOD(i1)",
Id: "m1",
}
m2_ref_i1 := models.CloudWatchQuery{
MetricQueryType: models.MetricQueryTypeSearch,
MetricEditorMode: models.MetricEditorModeRaw,
Expression: "RATE(i1)",
Id: "m2",
}
m3_ref_m1_m2 := models.CloudWatchQuery{
MetricQueryType: models.MetricQueryTypeSearch,
MetricEditorMode: models.MetricEditorModeRaw,
Expression: "m1 * m2",
Id: "m3",
}
m4_ref_s1 := models.CloudWatchQuery{
MetricQueryType: models.MetricQueryTypeSearch,
MetricEditorMode: models.MetricEditorModeRaw,
Expression: "SUM(s1)",
Id: "m4",
}
m4_ref_i1_i3 := models.CloudWatchQuery{
MetricQueryType: models.MetricQueryTypeSearch,
MetricEditorMode: models.MetricEditorModeRaw,
Expression: "PERIOD(i1) * RATE(i3)",
Id: "m5",
}
t.Run("zero insight queries should not separate into batches", func(t *testing.T) {
batch := []*models.CloudWatchQuery{
&metricStat,
&m1_ref_i1,
&m2_ref_i1,
&m3_ref_m1_m2,
&m4_ref_s1,
}
result := getMetricQueryBatches(batch, logger)
assert.Len(t, result, 1)
assert.Equal(t, batch, result[0])
})
t.Run("one insight query should not separate into batches", func(t *testing.T) {
batch := []*models.CloudWatchQuery{
&insight1,
&metricStat,
}
result := getMetricQueryBatches(batch, logger)
assert.Len(t, result, 1)
assert.ElementsMatch(t, batch, result[0])
})
t.Run("multiple insight queries should separate into batches", func(t *testing.T) {
batch := []*models.CloudWatchQuery{
&insight1,
&metricStat,
&insight2,
}
result := getMetricQueryBatches(batch, logger)
assert.Len(t, result, 3)
assert.ElementsMatch(t, []*models.CloudWatchQuery{&insight1}, result[0])
assert.ElementsMatch(t, []*models.CloudWatchQuery{&metricStat}, result[1])
assert.ElementsMatch(t, []*models.CloudWatchQuery{&insight2}, result[2])
})
t.Run("math queries with one insight query should not separate into batches", func(t *testing.T) {
batch := []*models.CloudWatchQuery{
&insight1,
&metricStat,
&m1_ref_i1,
&m2_ref_i1,
&m3_ref_m1_m2,
&m4_ref_s1,
}
result := getMetricQueryBatches(batch, logger)
assert.Len(t, result, 1)
assert.ElementsMatch(t, batch, result[0])
})
t.Run("math queries with multiple insight queries should separate into batches", func(t *testing.T) {
batch := []*models.CloudWatchQuery{
&insight1,
&insight2,
&metricStat,
&m1_ref_i1,
&m2_ref_i1,
&m3_ref_m1_m2,
&m4_ref_s1,
}
result := getMetricQueryBatches(batch, logger)
assert.Len(t, result, 3)
assert.ElementsMatch(t, []*models.CloudWatchQuery{&insight2}, result[0])
assert.ElementsMatch(t, []*models.CloudWatchQuery{&insight1, &m1_ref_i1, &m2_ref_i1, &m3_ref_m1_m2}, result[1])
assert.ElementsMatch(t, []*models.CloudWatchQuery{&metricStat, &m4_ref_s1}, result[2])
})
t.Run("a math query with multiple insight queries should batch them together", func(t *testing.T) {
batch := []*models.CloudWatchQuery{
&insight1,
&insight2,
&insight3,
&m1_ref_i1,
&m4_ref_i1_i3,
}
result := getMetricQueryBatches(batch, logger)
assert.Len(t, result, 3)
assert.ElementsMatch(t, []*models.CloudWatchQuery{&insight2}, result[0])
assert.ElementsMatch(t, []*models.CloudWatchQuery{&insight1, &m1_ref_i1}, result[1])
// This batch is expected to get an error from AWS, which does not allow multiple insight queries in a batch
assert.ElementsMatch(t, []*models.CloudWatchQuery{&insight1, &insight3, &m4_ref_i1_i3}, result[2])
})
}

View File

@@ -56,9 +56,16 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, logger
resultChan := make(chan *responseWrapper, len(req.Queries))
eg, ectx := errgroup.WithContext(ctx)
for r, q := range requestQueriesByRegion {
requestQueries := q
for r, regionQueries := range requestQueriesByRegion {
region := r
batches := [][]*models.CloudWatchQuery{regionQueries}
if e.features.IsEnabled(featuremgmt.FlagCloudWatchBatchQueries) {
batches = getMetricQueryBatches(regionQueries, logger)
}
for _, batch := range batches {
requestQueries := batch
eg.Go(func() error {
defer func() {
if err := recover(); err != nil {
@@ -107,6 +114,7 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, logger
return nil
})
}
}
if err := eg.Wait(); err != nil {
dataResponse := backend.DataResponse{