CloudWatch: Batch different time ranges separately (#98230)

Isabella Siu 2025-01-03 10:22:14 -05:00 committed by GitHub
parent 5429512779
commit f3a553fb9b
8 changed files with 237 additions and 56 deletions
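
The idea behind the change: incoming data queries are first grouped by their UTC-normalized time range, and only then split by region, so each distinct time range gets its own GetMetricData request. A minimal standalone sketch of that grouping, using the plugin SDK's backend.DataQuery type; groupByTimeRange is an illustrative local mirror of the utils.BatchDataQueriesByTimeRange helper added further down, not the function itself:

package main

import (
	"fmt"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
)

// groupByTimeRange batches queries whose From/To describe the same instants,
// regardless of the time zone they were expressed in.
func groupByTimeRange(queries []backend.DataQuery) [][]backend.DataQuery {
	byRange := map[backend.TimeRange][]backend.DataQuery{}
	for _, q := range queries {
		key := backend.TimeRange{From: q.TimeRange.From.UTC(), To: q.TimeRange.To.UTC()}
		byRange[key] = append(byRange[key], q)
	}
	batches := make([][]backend.DataQuery, 0, len(byRange))
	for _, batch := range byRange {
		batches = append(batches, batch)
	}
	return batches
}

func main() {
	now := time.Now()
	queries := []backend.DataQuery{
		{RefID: "A", TimeRange: backend.TimeRange{From: now.Add(-2 * time.Hour), To: now}},
		{RefID: "A2", TimeRange: backend.TimeRange{From: now.Add(-2 * time.Hour), To: now}},
		{RefID: "B", TimeRange: backend.TimeRange{From: now.Add(-2 * time.Hour), To: now.Add(-time.Hour)}},
	}
	// A and A2 share a time range, B does not: two batches, hence two GetMetricData calls.
	fmt.Println(len(groupByTimeRange(queries))) // 2
}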

View File

@ -62,6 +62,8 @@ type sqlExpression struct {
type CloudWatchQuery struct {
logger log.Logger
StartTime time.Time
EndTime time.Time
RefId string
Region string
Id string
@ -251,6 +253,8 @@ func ParseMetricDataQueries(dataQueries []backend.DataQuery, startTime time.Time
for refId, mdq := range metricDataQueries {
cwQuery := &CloudWatchQuery{
logger: logger,
StartTime: startTime,
EndTime: endTime,
RefId: refId,
Id: utils.Depointerizer(mdq.Id),
Region: utils.Depointerizer(mdq.Region),

View File

@ -272,6 +272,10 @@ func TestRequestParser(t *testing.T) {
QueryType: "timeSeriesQuery",
Interval: 0,
RefID: "A",
TimeRange: backend.TimeRange{
From: time.Now(),
To: time.Now(),
},
JSON: json.RawMessage(`{
"region":"us-east-1",
"namespace":"ec2",
@ -303,6 +307,10 @@ func TestRequestParser(t *testing.T) {
QueryType: "timeSeriesQuery",
Interval: 0,
RefID: "A",
TimeRange: backend.TimeRange{
From: time.Now(),
To: time.Now(),
},
JSON: json.RawMessage(`{
"region":"us-east-1",
"namespace":"ec2",

View File

@ -18,7 +18,7 @@ import (
// matches a dynamic label
var dynamicLabel = regexp.MustCompile(`\$\{.+\}`)
func (e *cloudWatchExecutor) parseResponse(ctx context.Context, startTime time.Time, endTime time.Time, metricDataOutputs []*cloudwatch.GetMetricDataOutput,
func (e *cloudWatchExecutor) parseResponse(ctx context.Context, metricDataOutputs []*cloudwatch.GetMetricDataOutput,
queries []*models.CloudWatchQuery) ([]*responseWrapper, error) {
aggregatedResponse := aggregateResponse(metricDataOutputs)
queriesById := map[string]*models.CloudWatchQuery{}
@ -40,7 +40,7 @@ func (e *cloudWatchExecutor) parseResponse(ctx context.Context, startTime time.T
}
var err error
dataRes.Frames, err = buildDataFrames(ctx, startTime, endTime, response, queryRow)
dataRes.Frames, err = buildDataFrames(ctx, response, queryRow)
if err != nil {
return nil, err
}
@ -164,7 +164,7 @@ func getLabels(cloudwatchLabel string, query *models.CloudWatchQuery, addSeriesL
return labels
}
func buildDataFrames(ctx context.Context, startTime time.Time, endTime time.Time, aggregatedResponse models.QueryRowResponse,
func buildDataFrames(ctx context.Context, aggregatedResponse models.QueryRowResponse,
query *models.CloudWatchQuery) (data.Frames, error) {
frames := data.Frames{}
hasStaticLabel := query.Label != "" && !dynamicLabel.MatchString(query.Label)
@ -172,7 +172,7 @@ func buildDataFrames(ctx context.Context, startTime time.Time, endTime time.Time
for _, metric := range aggregatedResponse.Metrics {
label := *metric.Label
deepLink, err := query.BuildDeepLink(startTime, endTime)
deepLink, err := query.BuildDeepLink(query.StartTime, query.EndTime)
if err != nil {
return nil, err
}

View File

@ -188,6 +188,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
}
query := &models.CloudWatchQuery{
StartTime: startTime,
EndTime: endTime,
RefId: "refId1",
Region: "us-east-1",
Namespace: "AWS/ApplicationELB",
@ -202,7 +204,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
MetricEditorMode: models.MetricEditorModeBuilder,
MatchExact: true,
}
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query)
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query)
require.NoError(t, err)
frame1 := frames[0]
@ -256,6 +258,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
}
query := &models.CloudWatchQuery{
StartTime: startTime,
EndTime: endTime,
RefId: "refId1",
Region: "us-east-1",
Namespace: "AWS/ApplicationELB",
@ -270,7 +274,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
MetricQueryType: models.MetricQueryTypeSearch,
MetricEditorMode: models.MetricEditorModeBuilder,
}
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query)
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query)
require.NoError(t, err)
assert.Equal(t, "some label lb3", frames[0].Name)
@ -305,6 +309,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
},
}
query := &models.CloudWatchQuery{
StartTime: startTime,
EndTime: endTime,
RefId: "refId1",
Region: "us-east-1",
Namespace: "AWS/ApplicationELB",
@ -317,7 +323,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
MetricQueryType: models.MetricQueryTypeSearch,
MetricEditorMode: models.MetricEditorModeBuilder,
}
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query)
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query)
require.NoError(t, err)
assert.Len(t, frames, 2)
@ -347,6 +353,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
}
query := &models.CloudWatchQuery{
StartTime: startTime,
EndTime: endTime,
RefId: "refId1",
Region: "us-east-1",
Namespace: "AWS/ApplicationELB",
@ -361,7 +369,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
MetricQueryType: models.MetricQueryTypeSearch,
MetricEditorMode: models.MetricEditorModeBuilder,
}
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query)
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query)
require.NoError(t, err)
assert.Len(t, frames, 2)
@ -393,6 +401,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
}
query := &models.CloudWatchQuery{
StartTime: startTime,
EndTime: endTime,
RefId: "refId1",
Region: "us-east-1",
Namespace: "AWS/ApplicationELB",
@ -407,7 +417,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
MetricQueryType: models.MetricQueryTypeSearch,
MetricEditorMode: models.MetricEditorModeBuilder,
}
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query)
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query)
require.NoError(t, err)
assert.Equal(t, "some label", frames[0].Name)
@ -433,6 +443,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
}
query := &models.CloudWatchQuery{
StartTime: startTime,
EndTime: endTime,
RefId: "refId1",
Region: "us-east-1",
Namespace: "AWS/ApplicationELB",
@ -448,7 +460,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
MetricEditorMode: models.MetricEditorModeBuilder,
Label: "set ${AVG} label",
}
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query)
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query)
require.NoError(t, err)
assert.Equal(t, "some label", frames[0].Name)
@ -474,6 +486,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
}
query := &models.CloudWatchQuery{
StartTime: startTime,
EndTime: endTime,
RefId: "refId1",
Region: "us-east-1",
Namespace: "AWS/ApplicationELB",
@ -489,7 +503,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
MetricEditorMode: models.MetricEditorModeBuilder,
Label: "actual",
}
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query)
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query)
require.NoError(t, err)
assert.Equal(t, "actual", frames[0].Name)
@ -515,6 +529,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
}
query := &models.CloudWatchQuery{
StartTime: startTime,
EndTime: endTime,
RefId: "refId1",
Region: "us-east-1",
Namespace: "",
@ -527,7 +543,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
MetricEditorMode: models.MetricEditorModeRaw,
Label: "actual",
}
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query)
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query)
require.NoError(t, err)
assert.Equal(t, "actual", frames[0].Name)
@ -560,6 +576,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
}
query := &models.CloudWatchQuery{
StartTime: startTime,
EndTime: endTime,
RefId: "refId1",
Region: "us-east-1",
Statistic: "Average",
@ -569,7 +587,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
Dimensions: map[string][]string{"Service": {"EC2", "Elastic Loading Balancing"}, "Resource": {"vCPU", "ApplicationLoadBalancersPerRegion"}},
SqlExpression: "SELECT AVG(ResourceCount) FROM SCHEMA(\"AWS/Usage\", Class, Resource, Service, Type) GROUP BY Service, Resource",
}
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query)
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query)
require.NoError(t, err)
assert.Equal(t, "EC2 vCPU", frames[0].Name)
@ -597,6 +615,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
}
query := &models.CloudWatchQuery{
StartTime: startTime,
EndTime: endTime,
RefId: "refId1",
Region: "us-east-1",
Statistic: "Average",
@ -605,7 +625,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
MetricEditorMode: models.MetricEditorModeBuilder,
SqlExpression: "SELECT AVG(ResourceCount) FROM SCHEMA(\"AWS/Usage\", Class, Resource, Service, Type)",
}
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query)
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query)
require.NoError(t, err)
assert.Equal(t, "cloudwatch-default-label", frames[0].Name)
@ -629,6 +649,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
}
query := &models.CloudWatchQuery{
StartTime: startTime,
EndTime: endTime,
RefId: "refId1",
Region: "us-east-1",
Namespace: "AWS/ApplicationELB",
@ -642,7 +664,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
MetricQueryType: models.MetricQueryTypeSearch,
MetricEditorMode: models.MetricEditorModeRaw,
}
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query)
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query)
require.NoError(t, err)
assert.Equal(t, "some label", frames[0].Name)
@ -673,6 +695,8 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
}
query := &models.CloudWatchQuery{
StartTime: startTime,
EndTime: endTime,
RefId: "refId1",
Region: "us-east-1",
Namespace: "AWS/ApplicationELB",
@ -686,7 +710,7 @@ func Test_buildDataFrames_parse_label_to_name_and_labels(t *testing.T) {
MetricQueryType: models.MetricQueryTypeSearch,
MetricEditorMode: models.MetricEditorModeBuilder,
}
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), startTime, endTime, *response, query)
frames, err := buildDataFrames(contextWithFeaturesEnabled(features.FlagCloudWatchNewLabelParsing), *response, query)
require.NoError(t, err)
frame := frames[0]

View File

@ -25,12 +25,6 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba
if len(req.Queries) == 0 {
return nil, backend.DownstreamError(fmt.Errorf("request contains no queries"))
}
// startTime and endTime are always the same for all queries
startTime := req.Queries[0].TimeRange.From
endTime := req.Queries[0].TimeRange.To
if !startTime.Before(endTime) {
return nil, backend.DownstreamError(fmt.Errorf("invalid time range: start time must be before end time"))
}
instance, err := e.getInstance(ctx, req.PluginContext)
if err != nil {
@ -38,34 +32,45 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba
return resp, nil
}
requestQueries, err := models.ParseMetricDataQueries(req.Queries, startTime, endTime, instance.Settings.Region, e.logger.FromContext(ctx),
features.IsEnabled(ctx, features.FlagCloudWatchCrossAccountQuerying))
if err != nil {
return nil, err
}
if len(requestQueries) == 0 {
return backend.NewQueryDataResponse(), nil
}
requestQueriesByRegion := make(map[string][]*models.CloudWatchQuery)
for _, query := range requestQueries {
if _, exist := requestQueriesByRegion[query.Region]; !exist {
requestQueriesByRegion[query.Region] = []*models.CloudWatchQuery{}
timeBatches := utils.BatchDataQueriesByTimeRange(req.Queries)
requestQueriesByTimeAndRegion := make(map[string][]*models.CloudWatchQuery)
for i, timeBatch := range timeBatches {
startTime := timeBatch[0].TimeRange.From
endTime := timeBatch[0].TimeRange.To
if !startTime.Before(endTime) {
return nil, backend.DownstreamError(fmt.Errorf("invalid time range: start time must be before end time"))
}
requestQueriesByRegion[query.Region] = append(requestQueriesByRegion[query.Region], query)
requestQueries, err := models.ParseMetricDataQueries(timeBatch, startTime, endTime, instance.Settings.Region, e.logger.FromContext(ctx),
features.IsEnabled(ctx, features.FlagCloudWatchCrossAccountQuerying))
if err != nil {
return nil, err
}
for _, query := range requestQueries {
key := fmt.Sprintf("%d %s", i, query.Region)
if _, exist := requestQueriesByTimeAndRegion[key]; !exist {
requestQueriesByTimeAndRegion[key] = []*models.CloudWatchQuery{}
}
requestQueriesByTimeAndRegion[key] = append(requestQueriesByTimeAndRegion[key], query)
}
}
if len(requestQueriesByTimeAndRegion) == 0 {
return backend.NewQueryDataResponse(), nil
}
resultChan := make(chan *responseWrapper, len(req.Queries))
eg, ectx := errgroup.WithContext(ctx)
for r, regionQueries := range requestQueriesByRegion {
region := r
batches := [][]*models.CloudWatchQuery{regionQueries}
for _, timeAndRegionQueries := range requestQueriesByTimeAndRegion {
batches := [][]*models.CloudWatchQuery{timeAndRegionQueries}
if features.IsEnabled(ctx, features.FlagCloudWatchBatchQueries) {
batches = getMetricQueryBatches(regionQueries, e.logger.FromContext(ctx))
batches = getMetricQueryBatches(timeAndRegionQueries, e.logger.FromContext(ctx))
}
// region, startTime, and endTime are the same for the set of queries
region := timeAndRegionQueries[0].Region
startTime := timeAndRegionQueries[0].StartTime
endTime := timeAndRegionQueries[0].EndTime
for _, batch := range batches {
requestQueries := batch
eg.Go(func() error {
@ -113,7 +118,7 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba
return err
}
res, err := e.parseResponse(ctx, startTime, endTime, mdo, requestQueries)
res, err := e.parseResponse(ctx, mdo, requestQueries)
if err != nil {
return err
}
@ -130,7 +135,7 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba
if err := eg.Wait(); err != nil {
dataResponse := backend.ErrorResponseWithErrorSource(fmt.Errorf("metric request error: %w", err))
resultChan <- &responseWrapper{
RefId: getQueryRefIdFromErrorString(err.Error(), requestQueries),
RefId: getQueryRefIdFromErrorString(err.Error(), requestQueriesByTimeAndRegion),
DataResponse: &dataResponse,
}
}
@ -143,14 +148,16 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba
return resp, nil
}
func getQueryRefIdFromErrorString(err string, queries []*models.CloudWatchQuery) string {
func getQueryRefIdFromErrorString(err string, queriesByRegion map[string][]*models.CloudWatchQuery) string {
// error can be in format "Error in expression 'test': Invalid syntax"
// so we can find the query id or ref id between the quotations
erroredRefId := ""
for _, query := range queries {
if regexp.MustCompile(`'`+query.RefId+`':`).MatchString(err) || regexp.MustCompile(`'`+query.Id+`':`).MatchString(err) {
erroredRefId = query.RefId
for _, queries := range queriesByRegion {
for _, query := range queries {
if regexp.MustCompile(`'`+query.RefId+`':`).MatchString(err) || regexp.MustCompile(`'`+query.Id+`':`).MatchString(err) {
erroredRefId = query.RefId
}
}
}
// if errorRefId is empty, it means the error concerns all queries (error metric limit exceeded, for example)
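
The grouping key above combines the index of the time batch with the region ("%d %s"), so two queries against the same region but different time ranges still end up in separate GetMetricData requests. A minimal sketch of just that keying step, with a throwaway local struct standing in for models.CloudWatchQuery (illustrative only):

package main

import "fmt"

type query struct {
	refID  string
	region string
}

func main() {
	// timeBatches[i] holds the queries that share one time range.
	timeBatches := [][]query{
		{{refID: "A", region: "us-east-2"}, {refID: "A2", region: "us-east-2"}}, // first time range
		{{refID: "B", region: "us-east-2"}},                                     // second time range
	}
	byTimeAndRegion := map[string][]query{}
	for i, batch := range timeBatches {
		for _, q := range batch {
			key := fmt.Sprintf("%d %s", i, q.region) // "0 us-east-2", "1 us-east-2"
			byTimeAndRegion[key] = append(byTimeAndRegion[key], q)
		}
	}
	// Same region, but two time ranges, so two request groups.
	fmt.Println(len(byTimeAndRegion)) // 2
}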

View File

@ -118,18 +118,26 @@ func TestTimeSeriesQuery(t *testing.T) {
})
t.Run("End time before start time should result in error", func(t *testing.T) {
_, err := executor.executeTimeSeriesQuery(context.Background(), &backend.QueryDataRequest{Queries: []backend.DataQuery{{TimeRange: backend.TimeRange{
From: now.Add(time.Hour * -1),
To: now.Add(time.Hour * -2),
}}}})
_, err := executor.executeTimeSeriesQuery(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{{TimeRange: backend.TimeRange{
From: now.Add(time.Hour * -1),
To: now.Add(time.Hour * -2),
}}}})
assert.EqualError(t, err, "invalid time range: start time must be before end time")
})
t.Run("End time equals start time should result in error", func(t *testing.T) {
_, err := executor.executeTimeSeriesQuery(context.Background(), &backend.QueryDataRequest{Queries: []backend.DataQuery{{TimeRange: backend.TimeRange{
From: now.Add(time.Hour * -1),
To: now.Add(time.Hour * -1),
}}}})
_, err := executor.executeTimeSeriesQuery(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{{TimeRange: backend.TimeRange{
From: now.Add(time.Hour * -1),
To: now.Add(time.Hour * -1),
}}}})
assert.EqualError(t, err, "invalid time range: start time must be before end time")
})
}
@ -271,6 +279,72 @@ func Test_executeTimeSeriesQuery_getCWClient_is_called_once_per_region_and_GetMe
mockMetricClient.AssertNumberOfCalls(t, "GetMetricDataWithContext", 2)
// GetMetricData is asserted to have been called 2 times, presumably once for each group of regions (2 regions total)
})
t.Run("3 queries with 2 time ranges calls GetSessionWithAuthSettings 2 times and calls GetMetricDataWithContext 2 times", func(t *testing.T) {
sessionCache := &mockSessionCache{}
sessionCache.On("GetSessionWithAuthSettings", mock.MatchedBy(
func(config awsds.GetSessionConfig) bool {
return config.Settings.Region == "us-east-2"
})).
Return(&session.Session{Config: &aws.Config{}}, nil).Times(2)
im := datasource.NewInstanceManager(func(ctx context.Context, s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return DataSource{Settings: models.CloudWatchSettings{}, sessions: sessionCache}, nil
})
mockMetricClient = mocks.MetricsAPI{}
mockMetricClient.On("GetMetricDataWithContext", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
executor := newExecutor(im, log.NewNullLogger())
_, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
},
Queries: []backend.DataQuery{
{
RefID: "A",
TimeRange: backend.TimeRange{From: time.Now().Add(time.Hour * -2), To: time.Now()},
JSON: json.RawMessage(`{
"type": "timeSeriesQuery",
"namespace": "AWS/EC2",
"metricName": "NetworkOut",
"region": "us-east-2",
"statistic": "Maximum",
"period": "300"
}`),
},
{
RefID: "A2",
TimeRange: backend.TimeRange{From: time.Now().Add(time.Hour * -2), To: time.Now()},
JSON: json.RawMessage(`{
"type": "timeSeriesQuery",
"namespace": "AWS/EC2",
"metricName": "NetworkOut",
"region": "us-east-2",
"statistic": "Maximum",
"period": "300"
}`),
},
{
RefID: "B",
TimeRange: backend.TimeRange{From: time.Now().Add(time.Hour * -2), To: time.Now().Add(time.Hour * -1)},
JSON: json.RawMessage(`{
"type": "timeSeriesQuery",
"namespace": "AWS/EC2",
"metricName": "NetworkIn",
"region": "us-east-2",
"statistic": "Maximum",
"period": "300"
}`),
},
},
})
require.NoError(t, err)
sessionCache.AssertExpectations(t) // method is defined to return twice (once for each batch)
mockMetricClient.AssertNumberOfCalls(t, "GetMetricDataWithContext", 2)
// GetMetricData is asserted to have been called 2 times, presumably once for each time range (2 time ranges total)
})
}
type queryDimensions struct {

View File

@ -1,6 +1,7 @@
package utils
import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
@ -20,3 +21,19 @@ var QueriesTotalCounter = promauto.NewCounterVec(
},
[]string{"query_type"},
)
// BatchDataQueriesByTimeRange separates the passed in queries into batches based on time ranges
func BatchDataQueriesByTimeRange(queries []backend.DataQuery) [][]backend.DataQuery {
timeToBatch := make(map[backend.TimeRange][]backend.DataQuery)
for _, query := range queries {
// normalize to UTC so the same instant expressed in different time zones lands in the same batch
key := backend.TimeRange{From: query.TimeRange.From.UTC(), To: query.TimeRange.To.UTC()}
timeToBatch[key] = append(timeToBatch[key], query)
}
finalBatches := [][]backend.DataQuery{}
for _, batch := range timeToBatch {
finalBatches = append(finalBatches, batch)
}
return finalBatches
}
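
Because the batches are collected from a map, their relative order is not deterministic, which is why the test in the next file checks batch sizes rather than fixed indices. If a caller ever needed a stable order it could sort the result itself; a hedged sketch (the sortedBatches helper is hypothetical and not part of this change):

package utils

import (
	"sort"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
)

// sortedBatches returns the time-range batches ordered by earliest end time.
// Every batch is non-empty by construction, so indexing batch[0] is safe.
func sortedBatches(queries []backend.DataQuery) [][]backend.DataQuery {
	batches := BatchDataQueriesByTimeRange(queries)
	sort.Slice(batches, func(i, j int) bool {
		return batches[i][0].TimeRange.To.Before(batches[j][0].TimeRange.To)
	})
	return batches
}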

View File

@ -0,0 +1,47 @@
package utils
import (
"testing"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/require"
)
func TestBatchDataQueriesByTimeRange(t *testing.T) {
start := time.Date(2024, time.November, 29, 0, 42, 34, 0, time.UTC)
FiveMin := time.Date(2024, time.November, 29, 0, 47, 34, 0, time.UTC)
TenMin := time.Date(2024, time.November, 29, 0, 52, 34, 0, time.UTC)
loc := time.FixedZone("UTC+1", 1*60*60)
FiveMinDifferentZone := time.Date(2024, time.November, 29, 1, 47, 34, 0, loc)
testQueries := []backend.DataQuery{
{
RefID: "A",
TimeRange: backend.TimeRange{From: start, To: FiveMin},
},
{
RefID: "B",
TimeRange: backend.TimeRange{From: start, To: TenMin},
},
{
RefID: "C",
TimeRange: backend.TimeRange{From: start, To: FiveMinDifferentZone},
},
}
result := BatchDataQueriesByTimeRange(testQueries)
require.Equal(t, 2, len(result))
var FiveMinQueries = result[0]
var TenMinQueries = result[1]
// Since BatchDataQueriesByTimeRange uses a map, we don't know the return indices for the batches
if len(result[0]) == 1 {
TenMinQueries = result[0]
FiveMinQueries = result[1]
}
require.Equal(t, 2, len(FiveMinQueries))
require.Equal(t, "A", FiveMinQueries[0].RefID)
require.Equal(t, "C", FiveMinQueries[1].RefID)
require.Equal(t, 1, len(TenMinQueries))
require.Equal(t, "B", TenMinQueries[0].RefID)
}