Mirror of https://github.com/grafana/grafana.git, synced 2025-02-25 18:55:37 -06:00
CloudWatch: Refactor to extract DataQuery grouping by region out of request parsing (#57392)
* CloudWatch: Extract request parsing grouping by region from ParseQueries
* Rename ParseQueries to ParseMetricDataQueries
This commit is contained in:
parent 1bda67ab11
commit 9ebed91eed
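Before this change, ParseQueries both decoded the query JSON and grouped the resulting CloudWatchQuery values into a map keyed by region. After it, ParseMetricDataQueries returns a flat slice and executeTimeSeriesQuery performs the grouping itself, as the hunks below show. A minimal, self-contained sketch of that caller-side grouping; the trimmed CloudWatchQuery struct and the groupByRegion helper are illustrative stand-ins, not the real types:

package main

import "fmt"

// Illustrative stand-in for models.CloudWatchQuery; the real struct has many more fields.
type CloudWatchQuery struct {
    RefID  string
    Region string
}

// groupByRegion mirrors the grouping loop that executeTimeSeriesQuery now performs
// after ParseMetricDataQueries returns a flat slice of queries.
func groupByRegion(queries []*CloudWatchQuery) map[string][]*CloudWatchQuery {
    byRegion := make(map[string][]*CloudWatchQuery)
    for _, q := range queries {
        byRegion[q.Region] = append(byRegion[q.Region], q)
    }
    return byRegion
}

func main() {
    queries := []*CloudWatchQuery{
        {RefID: "A", Region: "us-east-1"},
        {RefID: "B", Region: "us-east-2"},
        {RefID: "C", Region: "us-east-1"},
    }
    for region, group := range groupByRegion(queries) {
        fmt.Printf("%s: %d queries\n", region, len(group))
    }
}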
@@ -5,6 +5,7 @@ import (
    "github.com/aws/aws-sdk-go/aws/request"
    "github.com/aws/aws-sdk-go/service/cloudwatch"
    "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
    "github.com/stretchr/testify/mock"
)

type FakeMetricsAPI struct {
@@ -56,3 +57,14 @@ func chunkSlice(slice []*cloudwatch.Metric, chunkSize int) [][]*cloudwatch.Metri

    return chunks
}

type MetricsClient struct {
    cloudwatchiface.CloudWatchAPI
    mock.Mock
}

func (m *MetricsClient) GetMetricDataWithContext(ctx aws.Context, input *cloudwatch.GetMetricDataInput, opts ...request.Option) (*cloudwatch.GetMetricDataOutput, error) {
    args := m.Called(ctx, input, opts)

    return args.Get(0).(*cloudwatch.GetMetricDataOutput), args.Error(1)
}
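The MetricsClient added here embeds testify's mock.Mock, so tests can stub GetMetricDataWithContext and later assert how many times it was called, which is what the new time series query test further down relies on. A hedged usage sketch, assuming the import paths visible in this diff; the test name is made up, and a non-nil output is returned so the mock's type assertion is safe:

package cloudwatch_test

import (
    "context"
    "testing"

    "github.com/aws/aws-sdk-go/service/cloudwatch"
    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/require"

    "github.com/grafana/grafana/pkg/tsdb/cloudwatch/mocks"
)

func TestMetricsClientMockSketch(t *testing.T) {
    var client mocks.MetricsClient
    // Stub the call for any arguments.
    client.On("GetMetricDataWithContext", mock.Anything, mock.Anything, mock.Anything).
        Return(&cloudwatch.GetMetricDataOutput{}, nil)

    _, err := client.GetMetricDataWithContext(context.Background(), &cloudwatch.GetMetricDataInput{})
    require.NoError(t, err)

    // Verify the stub was hit exactly once.
    client.AssertNumberOfCalls(t, "GetMetricDataWithContext", 1)
}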
@@ -45,9 +45,10 @@ type metricsDataQuery struct {
    Alias string `json:"alias,omitempty"`
}

// ParseQueries parses the json queries and returns a map of cloudWatchQueries by region. The cloudWatchQuery has a 1 to 1 mapping to a query editor row
func ParseQueries(queries []backend.DataQuery, startTime time.Time, endTime time.Time, dynamicLabelsEnabled bool) (map[string][]*CloudWatchQuery, error) {
    result := make(map[string][]*CloudWatchQuery)
// ParseMetricDataQueries decodes the metric data queries json, validates, sets default values and returns an array of CloudWatchQueries.
// The CloudWatchQuery has a 1 to 1 mapping to a query editor row
func ParseMetricDataQueries(queries []backend.DataQuery, startTime time.Time, endTime time.Time, dynamicLabelsEnabled bool) ([]*CloudWatchQuery, error) {
    var result []*CloudWatchQuery
    migratedQueries, err := migrateLegacyQuery(queries, dynamicLabelsEnabled)
    if err != nil {
        return nil, err
@@ -71,15 +72,11 @@ func ParseQueries(queries []backend.DataQuery, startTime time.Time, endTime time
        }

        refID := query.RefID
        query, err := parseRequestQuery(metricsDataQuery, refID, startTime, endTime)
        cwQuery, err := parseRequestQuery(metricsDataQuery, refID, startTime, endTime)
        if err != nil {
            return nil, &QueryError{Err: err, RefID: refID}
        }

        if _, exist := result[query.Region]; !exist {
            result[query.Region] = []*CloudWatchQuery{}
        }
        result[query.Region] = append(result[query.Region], query)
        result = append(result, cwQuery)
    }

    return result, nil
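For orientation, a hedged sketch of calling the renamed function directly, reusing the query JSON shape exercised by the tests further down; it assumes the models and backend packages shown in this diff are imported, and that passing false disables dynamic labels:

query := backend.DataQuery{
    RefID:     "A",
    TimeRange: backend.TimeRange{From: time.Now().Add(-2 * time.Hour), To: time.Now().Add(-1 * time.Hour)},
    JSON: json.RawMessage(`{
        "type": "timeSeriesQuery",
        "namespace": "AWS/EC2",
        "metricName": "NetworkOut",
        "region": "us-east-1",
        "statistic": "Maximum",
        "period": "300"
    }`),
}

// ParseMetricDataQueries now returns a flat []*models.CloudWatchQuery; any caller
// that still needs a per-region view groups the result itself.
cwQueries, err := models.ParseMetricDataQueries([]backend.DataQuery{query}, query.TimeRange.From, query.TimeRange.To, false)
if err != nil {
    // Validation failures come back wrapped as *models.QueryError carrying the RefID.
}
for _, q := range cwQueries {
    fmt.Println(q.RefID, q.Region)
}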
@@ -31,15 +31,23 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba
        return nil, fmt.Errorf("invalid time range: start time must be before end time")
    }

    requestQueriesByRegion, err := models.ParseQueries(req.Queries, startTime, endTime, e.features.IsEnabled(featuremgmt.FlagCloudWatchDynamicLabels))
    requestQueries, err := models.ParseMetricDataQueries(req.Queries, startTime, endTime, e.features.IsEnabled(featuremgmt.FlagCloudWatchDynamicLabels))
    if err != nil {
        return nil, err
    }

    if len(requestQueriesByRegion) == 0 {
    if len(requestQueries) == 0 {
        return backend.NewQueryDataResponse(), nil
    }

    requestQueriesByRegion := make(map[string][]*models.CloudWatchQuery)
    for _, query := range requestQueries {
        if _, exist := requestQueriesByRegion[query.Region]; !exist {
            requestQueriesByRegion[query.Region] = []*models.CloudWatchQuery{}
        }
        requestQueriesByRegion[query.Region] = append(requestQueriesByRegion[query.Region], query)
    }

    resultChan := make(chan *responseWrapper, len(req.Queries))
    eg, ectx := errgroup.WithContext(ctx)
    for r, q := range requestQueriesByRegion {
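The tail of this hunk only shows the start of the per-region loop; the executor then fans out the work with the errgroup created just above. A minimal, self-contained sketch of that fan-out pattern, where executeRegionQueries is a hypothetical stand-in for the real per-region work:

package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
)

// executeRegionQueries is a hypothetical stand-in for the per-region work the executor performs.
func executeRegionQueries(ctx context.Context, region string, queries []string) error {
    fmt.Printf("region %s: %d queries\n", region, len(queries))
    return nil
}

func main() {
    queriesByRegion := map[string][]string{
        "us-east-1": {"A", "B"},
        "us-east-2": {"C"},
    }

    // One goroutine per region; the first error cancels the shared context.
    eg, ectx := errgroup.WithContext(context.Background())
    for region, queries := range queriesByRegion {
        region, queries := region, queries // capture loop variables (pre-Go 1.22 semantics)
        eg.Go(func() error {
            return executeRegionQueries(ectx, region, queries)
        })
    }

    if err := eg.Wait(); err != nil {
        fmt.Println("query failed:", err)
    }
}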
@@ -10,9 +10,11 @@ import (
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/cloudwatch"
    "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
    "github.com/grafana/grafana-aws-sdk/pkg/awsds"
    "github.com/grafana/grafana-plugin-sdk-go/backend"
    "github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
    "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
    "github.com/stretchr/testify/mock"

    "github.com/grafana/grafana/pkg/services/featuremgmt"
    "github.com/grafana/grafana/pkg/tsdb/cloudwatch/mocks"
@@ -140,6 +142,135 @@ func TestTimeSeriesQuery(t *testing.T) {
    })
}

func Test_executeTimeSeriesQuery_getCWClient_is_called_once_per_region_and_GetMetricData_is_called_once_per_grouping_of_queries_by_region(t *testing.T) {
    /* TODO: This test aims to verify the logic to group regions which has been extracted from ParseMetricDataQueries.
    It should be replaced by a test at a lower level when grouping by regions is incorporated into a separate business logic layer */
    origNewCWClient := NewCWClient
    t.Cleanup(func() {
        NewCWClient = origNewCWClient
    })

    var mockMetricClient mocks.MetricsClient
    NewCWClient = func(sess *session.Session) cloudwatchiface.CloudWatchAPI {
        return &mockMetricClient
    }

    im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
        return datasourceInfo{}, nil
    })

    t.Run("Queries with the same region should call GetSession with that region 1 time and call GetMetricDataWithContext 1 time", func(t *testing.T) {
        mockSessionCache := &mockSessionCache{}
        mockSessionCache.On("GetSession", mock.MatchedBy(
            func(config awsds.SessionConfig) bool { return config.Settings.Region == "us-east-1" })). // region from queries is asserted here
            Return(&session.Session{Config: &aws.Config{}}, nil).Once()
        mockMetricClient = mocks.MetricsClient{}
        mockMetricClient.On("GetMetricDataWithContext", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)

        executor := newExecutor(im, newTestConfig(), mockSessionCache, featuremgmt.WithFeatures())
        _, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
            PluginContext: backend.PluginContext{
                DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
            },
            Queries: []backend.DataQuery{
                {
                    RefID: "A",
                    TimeRange: backend.TimeRange{From: time.Now().Add(time.Hour * -2), To: time.Now().Add(time.Hour * -1)},
                    JSON: json.RawMessage(`{
                        "type": "timeSeriesQuery",
                        "namespace": "AWS/EC2",
                        "metricName": "NetworkOut",
                        "region": "us-east-1",
                        "statistic": "Maximum",
                        "period": "300"
                    }`),
                },
                {
                    RefID: "B",
                    TimeRange: backend.TimeRange{From: time.Now().Add(time.Hour * -2), To: time.Now().Add(time.Hour * -1)},
                    JSON: json.RawMessage(`{
                        "type": "timeSeriesQuery",
                        "namespace": "AWS/EC2",
                        "metricName": "NetworkIn",
                        "region": "us-east-1",
                        "statistic": "Maximum",
                        "period": "300"
                    }`),
                },
            },
        })

        require.NoError(t, err)
        mockSessionCache.AssertExpectations(t) // method is defined to only return "Once()",
        // AssertExpectations will fail if those methods were not called Once(), so expected number of calls is asserted by this line
        mockMetricClient.AssertNumberOfCalls(t, "GetMetricDataWithContext", 1)
        // GetMetricData is asserted to have been called 1 time for the 1 region present in the queries
    })

    t.Run("3 queries with 2 regions calls GetSession 2 times and calls GetMetricDataWithContext 2 times", func(t *testing.T) {
        sessionCache := &mockSessionCache{}
        sessionCache.On("GetSession", mock.MatchedBy(
            func(config awsds.SessionConfig) bool { return config.Settings.Region == "us-east-1" })).
            Return(&session.Session{Config: &aws.Config{}}, nil, nil).Once()
        sessionCache.On("GetSession", mock.MatchedBy(
            func(config awsds.SessionConfig) bool { return config.Settings.Region == "us-east-2" })).
            Return(&session.Session{Config: &aws.Config{}}, nil, nil).Once()
        mockMetricClient = mocks.MetricsClient{}
        mockMetricClient.On("GetMetricDataWithContext", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)

        executor := newExecutor(im, newTestConfig(), sessionCache, featuremgmt.WithFeatures())
        _, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
            PluginContext: backend.PluginContext{
                DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
            },
            Queries: []backend.DataQuery{
                {
                    RefID: "A",
                    TimeRange: backend.TimeRange{From: time.Now().Add(time.Hour * -2), To: time.Now().Add(time.Hour * -1)},
                    JSON: json.RawMessage(`{
                        "type": "timeSeriesQuery",
                        "namespace": "AWS/EC2",
                        "metricName": "NetworkOut",
                        "region": "us-east-2",
                        "statistic": "Maximum",
                        "period": "300"
                    }`),
                },
                {
                    RefID: "A2",
                    TimeRange: backend.TimeRange{From: time.Now().Add(time.Hour * -2), To: time.Now().Add(time.Hour * -1)},
                    JSON: json.RawMessage(`{
                        "type": "timeSeriesQuery",
                        "namespace": "AWS/EC2",
                        "metricName": "NetworkOut",
                        "region": "us-east-2",
                        "statistic": "Maximum",
                        "period": "300"
                    }`),
                },
                {
                    RefID: "B",
                    TimeRange: backend.TimeRange{From: time.Now().Add(time.Hour * -2), To: time.Now().Add(time.Hour * -1)},
                    JSON: json.RawMessage(`{
                        "type": "timeSeriesQuery",
                        "namespace": "AWS/EC2",
                        "metricName": "NetworkIn",
                        "region": "us-east-1",
                        "statistic": "Maximum",
                        "period": "300"
                    }`),
                },
            },
        })

        require.NoError(t, err)
        sessionCache.AssertExpectations(t) // method is defined to only return "Once()" for each region.
        // AssertExpectations will fail if those methods were not called Once(), so expected number of calls is asserted by this line
        mockMetricClient.AssertNumberOfCalls(t, "GetMetricDataWithContext", 2)
        // GetMetricData is asserted to have been called 2 times, presumably once for each group of regions (2 regions total)
    })
}

type queryDimensions struct {
    InstanceID []string `json:"InstanceId,omitempty"`
}
@@ -17,6 +17,7 @@ import (
    "github.com/grafana/grafana-aws-sdk/pkg/awsds"
    "github.com/grafana/grafana-plugin-sdk-go/backend"
    "github.com/grafana/grafana/pkg/setting"
    "github.com/stretchr/testify/mock"
)

type fakeCWLogsClient struct {
@@ -190,6 +191,15 @@ func newTestConfig() *setting.Cfg {
    return &setting.Cfg{AWSAllowedAuthProviders: []string{"default"}, AWSAssumeRoleEnabled: true, AWSListMetricsPageLimit: 1000}
}

type mockSessionCache struct {
    mock.Mock
}

func (c *mockSessionCache) GetSession(config awsds.SessionConfig) (*session.Session, error) {
    args := c.Called(config)
    return args.Get(0).(*session.Session), args.Error(1)
}

type fakeSessionCache struct {
    getSession func(c awsds.SessionConfig) (*session.Session, error)
    calledRegions []string