Cloudwatch: ListMetrics API page limit (#31788)

* add list metrics api page limit

* Update docs/sources/datasources/cloudwatch.md

Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com>

Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com>
This commit is contained in:
Erik Sundell
2021-03-10 07:41:22 +01:00
committed by GitHub
parent 04e46e3853
commit d512c5a1b4
9 changed files with 136 additions and 63 deletions

View File

@@ -169,6 +169,7 @@ func TestQuery_GetLogGroupFields(t *testing.T) {
}
const refID = "A"
executor := newExecutor(nil, newTestConfig())
resp, err := executor.DataQuery(context.Background(), fakeDataSource(), plugins.DataQuery{
Queries: []plugins.DataSubQuery{

View File

@@ -462,14 +462,21 @@ func (e *cloudWatchExecutor) handleGetDimensionValues(ctx context.Context, param
}
}
metrics, err := e.cloudwatchListMetrics(region, namespace, metricName, dimensions)
params := &cloudwatch.ListMetricsInput{
Namespace: aws.String(namespace),
Dimensions: dimensions,
}
if metricName != "" {
params.MetricName = aws.String(metricName)
}
metrics, err := e.listMetrics(region, params)
if err != nil {
return nil, err
}
result := make([]suggestData, 0)
dupCheck := make(map[string]bool)
for _, metric := range metrics.Metrics {
for _, metric := range metrics {
for _, dim := range metric.Dimensions {
if *dim.Name == dimensionKey {
if _, exists := dupCheck[*dim.Value]; exists {
@@ -631,36 +638,29 @@ func (e *cloudWatchExecutor) handleGetResourceArns(ctx context.Context, paramete
return result, nil
}
func (e *cloudWatchExecutor) cloudwatchListMetrics(region string, namespace string, metricName string,
dimensions []*cloudwatch.DimensionFilter) (*cloudwatch.ListMetricsOutput, error) {
svc, err := e.getCWClient(region)
func (e *cloudWatchExecutor) listMetrics(region string, params *cloudwatch.ListMetricsInput) ([]*cloudwatch.Metric, error) {
client, err := e.getCWClient(region)
if err != nil {
return nil, err
}
params := &cloudwatch.ListMetricsInput{
Namespace: aws.String(namespace),
Dimensions: dimensions,
}
plog.Debug("Listing metrics pages")
cloudWatchMetrics := []*cloudwatch.Metric{}
if metricName != "" {
params.MetricName = aws.String(metricName)
}
var resp cloudwatch.ListMetricsOutput
if err := svc.ListMetricsPages(params,
func(page *cloudwatch.ListMetricsOutput, lastPage bool) bool {
metrics.MAwsCloudWatchListMetrics.Inc()
metrics, _ := awsutil.ValuesAtPath(page, "Metrics")
pageNum := 0
err = client.ListMetricsPages(params, func(page *cloudwatch.ListMetricsOutput, lastPage bool) bool {
pageNum++
metrics.MAwsCloudWatchListMetrics.Inc()
metrics, err := awsutil.ValuesAtPath(page, "Metrics")
if err == nil {
for _, metric := range metrics {
resp.Metrics = append(resp.Metrics, metric.(*cloudwatch.Metric))
cloudWatchMetrics = append(cloudWatchMetrics, metric.(*cloudwatch.Metric))
}
return !lastPage
}); err != nil {
return nil, fmt.Errorf("failed to call cloudwatch:ListMetrics: %w", err)
}
}
return !lastPage && pageNum < e.cfg.AWSListMetricsPageLimit
})
return &resp, nil
return cloudWatchMetrics, err
}
func (e *cloudWatchExecutor) ec2DescribeInstances(region string, filters []*ec2.Filter, instanceIds []*string) (*ec2.DescribeInstancesOutput, error) {
@@ -709,34 +709,6 @@ func (e *cloudWatchExecutor) resourceGroupsGetResources(region string, filters [
return &resp, nil
}
func (e *cloudWatchExecutor) getAllMetrics(region, namespace string) (cloudwatch.ListMetricsOutput, error) {
client, err := e.getCWClient(region)
if err != nil {
return cloudwatch.ListMetricsOutput{}, err
}
params := &cloudwatch.ListMetricsInput{
Namespace: aws.String(namespace),
}
plog.Debug("Listing metrics pages")
var resp cloudwatch.ListMetricsOutput
err = client.ListMetricsPages(params, func(page *cloudwatch.ListMetricsOutput, lastPage bool) bool {
metrics.MAwsCloudWatchListMetrics.Inc()
metrics, err := awsutil.ValuesAtPath(page, "Metrics")
if err != nil {
return !lastPage
}
for _, metric := range metrics {
resp.Metrics = append(resp.Metrics, metric.(*cloudwatch.Metric))
}
return !lastPage
})
return resp, err
}
var metricsCacheLock sync.Mutex
func (e *cloudWatchExecutor) getMetricsForCustomMetrics(region, namespace string) ([]string, error) {
@@ -760,7 +732,10 @@ func (e *cloudWatchExecutor) getMetricsForCustomMetrics(region, namespace string
if customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Expire.After(time.Now()) {
return customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, nil
}
result, err := e.getAllMetrics(region, namespace)
metrics, err := e.listMetrics(region, &cloudwatch.ListMetricsInput{
Namespace: aws.String(namespace),
})
if err != nil {
return []string{}, err
}
@@ -768,7 +743,7 @@ func (e *cloudWatchExecutor) getMetricsForCustomMetrics(region, namespace string
customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache = make([]string, 0)
customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Expire = time.Now().Add(5 * time.Minute)
for _, metric := range result.Metrics {
for _, metric := range metrics {
if isDuplicate(customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, *metric.MetricName) {
continue
}
@@ -801,14 +776,15 @@ func (e *cloudWatchExecutor) getDimensionsForCustomMetrics(region, namespace str
if customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Expire.After(time.Now()) {
return customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, nil
}
result, err := e.getAllMetrics(region, namespace)
metrics, err := e.listMetrics(region, &cloudwatch.ListMetricsInput{Namespace: aws.String(namespace)})
if err != nil {
return []string{}, err
}
customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache = make([]string, 0)
customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Expire = time.Now().Add(5 * time.Minute)
for _, metric := range result.Metrics {
for _, metric := range metrics {
for _, dimension := range metric.Dimensions {
if isDuplicate(customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][namespace].Cache, *dimension.Name) {
continue

View File

@@ -15,6 +15,7 @@ import (
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/setting"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -499,3 +500,50 @@ func TestQuery_ResourceARNs(t *testing.T) {
}, resp)
})
}
func TestQuery_ListMetricsPagination(t *testing.T) {
origNewCWClient := NewCWClient
t.Cleanup(func() {
NewCWClient = origNewCWClient
})
var client FakeCWClient
NewCWClient = func(sess *session.Session) cloudwatchiface.CloudWatchAPI {
return client
}
metrics := []*cloudwatch.Metric{
{MetricName: aws.String("Test_MetricName1")},
{MetricName: aws.String("Test_MetricName2")},
{MetricName: aws.String("Test_MetricName3")},
{MetricName: aws.String("Test_MetricName4")},
{MetricName: aws.String("Test_MetricName5")},
{MetricName: aws.String("Test_MetricName6")},
{MetricName: aws.String("Test_MetricName7")},
{MetricName: aws.String("Test_MetricName8")},
{MetricName: aws.String("Test_MetricName9")},
{MetricName: aws.String("Test_MetricName10")},
}
t.Run("List Metrics and page limit is reached", func(t *testing.T) {
client = FakeCWClient{Metrics: metrics, MetricsPerPage: 2}
executor := newExecutor(nil, &setting.Cfg{AWSListMetricsPageLimit: 3, AWSAllowedAuthProviders: []string{"default"}, AWSAssumeRoleEnabled: true})
executor.DataSource = fakeDataSource()
response, err := executor.listMetrics("default", &cloudwatch.ListMetricsInput{})
require.NoError(t, err)
expectedMetrics := client.MetricsPerPage * executor.cfg.AWSListMetricsPageLimit
assert.Equal(t, expectedMetrics, len(response))
})
t.Run("List Metrics and page limit is not reached", func(t *testing.T) {
client = FakeCWClient{Metrics: metrics, MetricsPerPage: 2}
executor := newExecutor(nil, &setting.Cfg{AWSListMetricsPageLimit: 1000, AWSAllowedAuthProviders: []string{"default"}, AWSAssumeRoleEnabled: true})
executor.DataSource = fakeDataSource()
response, err := executor.listMetrics("default", &cloudwatch.ListMetricsInput{})
require.NoError(t, err)
assert.Equal(t, len(metrics), len(response))
})
}

View File

@@ -79,12 +79,24 @@ type FakeCWClient struct {
cloudwatchiface.CloudWatchAPI
Metrics []*cloudwatch.Metric
MetricsPerPage int
}
func (c FakeCWClient) ListMetricsPages(input *cloudwatch.ListMetricsInput, fn func(*cloudwatch.ListMetricsOutput, bool) bool) error {
fn(&cloudwatch.ListMetricsOutput{
Metrics: c.Metrics,
}, true)
if c.MetricsPerPage == 0 {
c.MetricsPerPage = 1000
}
chunks := chunkSlice(c.Metrics, c.MetricsPerPage)
for i, metrics := range chunks {
response := fn(&cloudwatch.ListMetricsOutput{
Metrics: metrics,
}, i+1 == len(chunks))
if !response {
break
}
}
return nil
}
@@ -147,6 +159,23 @@ func (c fakeRGTAClient) GetResourcesPages(in *resourcegroupstaggingapi.GetResour
return nil
}
func newTestConfig() *setting.Cfg {
return &setting.Cfg{AWSAllowedAuthProviders: []string{"default"}, AWSAssumeRoleEnabled: true}
func chunkSlice(slice []*cloudwatch.Metric, chunkSize int) [][]*cloudwatch.Metric {
var chunks [][]*cloudwatch.Metric
for {
if len(slice) == 0 {
break
}
if len(slice) < chunkSize {
chunkSize = len(slice)
}
chunks = append(chunks, slice[0:chunkSize])
slice = slice[chunkSize:]
}
return chunks
}
func newTestConfig() *setting.Cfg {
return &setting.Cfg{AWSAllowedAuthProviders: []string{"default"}, AWSAssumeRoleEnabled: true, AWSListMetricsPageLimit: 1000}
}