From c863fd3da968142da3c0d54d0a09b45a6473acdf Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 14 Jul 2020 08:23:23 +0200 Subject: [PATCH] CloudWatch: Clean up code (#26259) * CloudWatch: Clean up code --- pkg/tsdb/cloudwatch/annotation_query.go | 10 +- pkg/tsdb/cloudwatch/cloudwatch.go | 156 +++++++++++++----- pkg/tsdb/cloudwatch/credentials.go | 113 ++----------- pkg/tsdb/cloudwatch/credentials_test.go | 4 +- .../cloudwatch/get_metric_data_executor.go | 2 +- .../get_metric_data_executor_test.go | 2 +- pkg/tsdb/cloudwatch/log_actions.go | 34 ++-- pkg/tsdb/cloudwatch/log_actions_test.go | 14 +- .../cloudwatch/metric_data_input_builder.go | 3 +- .../cloudwatch/metric_data_query_builder.go | 2 +- pkg/tsdb/cloudwatch/metric_find_query.go | 138 ++++++++-------- pkg/tsdb/cloudwatch/metric_find_query_test.go | 42 +++-- pkg/tsdb/cloudwatch/query_transformer.go | 4 +- pkg/tsdb/cloudwatch/query_transformer_test.go | 2 +- pkg/tsdb/cloudwatch/request_parser.go | 2 +- pkg/tsdb/cloudwatch/response_parser.go | 2 +- pkg/tsdb/cloudwatch/time_series_query.go | 4 +- pkg/tsdb/cloudwatch/time_series_query_test.go | 2 +- 18 files changed, 273 insertions(+), 263 deletions(-) diff --git a/pkg/tsdb/cloudwatch/annotation_query.go b/pkg/tsdb/cloudwatch/annotation_query.go index 2ef31055c08..6af828e0ee4 100644 --- a/pkg/tsdb/cloudwatch/annotation_query.go +++ b/pkg/tsdb/cloudwatch/annotation_query.go @@ -12,7 +12,7 @@ import ( "github.com/grafana/grafana/pkg/util/errutil" ) -func (e *CloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { +func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { result := &tsdb.Response{ Results: make(map[string]*tsdb.QueryResult), } @@ -36,7 +36,7 @@ func (e *CloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo actionPrefix := parameters.Get("actionPrefix").MustString("") alarmNamePrefix := parameters.Get("alarmNamePrefix").MustString("") - svc, err := e.getClient(region) + cli, err := e.getCWClient(region) if err != nil { return nil, err } @@ -48,7 +48,7 @@ func (e *CloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo ActionPrefix: aws.String(actionPrefix), AlarmNamePrefix: aws.String(alarmNamePrefix), } - resp, err := svc.DescribeAlarms(params) + resp, err := cli.DescribeAlarms(params) if err != nil { return nil, errutil.Wrap("failed to call cloudwatch:DescribeAlarms", err) } @@ -79,7 +79,7 @@ func (e *CloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo Statistic: aws.String(s), Period: aws.Int64(period), } - resp, err := svc.DescribeAlarmsForMetric(params) + resp, err := cli.DescribeAlarmsForMetric(params) if err != nil { return nil, errutil.Wrap("failed to call cloudwatch:DescribeAlarmsForMetric", err) } @@ -106,7 +106,7 @@ func (e *CloudWatchExecutor) executeAnnotationQuery(ctx context.Context, queryCo EndDate: aws.Time(endTime), MaxRecords: aws.Int64(100), } - resp, err := svc.DescribeAlarmHistory(params) + resp, err := cli.DescribeAlarmHistory(params) if err != nil { return nil, errutil.Wrap("failed to call cloudwatch:DescribeAlarmHistory", err) } diff --git a/pkg/tsdb/cloudwatch/cloudwatch.go b/pkg/tsdb/cloudwatch/cloudwatch.go index 92df32b3c08..2ce069c0fe5 100644 --- a/pkg/tsdb/cloudwatch/cloudwatch.go +++ b/pkg/tsdb/cloudwatch/cloudwatch.go @@ -9,25 +9,19 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/aws/aws-sdk-go/service/ec2/ec2iface" "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/models" + "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/tsdb" ) -type CloudWatchExecutor struct { - *models.DataSource - ec2Svc ec2iface.EC2API - rgtaSvc resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI - - logsClientsByRegion map[string](*cloudwatchlogs.CloudWatchLogs) - mux sync.Mutex -} - -type DatasourceInfo struct { +type datasourceInfo struct { Profile string Region string AuthType string @@ -40,12 +34,69 @@ type DatasourceInfo struct { } const cloudWatchTSFormat = "2006-01-02 15:04:05.000" +const defaultRegion = "default" // Constants also defined in datasource/cloudwatch/datasource.ts const logIdentifierInternal = "__log__grafana_internal__" const logStreamIdentifierInternal = "__logstream__grafana_internal__" -func (e *CloudWatchExecutor) getLogsClient(region string) (*cloudwatchlogs.CloudWatchLogs, error) { +var plog = log.New("tsdb.cloudwatch") +var aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`) + +func init() { + tsdb.RegisterTsdbQueryEndpoint("cloudwatch", newcloudWatchExecutor) +} + +func newcloudWatchExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { + e := &cloudWatchExecutor{ + DataSource: datasource, + } + + dsInfo := e.getDSInfo(defaultRegion) + defaultLogsClient, err := retrieveLogsClient(dsInfo) + if err != nil { + return nil, err + } + e.logsClientsByRegion = map[string]*cloudwatchlogs.CloudWatchLogs{ + dsInfo.Region: defaultLogsClient, + defaultRegion: defaultLogsClient, + } + + return e, nil +} + +// cloudWatchExecutor executes CloudWatch requests. +type cloudWatchExecutor struct { + *models.DataSource + ec2Svc ec2iface.EC2API + rgtaSvc resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI + + logsClientsByRegion map[string](*cloudwatchlogs.CloudWatchLogs) + mux sync.Mutex +} + +func (e *cloudWatchExecutor) getCWClient(region string) (*cloudwatch.CloudWatch, error) { + datasourceInfo := e.getDSInfo(region) + cfg, err := getAwsConfig(datasourceInfo) + if err != nil { + return nil, err + } + + sess, err := newSession(cfg) + if err != nil { + return nil, err + } + + client := cloudwatch.New(sess, cfg) + + client.Handlers.Send.PushFront(func(r *request.Request) { + r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion)) + }) + + return client, nil +} + +func (e *cloudWatchExecutor) getCWLogsClient(region string) (*cloudwatchlogs.CloudWatchLogs, error) { e.mux.Lock() defer e.mux.Unlock() @@ -53,9 +104,8 @@ func (e *CloudWatchExecutor) getLogsClient(region string) (*cloudwatchlogs.Cloud return logsClient, nil } - dsInfo := retrieveDsInfo(e.DataSource, region) + dsInfo := e.getDSInfo(region) newLogsClient, err := retrieveLogsClient(dsInfo) - if err != nil { return nil, err } @@ -65,31 +115,7 @@ func (e *CloudWatchExecutor) getLogsClient(region string) (*cloudwatchlogs.Cloud return newLogsClient, nil } -func NewCloudWatchExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) { - dsInfo := retrieveDsInfo(datasource, "default") - defaultLogsClient, err := retrieveLogsClient(dsInfo) - - if err != nil { - return nil, err - } - - logsClientsByRegion := make(map[string](*cloudwatchlogs.CloudWatchLogs)) - logsClientsByRegion[dsInfo.Region] = defaultLogsClient - logsClientsByRegion["default"] = defaultLogsClient - - return &CloudWatchExecutor{ - logsClientsByRegion: logsClientsByRegion, - }, nil -} - -var plog = log.New("tsdb.cloudwatch") -var aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`) - -func init() { - tsdb.RegisterTsdbQueryEndpoint("cloudwatch", NewCloudWatchExecutor) -} - -func (e *CloudWatchExecutor) alertQuery(ctx context.Context, logsClient *cloudwatchlogs.CloudWatchLogs, queryContext *tsdb.TsdbQuery) (*cloudwatchlogs.GetQueryResultsOutput, error) { +func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient *cloudwatchlogs.CloudWatchLogs, queryContext *tsdb.TsdbQuery) (*cloudwatchlogs.GetQueryResultsOutput, error) { const maxAttempts = 8 const pollPeriod = 1000 * time.Millisecond @@ -126,7 +152,8 @@ func (e *CloudWatchExecutor) alertQuery(ctx context.Context, logsClient *cloudwa return nil, nil } -func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSource, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { +// Query executes a CloudWatch query. +func (e *cloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSource, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { e.DataSource = dsInfo /* @@ -163,18 +190,18 @@ func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSourc return result, err } -func (e *CloudWatchExecutor) executeLogAlertQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { +func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { queryParams := queryContext.Queries[0].Model queryParams.Set("subtype", "StartQuery") queryParams.Set("queryString", queryParams.Get("expression").MustString("")) - region := queryParams.Get("region").MustString("default") - if region == "default" { + region := queryParams.Get("region").MustString(defaultRegion) + if region == defaultRegion { region = e.DataSource.JsonData.Get("defaultRegion").MustString() queryParams.Set("region", region) } - logsClient, err := e.getLogsClient(region) + logsClient, err := e.getCWLogsClient(region) if err != nil { return nil, err } @@ -227,6 +254,49 @@ func (e *CloudWatchExecutor) executeLogAlertQuery(ctx context.Context, queryCont return response, nil } +func (e *cloudWatchExecutor) getDSInfo(region string) *datasourceInfo { + if region == defaultRegion { + region = e.DataSource.JsonData.Get("defaultRegion").MustString() + } + + authType := e.DataSource.JsonData.Get("authType").MustString() + assumeRoleArn := e.DataSource.JsonData.Get("assumeRoleArn").MustString() + externalID := e.DataSource.JsonData.Get("externalId").MustString() + decrypted := e.DataSource.DecryptedValues() + accessKey := decrypted["accessKey"] + secretKey := decrypted["secretKey"] + + return &datasourceInfo{ + Region: region, + Profile: e.DataSource.Database, + AuthType: authType, + AssumeRoleArn: assumeRoleArn, + ExternalID: externalID, + AccessKey: accessKey, + SecretKey: secretKey, + } +} + +func retrieveLogsClient(dsInfo *datasourceInfo) (*cloudwatchlogs.CloudWatchLogs, error) { + cfg, err := getAwsConfig(dsInfo) + if err != nil { + return nil, err + } + + sess, err := newSession(cfg) + if err != nil { + return nil, err + } + + client := cloudwatchlogs.New(sess, cfg) + + client.Handlers.Send.PushFront(func(r *request.Request) { + r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion)) + }) + + return client, nil +} + func isTerminated(queryStatus string) bool { return queryStatus == "Complete" || queryStatus == "Cancelled" || queryStatus == "Failed" || queryStatus == "Timeout" } diff --git a/pkg/tsdb/cloudwatch/credentials.go b/pkg/tsdb/cloudwatch/credentials.go index d66fa4e9e7f..512e9827744 100644 --- a/pkg/tsdb/cloudwatch/credentials.go +++ b/pkg/tsdb/cloudwatch/credentials.go @@ -14,23 +14,18 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials/stscreds" "github.com/aws/aws-sdk-go/aws/defaults" "github.com/aws/aws-sdk-go/aws/ec2metadata" - "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/cloudwatch" - "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/aws/aws-sdk-go/service/sts" "github.com/aws/aws-sdk-go/service/sts/stsiface" - "github.com/grafana/grafana/pkg/models" - "github.com/grafana/grafana/pkg/setting" ) -type cache struct { - credential *credentials.Credentials - expiration *time.Time +type envelope struct { + credentials *credentials.Credentials + expiration *time.Time } -var awsCredentialCache = make(map[string]cache) -var credentialCacheLock sync.RWMutex +var awsCredsCache = map[string]envelope{} +var credsCacheLock sync.RWMutex // Session factory. // Stubbable by tests. @@ -50,18 +45,17 @@ var newEC2Metadata = func(p client.ConfigProvider, cfgs ...*aws.Config) *ec2meta return ec2metadata.New(p, cfgs...) } -func getCredentials(dsInfo *DatasourceInfo) (*credentials.Credentials, error) { +func getCredentials(dsInfo *datasourceInfo) (*credentials.Credentials, error) { cacheKey := fmt.Sprintf("%s:%s:%s:%s", dsInfo.AuthType, dsInfo.AccessKey, dsInfo.Profile, dsInfo.AssumeRoleArn) - credentialCacheLock.RLock() - if _, ok := awsCredentialCache[cacheKey]; ok { - if awsCredentialCache[cacheKey].expiration != nil && - awsCredentialCache[cacheKey].expiration.After(time.Now().UTC()) { - result := awsCredentialCache[cacheKey].credential - credentialCacheLock.RUnlock() + credsCacheLock.RLock() + if env, ok := awsCredsCache[cacheKey]; ok { + if env.expiration != nil && env.expiration.After(time.Now().UTC()) { + result := env.credentials + credsCacheLock.RUnlock() return result, nil } } - credentialCacheLock.RUnlock() + credsCacheLock.RUnlock() accessKeyID := "" secretAccessKey := "" @@ -135,12 +129,12 @@ func getCredentials(dsInfo *DatasourceInfo) (*credentials.Credentials, error) { remoteCredProvider(sess), }) - credentialCacheLock.Lock() - awsCredentialCache[cacheKey] = cache{ - credential: creds, - expiration: expiration, + credsCacheLock.Lock() + awsCredsCache[cacheKey] = envelope{ + credentials: creds, + expiration: expiration, } - credentialCacheLock.Unlock() + credsCacheLock.Unlock() return creds, nil } @@ -178,37 +172,7 @@ func ec2RoleProvider(sess *session.Session) credentials.Provider { return &ec2rolecreds.EC2RoleProvider{Client: newEC2Metadata(sess), ExpiryWindow: 5 * time.Minute} } -func (e *CloudWatchExecutor) getDsInfo(region string) *DatasourceInfo { - return retrieveDsInfo(e.DataSource, region) -} - -func retrieveDsInfo(datasource *models.DataSource, region string) *DatasourceInfo { - defaultRegion := datasource.JsonData.Get("defaultRegion").MustString() - if region == "default" { - region = defaultRegion - } - - authType := datasource.JsonData.Get("authType").MustString() - assumeRoleArn := datasource.JsonData.Get("assumeRoleArn").MustString() - externalID := datasource.JsonData.Get("externalId").MustString() - decrypted := datasource.DecryptedValues() - accessKey := decrypted["accessKey"] - secretKey := decrypted["secretKey"] - - datasourceInfo := &DatasourceInfo{ - Region: region, - Profile: datasource.Database, - AuthType: authType, - AssumeRoleArn: assumeRoleArn, - ExternalID: externalID, - AccessKey: accessKey, - SecretKey: secretKey, - } - - return datasourceInfo -} - -func getAwsConfig(dsInfo *DatasourceInfo) (*aws.Config, error) { +func getAwsConfig(dsInfo *datasourceInfo) (*aws.Config, error) { creds, err := getCredentials(dsInfo) if err != nil { return nil, err @@ -221,44 +185,3 @@ func getAwsConfig(dsInfo *DatasourceInfo) (*aws.Config, error) { return cfg, nil } - -func (e *CloudWatchExecutor) getClient(region string) (*cloudwatch.CloudWatch, error) { - datasourceInfo := e.getDsInfo(region) - cfg, err := getAwsConfig(datasourceInfo) - if err != nil { - return nil, err - } - - sess, err := newSession(cfg) - if err != nil { - return nil, err - } - - client := cloudwatch.New(sess, cfg) - - client.Handlers.Send.PushFront(func(r *request.Request) { - r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion)) - }) - - return client, nil -} - -func retrieveLogsClient(datasourceInfo *DatasourceInfo) (*cloudwatchlogs.CloudWatchLogs, error) { - cfg, err := getAwsConfig(datasourceInfo) - if err != nil { - return nil, err - } - - sess, err := newSession(cfg) - if err != nil { - return nil, err - } - - client := cloudwatchlogs.New(sess, cfg) - - client.Handlers.Send.PushFront(func(r *request.Request) { - r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion)) - }) - - return client, nil -} diff --git a/pkg/tsdb/cloudwatch/credentials_test.go b/pkg/tsdb/cloudwatch/credentials_test.go index e3d5f200ba7..785e8e035e3 100644 --- a/pkg/tsdb/cloudwatch/credentials_test.go +++ b/pkg/tsdb/cloudwatch/credentials_test.go @@ -87,7 +87,7 @@ func TestGetCredentials_ARNAuthType(t *testing.T) { }, nil). Times(1) - creds, err := getCredentials(&DatasourceInfo{ + creds, err := getCredentials(&datasourceInfo{ AuthType: "arn", }) require.NoError(t, err) @@ -113,7 +113,7 @@ func TestGetCredentials_ARNAuthType(t *testing.T) { }, nil). Times(1) - creds, err := getCredentials(&DatasourceInfo{ + creds, err := getCredentials(&datasourceInfo{ AuthType: "arn", ExternalID: "external-id", }) diff --git a/pkg/tsdb/cloudwatch/get_metric_data_executor.go b/pkg/tsdb/cloudwatch/get_metric_data_executor.go index d8df196c560..4ca62c46e76 100644 --- a/pkg/tsdb/cloudwatch/get_metric_data_executor.go +++ b/pkg/tsdb/cloudwatch/get_metric_data_executor.go @@ -8,7 +8,7 @@ import ( "github.com/grafana/grafana/pkg/infra/metrics" ) -func (e *CloudWatchExecutor) executeRequest(ctx context.Context, client cloudWatchClient, metricDataInput *cloudwatch.GetMetricDataInput) ([]*cloudwatch.GetMetricDataOutput, error) { +func (e *cloudWatchExecutor) executeRequest(ctx context.Context, client cloudWatchClient, metricDataInput *cloudwatch.GetMetricDataInput) ([]*cloudwatch.GetMetricDataOutput, error) { mdo := make([]*cloudwatch.GetMetricDataOutput, 0) nextToken := "" diff --git a/pkg/tsdb/cloudwatch/get_metric_data_executor_test.go b/pkg/tsdb/cloudwatch/get_metric_data_executor_test.go index f1fddfbbeb9..090281dc334 100644 --- a/pkg/tsdb/cloudwatch/get_metric_data_executor_test.go +++ b/pkg/tsdb/cloudwatch/get_metric_data_executor_test.go @@ -35,7 +35,7 @@ func (client *cloudWatchFakeClient) GetMetricDataWithContext(ctx aws.Context, in } func TestGetMetricDataExecutorTest(t *testing.T) { - executor := &CloudWatchExecutor{} + executor := &cloudWatchExecutor{} inputs := &cloudwatch.GetMetricDataInput{MetricDataQueries: []*cloudwatch.MetricDataQuery{}} res, err := executor.executeRequest(context.Background(), &cloudWatchFakeClient{}, inputs) require.NoError(t, err) diff --git a/pkg/tsdb/cloudwatch/log_actions.go b/pkg/tsdb/cloudwatch/log_actions.go index e471a17d0a7..44b7a16c850 100644 --- a/pkg/tsdb/cloudwatch/log_actions.go +++ b/pkg/tsdb/cloudwatch/log_actions.go @@ -15,7 +15,7 @@ import ( "golang.org/x/sync/errgroup" ) -func (e *CloudWatchExecutor) executeLogActions(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { +func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { resultChan := make(chan *tsdb.QueryResult, len(queryContext.Queries)) eg, ectx := errgroup.WithContext(ctx) @@ -73,13 +73,13 @@ func (e *CloudWatchExecutor) executeLogActions(ctx context.Context, queryContext return response, nil } -func (e *CloudWatchExecutor) executeLogAction(ctx context.Context, queryContext *tsdb.TsdbQuery, query *tsdb.Query) (*data.Frame, error) { +func (e *cloudWatchExecutor) executeLogAction(ctx context.Context, queryContext *tsdb.TsdbQuery, query *tsdb.Query) (*data.Frame, error) { parameters := query.Model subType := query.Model.Get("subtype").MustString() defaultRegion := e.DataSource.JsonData.Get("defaultRegion").MustString() region := parameters.Get("region").MustString(defaultRegion) - logsClient, err := e.getLogsClient(region) + logsClient, err := e.getCWLogsClient(region) if err != nil { return nil, err } @@ -100,7 +100,6 @@ func (e *CloudWatchExecutor) executeLogAction(ctx context.Context, queryContext case "GetLogEvents": data, err = e.handleGetLogEvents(ctx, logsClient, parameters) } - if err != nil { return nil, err } @@ -108,7 +107,8 @@ func (e *CloudWatchExecutor) executeLogAction(ctx context.Context, queryContext return data, nil } -func (e *CloudWatchExecutor) handleGetLogEvents(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*data.Frame, error) { +func (e *cloudWatchExecutor) handleGetLogEvents(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, + parameters *simplejson.Json) (*data.Frame, error) { queryRequest := &cloudwatchlogs.GetLogEventsInput{ Limit: aws.Int64(parameters.Get("limit").MustInt64(10)), StartFromHead: aws.Bool(parameters.Get("startFromHead").MustBool(false)), @@ -159,7 +159,8 @@ func (e *CloudWatchExecutor) handleGetLogEvents(ctx context.Context, logsClient return data.NewFrame("logEvents", timestampField, messageField), nil } -func (e *CloudWatchExecutor) handleDescribeLogGroups(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*data.Frame, error) { +func (e *cloudWatchExecutor) handleDescribeLogGroups(ctx context.Context, + logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*data.Frame, error) { logGroupNamePrefix := parameters.Get("logGroupNamePrefix").MustString("") var response *cloudwatchlogs.DescribeLogGroupsOutput = nil @@ -189,7 +190,8 @@ func (e *CloudWatchExecutor) handleDescribeLogGroups(ctx context.Context, logsCl return frame, nil } -func (e *CloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json, timeRange *tsdb.TimeRange) (*cloudwatchlogs.StartQueryOutput, error) { +func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, + parameters *simplejson.Json, timeRange *tsdb.TimeRange) (*cloudwatchlogs.StartQueryOutput, error) { startTime, err := timeRange.ParseFrom() if err != nil { return nil, err @@ -224,7 +226,8 @@ func (e *CloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient c return logsClient.StartQueryWithContext(ctx, startQueryInput) } -func (e *CloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json, timeRange *tsdb.TimeRange, refID string) (*data.Frame, error) { +func (e *cloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, + parameters *simplejson.Json, timeRange *tsdb.TimeRange, refID string) (*data.Frame, error) { startQueryResponse, err := e.executeStartQuery(ctx, logsClient, parameters, timeRange) if err != nil { return nil, err @@ -244,7 +247,8 @@ func (e *CloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cl return dataFrame, nil } -func (e *CloudWatchExecutor) executeStopQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*cloudwatchlogs.StopQueryOutput, error) { +func (e *cloudWatchExecutor) executeStopQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, + parameters *simplejson.Json) (*cloudwatchlogs.StopQueryOutput, error) { queryInput := &cloudwatchlogs.StopQueryInput{ QueryId: aws.String(parameters.Get("queryId").MustString()), } @@ -264,7 +268,8 @@ func (e *CloudWatchExecutor) executeStopQuery(ctx context.Context, logsClient cl return response, err } -func (e *CloudWatchExecutor) handleStopQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*data.Frame, error) { +func (e *cloudWatchExecutor) handleStopQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, + parameters *simplejson.Json) (*data.Frame, error) { response, err := e.executeStopQuery(ctx, logsClient, parameters) if err != nil { return nil, err @@ -274,7 +279,8 @@ func (e *CloudWatchExecutor) handleStopQuery(ctx context.Context, logsClient clo return dataFrame, nil } -func (e *CloudWatchExecutor) executeGetQueryResults(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*cloudwatchlogs.GetQueryResultsOutput, error) { +func (e *cloudWatchExecutor) executeGetQueryResults(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, + parameters *simplejson.Json) (*cloudwatchlogs.GetQueryResultsOutput, error) { queryInput := &cloudwatchlogs.GetQueryResultsInput{ QueryId: aws.String(parameters.Get("queryId").MustString()), } @@ -282,7 +288,8 @@ func (e *CloudWatchExecutor) executeGetQueryResults(ctx context.Context, logsCli return logsClient.GetQueryResultsWithContext(ctx, queryInput) } -func (e *CloudWatchExecutor) handleGetQueryResults(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json, refID string) (*data.Frame, error) { +func (e *cloudWatchExecutor) handleGetQueryResults(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, + parameters *simplejson.Json, refID string) (*data.Frame, error) { getQueryResultsOutput, err := e.executeGetQueryResults(ctx, logsClient, parameters) if err != nil { return nil, err @@ -299,7 +306,8 @@ func (e *CloudWatchExecutor) handleGetQueryResults(ctx context.Context, logsClie return dataFrame, nil } -func (e *CloudWatchExecutor) handleGetLogGroupFields(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json, refID string) (*data.Frame, error) { +func (e *cloudWatchExecutor) handleGetLogGroupFields(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, + parameters *simplejson.Json, refID string) (*data.Frame, error) { queryInput := &cloudwatchlogs.GetLogGroupFieldsInput{ LogGroupName: aws.String(parameters.Get("logGroupName").MustString()), Time: aws.Int64(parameters.Get("time").MustInt64()), diff --git a/pkg/tsdb/cloudwatch/log_actions_test.go b/pkg/tsdb/cloudwatch/log_actions_test.go index aae7363026c..4cf0b16bf14 100644 --- a/pkg/tsdb/cloudwatch/log_actions_test.go +++ b/pkg/tsdb/cloudwatch/log_actions_test.go @@ -21,7 +21,7 @@ import ( //*** func TestHandleDescribeLogGroups_WhenLogGroupNamePrefixIsEmpty(t *testing.T) { - executor := &CloudWatchExecutor{} + executor := &cloudWatchExecutor{} logsClient := &FakeLogsClient{ Config: aws.Config{ @@ -43,7 +43,7 @@ func TestHandleDescribeLogGroups_WhenLogGroupNamePrefixIsEmpty(t *testing.T) { } func TestHandleDescribeLogGroups_WhenLogGroupNamePrefixIsNotEmpty(t *testing.T) { - executor := &CloudWatchExecutor{} + executor := &cloudWatchExecutor{} logsClient := &FakeLogsClient{ Config: aws.Config{ @@ -64,7 +64,7 @@ func TestHandleDescribeLogGroups_WhenLogGroupNamePrefixIsNotEmpty(t *testing.T) } func TestHandleGetLogGroupFields_WhenLogGroupNamePrefixIsNotEmpty(t *testing.T) { - executor := &CloudWatchExecutor{} + executor := &cloudWatchExecutor{} logsClient := &FakeLogsClient{ Config: aws.Config{ @@ -89,7 +89,7 @@ func TestHandleGetLogGroupFields_WhenLogGroupNamePrefixIsNotEmpty(t *testing.T) } func TestExecuteStartQuery(t *testing.T) { - executor := &CloudWatchExecutor{} + executor := &cloudWatchExecutor{} logsClient := &FakeLogsClient{ Config: aws.Config{ @@ -117,7 +117,7 @@ func TestExecuteStartQuery(t *testing.T) { } func TestHandleStartQuery(t *testing.T) { - executor := &CloudWatchExecutor{} + executor := &cloudWatchExecutor{} logsClient := &FakeLogsClient{ Config: aws.Config{ @@ -152,7 +152,7 @@ func TestHandleStartQuery(t *testing.T) { } func TestHandleStopQuery(t *testing.T) { - executor := &CloudWatchExecutor{} + executor := &cloudWatchExecutor{} logsClient := &FakeLogsClient{ Config: aws.Config{ @@ -174,7 +174,7 @@ func TestHandleStopQuery(t *testing.T) { } func TestHandleGetQueryResults(t *testing.T) { - executor := &CloudWatchExecutor{} + executor := &cloudWatchExecutor{} logsClient := &FakeLogsClient{ Config: aws.Config{ diff --git a/pkg/tsdb/cloudwatch/metric_data_input_builder.go b/pkg/tsdb/cloudwatch/metric_data_input_builder.go index d8d5d9add90..5b5727b6106 100644 --- a/pkg/tsdb/cloudwatch/metric_data_input_builder.go +++ b/pkg/tsdb/cloudwatch/metric_data_input_builder.go @@ -7,7 +7,8 @@ import ( "github.com/aws/aws-sdk-go/service/cloudwatch" ) -func (e *CloudWatchExecutor) buildMetricDataInput(startTime time.Time, endTime time.Time, queries map[string]*cloudWatchQuery) (*cloudwatch.GetMetricDataInput, error) { +func (e *cloudWatchExecutor) buildMetricDataInput(startTime time.Time, endTime time.Time, + queries map[string]*cloudWatchQuery) (*cloudwatch.GetMetricDataInput, error) { metricDataInput := &cloudwatch.GetMetricDataInput{ StartTime: aws.Time(startTime), EndTime: aws.Time(endTime), diff --git a/pkg/tsdb/cloudwatch/metric_data_query_builder.go b/pkg/tsdb/cloudwatch/metric_data_query_builder.go index f6eec606a58..4d3c1db45ad 100644 --- a/pkg/tsdb/cloudwatch/metric_data_query_builder.go +++ b/pkg/tsdb/cloudwatch/metric_data_query_builder.go @@ -10,7 +10,7 @@ import ( "github.com/aws/aws-sdk-go/service/cloudwatch" ) -func (e *CloudWatchExecutor) buildMetricDataQuery(query *cloudWatchQuery) (*cloudwatch.MetricDataQuery, error) { +func (e *cloudWatchExecutor) buildMetricDataQuery(query *cloudWatchQuery) (*cloudwatch.MetricDataQuery, error) { mdq := &cloudwatch.MetricDataQuery{ Id: aws.String(query.Id), ReturnData: aws.Bool(query.ReturnData), diff --git a/pkg/tsdb/cloudwatch/metric_find_query.go b/pkg/tsdb/cloudwatch/metric_find_query.go index 29456d0f485..180edaf78dc 100644 --- a/pkg/tsdb/cloudwatch/metric_find_query.go +++ b/pkg/tsdb/cloudwatch/metric_find_query.go @@ -21,18 +21,26 @@ import ( "github.com/grafana/grafana/pkg/tsdb" ) +// Known AWS regions. +var knownRegions = []string{ + "ap-east-1", "ap-northeast-1", "ap-northeast-2", "ap-northeast-3", "ap-south-1", "ap-southeast-1", + "ap-southeast-2", "ca-central-1", "cn-north-1", "cn-northwest-1", "eu-central-1", "eu-north-1", "eu-west-1", + "eu-west-2", "eu-west-3", "me-south-1", "sa-east-1", "us-east-1", "us-east-2", "us-gov-east-1", "us-gov-west-1", + "us-iso-east-1", "us-isob-east-1", "us-west-1", "us-west-2", +} + type suggestData struct { Text string Value string } -type CustomMetricsCache struct { +type customMetricsCache struct { Expire time.Time Cache []string } -var customMetricsMetricsMap = make(map[string]map[string]map[string]*CustomMetricsCache) -var customMetricsDimensionsMap = make(map[string]map[string]map[string]*CustomMetricsCache) +var customMetricsMetricsMap = make(map[string]map[string]map[string]*customMetricsCache) +var customMetricsDimensionsMap = make(map[string]map[string]map[string]*customMetricsCache) var metricsMap = map[string][]string{ "AWS/ACMPrivateCA": {"CRLGenerated", "Failure", "MisconfiguredCRLBucket", "Success", "Time"}, "AWS/AmazonMQ": {"ConsumerCount", "CpuCreditBalance", "CpuUtilization", "CurrentConnectionsCount", "DequeueCount", "DispatchCount", "EnqueueCount", "EnqueueTime", "ExpiredCount", "HeapUsage", "InflightCount", "JournalFilesForFastRecovery", "JournalFilesForFullRecovery", "MemoryUsage", "NetworkIn", "NetworkOut", "OpenTransactionsCount", "ProducerCount", "QueueSize", "StorePercentUsage", "TotalConsumerCount", "TotalMessageCount", "TotalProducerCount"}, @@ -222,7 +230,7 @@ var dimensionsMap = map[string][]string{ var regionCache sync.Map -func (e *CloudWatchExecutor) executeMetricFindQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { +func (e *cloudWatchExecutor) executeMetricFindQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { firstQuery := queryContext.Queries[0] parameters := firstQuery.Model @@ -301,8 +309,9 @@ func parseMultiSelectValue(input string) []string { // Whenever this list is updated, the frontend list should also be updated. // Please update the region list in public/app/plugins/datasource/cloudwatch/partials/config.html -func (e *CloudWatchExecutor) handleGetRegions(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { - dsInfo := e.getDsInfo("default") +func (e *cloudWatchExecutor) handleGetRegions(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { + const region = "default" + dsInfo := e.getDSInfo(region) profile := dsInfo.Profile if cache, ok := regionCache.Load(profile); ok { if cache2, ok2 := cache.([]suggestData); ok2 { @@ -310,15 +319,11 @@ func (e *CloudWatchExecutor) handleGetRegions(ctx context.Context, parameters *s } } - regions := []string{ - "ap-east-1", "ap-northeast-1", "ap-northeast-2", "ap-northeast-3", "ap-south-1", "ap-southeast-1", "ap-southeast-2", "ca-central-1", - "eu-central-1", "eu-north-1", "eu-west-1", "eu-west-2", "eu-west-3", "me-south-1", "sa-east-1", "us-east-1", "us-east-2", "us-west-1", "us-west-2", - "cn-north-1", "cn-northwest-1", "us-gov-east-1", "us-gov-west-1", "us-isob-east-1", "us-iso-east-1", - } err := e.ensureClientSession("default") if err != nil { return nil, err } + regions := knownRegions r, err := e.ec2Svc.DescribeRegions(&ec2.DescribeRegionsInput{}) if err != nil { // ignore error for backward compatibility @@ -350,7 +355,7 @@ func (e *CloudWatchExecutor) handleGetRegions(ctx context.Context, parameters *s return result, nil } -func (e *CloudWatchExecutor) handleGetNamespaces(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { +func (e *cloudWatchExecutor) handleGetNamespaces(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { keys := []string{} for key := range metricsMap { keys = append(keys, key) @@ -369,7 +374,7 @@ func (e *CloudWatchExecutor) handleGetNamespaces(ctx context.Context, parameters return result, nil } -func (e *CloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { +func (e *cloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { region := parameters.Get("region").MustString() namespace := parameters.Get("namespace").MustString() @@ -381,10 +386,10 @@ func (e *CloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *s } } else { var err error - dsInfo := e.getDsInfo(region) + dsInfo := e.getDSInfo(region) dsInfo.Namespace = namespace - if namespaceMetrics, err = getMetricsForCustomMetrics(dsInfo, getAllMetrics); err != nil { + if namespaceMetrics, err = e.getMetricsForCustomMetrics(region, e.getAllMetrics); err != nil { return nil, errors.New("Unable to call AWS API") } } @@ -398,7 +403,7 @@ func (e *CloudWatchExecutor) handleGetMetrics(ctx context.Context, parameters *s return result, nil } -func (e *CloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { +func (e *cloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { region := parameters.Get("region").MustString() namespace := parameters.Get("namespace").MustString() @@ -410,10 +415,10 @@ func (e *CloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters } } else { var err error - dsInfo := e.getDsInfo(region) + dsInfo := e.getDSInfo(region) dsInfo.Namespace = namespace - if dimensionValues, err = getDimensionsForCustomMetrics(dsInfo, getAllMetrics); err != nil { + if dimensionValues, err = e.getDimensionsForCustomMetrics(region, e.getAllMetrics); err != nil { return nil, errors.New("Unable to call AWS API") } } @@ -427,7 +432,7 @@ func (e *CloudWatchExecutor) handleGetDimensions(ctx context.Context, parameters return result, nil } -func (e *CloudWatchExecutor) handleGetDimensionValues(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { +func (e *cloudWatchExecutor) handleGetDimensionValues(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { region := parameters.Get("region").MustString() namespace := parameters.Get("namespace").MustString() metricName := parameters.Get("metricName").MustString() @@ -478,9 +483,9 @@ func (e *CloudWatchExecutor) handleGetDimensionValues(ctx context.Context, param return result, nil } -func (e *CloudWatchExecutor) ensureClientSession(region string) error { +func (e *cloudWatchExecutor) ensureClientSession(region string) error { if e.ec2Svc == nil { - dsInfo := e.getDsInfo(region) + dsInfo := e.getDSInfo(region) cfg, err := getAwsConfig(dsInfo) if err != nil { return fmt.Errorf("Failed to call ec2:getAwsConfig, %w", err) @@ -494,7 +499,7 @@ func (e *CloudWatchExecutor) ensureClientSession(region string) error { return nil } -func (e *CloudWatchExecutor) handleGetEbsVolumeIds(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { +func (e *cloudWatchExecutor) handleGetEbsVolumeIds(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { region := parameters.Get("region").MustString() instanceId := parameters.Get("instanceId").MustString() @@ -521,7 +526,7 @@ func (e *CloudWatchExecutor) handleGetEbsVolumeIds(ctx context.Context, paramete return result, nil } -func (e *CloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { +func (e *cloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { region := parameters.Get("region").MustString() attributeName := parameters.Get("attributeName").MustString() filterJson := parameters.Get("filters").MustMap() @@ -529,15 +534,15 @@ func (e *CloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context, var filters []*ec2.Filter for k, v := range filterJson { if vv, ok := v.([]interface{}); ok { - var vvvvv []*string + var values []*string for _, vvv := range vv { if vvvv, ok := vvv.(string); ok { - vvvvv = append(vvvvv, &vvvv) + values = append(values, &vvvv) } } filters = append(filters, &ec2.Filter{ Name: aws.String(k), - Values: vvvvv, + Values: values, }) } } @@ -605,9 +610,9 @@ func (e *CloudWatchExecutor) handleGetEc2InstanceAttribute(ctx context.Context, return result, nil } -func (e *CloudWatchExecutor) ensureRGTAClientSession(region string) error { +func (e *cloudWatchExecutor) ensureRGTAClientSession(region string) error { if e.rgtaSvc == nil { - dsInfo := e.getDsInfo(region) + dsInfo := e.getDSInfo(region) cfg, err := getAwsConfig(dsInfo) if err != nil { return fmt.Errorf("Failed to call ec2:getAwsConfig, %w", err) @@ -621,7 +626,7 @@ func (e *CloudWatchExecutor) ensureRGTAClientSession(region string) error { return nil } -func (e *CloudWatchExecutor) handleGetResourceArns(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { +func (e *cloudWatchExecutor) handleGetResourceArns(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) ([]suggestData, error) { region := parameters.Get("region").MustString() resourceType := parameters.Get("resourceType").MustString() filterJson := parameters.Get("tags").MustMap() @@ -634,15 +639,15 @@ func (e *CloudWatchExecutor) handleGetResourceArns(ctx context.Context, paramete var filters []*resourcegroupstaggingapi.TagFilter for k, v := range filterJson { if vv, ok := v.([]interface{}); ok { - var vvvvv []*string + var values []*string for _, vvv := range vv { if vvvv, ok := vvv.(string); ok { - vvvvv = append(vvvvv, &vvvv) + values = append(values, &vvvv) } } filters = append(filters, &resourcegroupstaggingapi.TagFilter{ Key: aws.String(k), - Values: vvvvv, + Values: values, }) } } @@ -664,8 +669,8 @@ func (e *CloudWatchExecutor) handleGetResourceArns(ctx context.Context, paramete return result, nil } -func (e *CloudWatchExecutor) cloudwatchListMetrics(region string, namespace string, metricName string, dimensions []*cloudwatch.DimensionFilter) (*cloudwatch.ListMetricsOutput, error) { - svc, err := e.getClient(region) +func (e *cloudWatchExecutor) cloudwatchListMetrics(region string, namespace string, metricName string, dimensions []*cloudwatch.DimensionFilter) (*cloudwatch.ListMetricsOutput, error) { + svc, err := e.getCWClient(region) if err != nil { return nil, err } @@ -695,57 +700,50 @@ func (e *CloudWatchExecutor) cloudwatchListMetrics(region string, namespace stri return &resp, nil } -func (e *CloudWatchExecutor) ec2DescribeInstances(region string, filters []*ec2.Filter, instanceIds []*string) (*ec2.DescribeInstancesOutput, error) { +func (e *cloudWatchExecutor) ec2DescribeInstances(region string, filters []*ec2.Filter, instanceIds []*string) (*ec2.DescribeInstancesOutput, error) { params := &ec2.DescribeInstancesInput{ Filters: filters, InstanceIds: instanceIds, } var resp ec2.DescribeInstancesOutput - err := e.ec2Svc.DescribeInstancesPages(params, - func(page *ec2.DescribeInstancesOutput, lastPage bool) bool { - reservations, _ := awsutil.ValuesAtPath(page, "Reservations") - for _, reservation := range reservations { - resp.Reservations = append(resp.Reservations, reservation.(*ec2.Reservation)) - } - return !lastPage - }) - if err != nil { - return nil, fmt.Errorf("Failed to call ec2:DescribeInstances, %w", err) + if err := e.ec2Svc.DescribeInstancesPages(params, func(page *ec2.DescribeInstancesOutput, lastPage bool) bool { + resp.Reservations = append(resp.Reservations, page.Reservations...) + return !lastPage + }); err != nil { + return nil, fmt.Errorf("failed to call ec2:DescribeInstances, %w", err) } return &resp, nil } -func (e *CloudWatchExecutor) resourceGroupsGetResources(region string, filters []*resourcegroupstaggingapi.TagFilter, resourceTypes []*string) (*resourcegroupstaggingapi.GetResourcesOutput, error) { +func (e *cloudWatchExecutor) resourceGroupsGetResources(region string, filters []*resourcegroupstaggingapi.TagFilter, + resourceTypes []*string) (*resourcegroupstaggingapi.GetResourcesOutput, error) { params := &resourcegroupstaggingapi.GetResourcesInput{ ResourceTypeFilters: resourceTypes, TagFilters: filters, } var resp resourcegroupstaggingapi.GetResourcesOutput - err := e.rgtaSvc.GetResourcesPages(params, + if err := e.rgtaSvc.GetResourcesPages(params, func(page *resourcegroupstaggingapi.GetResourcesOutput, lastPage bool) bool { - resources, _ := awsutil.ValuesAtPath(page, "ResourceTagMappingList") - for _, resource := range resources { - resp.ResourceTagMappingList = append(resp.ResourceTagMappingList, resource.(*resourcegroupstaggingapi.ResourceTagMapping)) - } + resp.ResourceTagMappingList = append(resp.ResourceTagMappingList, page.ResourceTagMappingList...) return !lastPage - }) - if err != nil { - return nil, fmt.Errorf("Failed to call tags:GetResources, %w", err) + }); err != nil { + return nil, fmt.Errorf("failed to call tags:GetResources, %w", err) } return &resp, nil } -func getAllMetrics(cwData *DatasourceInfo) (cloudwatch.ListMetricsOutput, error) { - creds, err := getCredentials(cwData) +func (e *cloudWatchExecutor) getAllMetrics(region string) (cloudwatch.ListMetricsOutput, error) { + dsInfo := e.getDSInfo(region) + creds, err := getCredentials(dsInfo) if err != nil { return cloudwatch.ListMetricsOutput{}, err } cfg := &aws.Config{ - Region: aws.String(cwData.Region), + Region: aws.String(dsInfo.Region), Credentials: creds, } sess, err := session.NewSession(cfg) @@ -755,7 +753,7 @@ func getAllMetrics(cwData *DatasourceInfo) (cloudwatch.ListMetricsOutput, error) svc := cloudwatch.New(sess, cfg) params := &cloudwatch.ListMetricsInput{ - Namespace: aws.String(cwData.Namespace), + Namespace: aws.String(dsInfo.Namespace), } var resp cloudwatch.ListMetricsOutput @@ -776,25 +774,27 @@ func getAllMetrics(cwData *DatasourceInfo) (cloudwatch.ListMetricsOutput, error) var metricsCacheLock sync.Mutex -func getMetricsForCustomMetrics(dsInfo *DatasourceInfo, getAllMetrics func(*DatasourceInfo) (cloudwatch.ListMetricsOutput, error)) ([]string, error) { +func (e *cloudWatchExecutor) getMetricsForCustomMetrics(region string, getAllMetrics func(string) (cloudwatch.ListMetricsOutput, error)) ([]string, error) { metricsCacheLock.Lock() defer metricsCacheLock.Unlock() + dsInfo := e.getDSInfo(region) + if _, ok := customMetricsMetricsMap[dsInfo.Profile]; !ok { - customMetricsMetricsMap[dsInfo.Profile] = make(map[string]map[string]*CustomMetricsCache) + customMetricsMetricsMap[dsInfo.Profile] = make(map[string]map[string]*customMetricsCache) } if _, ok := customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region]; !ok { - customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region] = make(map[string]*CustomMetricsCache) + customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region] = make(map[string]*customMetricsCache) } if _, ok := customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace]; !ok { - customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace] = &CustomMetricsCache{} + customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace] = &customMetricsCache{} customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace].Cache = make([]string, 0) } if customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace].Expire.After(time.Now()) { return customMetricsMetricsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace].Cache, nil } - result, err := getAllMetrics(dsInfo) + result, err := getAllMetrics(region) if err != nil { return []string{}, err } @@ -813,25 +813,27 @@ func getMetricsForCustomMetrics(dsInfo *DatasourceInfo, getAllMetrics func(*Data var dimensionsCacheLock sync.Mutex -func getDimensionsForCustomMetrics(dsInfo *DatasourceInfo, getAllMetrics func(*DatasourceInfo) (cloudwatch.ListMetricsOutput, error)) ([]string, error) { +func (e *cloudWatchExecutor) getDimensionsForCustomMetrics(region string, getAllMetrics func(string) (cloudwatch.ListMetricsOutput, error)) ([]string, error) { dimensionsCacheLock.Lock() defer dimensionsCacheLock.Unlock() + dsInfo := e.getDSInfo(region) + if _, ok := customMetricsDimensionsMap[dsInfo.Profile]; !ok { - customMetricsDimensionsMap[dsInfo.Profile] = make(map[string]map[string]*CustomMetricsCache) + customMetricsDimensionsMap[dsInfo.Profile] = make(map[string]map[string]*customMetricsCache) } if _, ok := customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region]; !ok { - customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region] = make(map[string]*CustomMetricsCache) + customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region] = make(map[string]*customMetricsCache) } if _, ok := customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace]; !ok { - customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace] = &CustomMetricsCache{} + customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace] = &customMetricsCache{} customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace].Cache = make([]string, 0) } if customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace].Expire.After(time.Now()) { return customMetricsDimensionsMap[dsInfo.Profile][dsInfo.Region][dsInfo.Namespace].Cache, nil } - result, err := getAllMetrics(dsInfo) + result, err := getAllMetrics(region) if err != nil { return []string{}, err } diff --git a/pkg/tsdb/cloudwatch/metric_find_query_test.go b/pkg/tsdb/cloudwatch/metric_find_query_test.go index ebcf05b8a01..3fadc217124 100644 --- a/pkg/tsdb/cloudwatch/metric_find_query_test.go +++ b/pkg/tsdb/cloudwatch/metric_find_query_test.go @@ -44,13 +44,16 @@ func (m mockedRGTA) GetResourcesPages(in *resourcegroupstaggingapi.GetResourcesI func TestCloudWatchMetrics(t *testing.T) { t.Run("When calling getMetricsForCustomMetrics", func(t *testing.T) { - dsInfo := &DatasourceInfo{ - Region: "us-east-1", - Namespace: "Foo", - Profile: "default", - AssumeRoleArn: "", + const region = "us-east-1" + e := &cloudWatchExecutor{ + DataSource: &models.DataSource{ + Database: "default", + JsonData: simplejson.NewFromAny(map[string]interface{}{ + "Region": region, + }), + }, } - f := func(dsInfo *DatasourceInfo) (cloudwatch.ListMetricsOutput, error) { + f := func(region string) (cloudwatch.ListMetricsOutput, error) { return cloudwatch.ListMetricsOutput{ Metrics: []*cloudwatch.Metric{ { @@ -64,20 +67,23 @@ func TestCloudWatchMetrics(t *testing.T) { }, }, nil } - metrics, err := getMetricsForCustomMetrics(dsInfo, f) + metrics, err := e.getMetricsForCustomMetrics(region, f) require.NoError(t, err) assert.Contains(t, metrics, "Test_MetricName") }) t.Run("When calling getDimensionsForCustomMetrics", func(t *testing.T) { - dsInfo := &DatasourceInfo{ - Region: "us-east-1", - Namespace: "Foo", - Profile: "default", - AssumeRoleArn: "", + const region = "us-east-1" + e := &cloudWatchExecutor{ + DataSource: &models.DataSource{ + Database: "default", + JsonData: simplejson.NewFromAny(map[string]interface{}{ + "Region": region, + }), + }, } - f := func(dsInfo *DatasourceInfo) (cloudwatch.ListMetricsOutput, error) { + f := func(region string) (cloudwatch.ListMetricsOutput, error) { return cloudwatch.ListMetricsOutput{ Metrics: []*cloudwatch.Metric{ { @@ -91,14 +97,14 @@ func TestCloudWatchMetrics(t *testing.T) { }, }, nil } - dimensionKeys, err := getDimensionsForCustomMetrics(dsInfo, f) + dimensionKeys, err := e.getDimensionsForCustomMetrics(region, f) require.NoError(t, err) assert.Contains(t, dimensionKeys, "Test_DimensionName") }) t.Run("When calling handleGetRegions", func(t *testing.T) { - executor := &CloudWatchExecutor{ + executor := &cloudWatchExecutor{ ec2Svc: mockedEc2{RespRegions: ec2.DescribeRegionsOutput{ Regions: []*ec2.Region{ { @@ -123,7 +129,7 @@ func TestCloudWatchMetrics(t *testing.T) { }) t.Run("When calling handleGetEc2InstanceAttribute", func(t *testing.T) { - executor := &CloudWatchExecutor{ + executor := &cloudWatchExecutor{ ec2Svc: mockedEc2{Resp: ec2.DescribeInstancesOutput{ Reservations: []*ec2.Reservation{ { @@ -156,7 +162,7 @@ func TestCloudWatchMetrics(t *testing.T) { }) t.Run("When calling handleGetEbsVolumeIds", func(t *testing.T) { - executor := &CloudWatchExecutor{ + executor := &cloudWatchExecutor{ ec2Svc: mockedEc2{Resp: ec2.DescribeInstancesOutput{ Reservations: []*ec2.Reservation{ { @@ -217,7 +223,7 @@ func TestCloudWatchMetrics(t *testing.T) { }) t.Run("When calling handleGetResourceArns", func(t *testing.T) { - executor := &CloudWatchExecutor{ + executor := &cloudWatchExecutor{ rgtaSvc: mockedRGTA{ Resp: resourcegroupstaggingapi.GetResourcesOutput{ ResourceTagMappingList: []*resourcegroupstaggingapi.ResourceTagMapping{ diff --git a/pkg/tsdb/cloudwatch/query_transformer.go b/pkg/tsdb/cloudwatch/query_transformer.go index b15aacf6255..ca4b207aafc 100644 --- a/pkg/tsdb/cloudwatch/query_transformer.go +++ b/pkg/tsdb/cloudwatch/query_transformer.go @@ -13,7 +13,7 @@ import ( // has more than one statistic defined, one cloudwatchQuery will be created for each statistic. // If the query doesn't have an Id defined by the user, we'll give it an with format `query[RefId]`. In the case // the incoming query had more than one stat, it will ge an id like `query[RefId]_[StatName]`, eg queryC_Average -func (e *CloudWatchExecutor) transformRequestQueriesToCloudWatchQueries(requestQueries []*requestQuery) (map[string]*cloudWatchQuery, error) { +func (e *cloudWatchExecutor) transformRequestQueriesToCloudWatchQueries(requestQueries []*requestQuery) (map[string]*cloudWatchQuery, error) { cloudwatchQueries := make(map[string]*cloudWatchQuery) for _, requestQuery := range requestQueries { for _, stat := range requestQuery.Statistics { @@ -50,7 +50,7 @@ func (e *CloudWatchExecutor) transformRequestQueriesToCloudWatchQueries(requestQ return cloudwatchQueries, nil } -func (e *CloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchResponses []*cloudwatchResponse) map[string]*tsdb.QueryResult { +func (e *cloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchResponses []*cloudwatchResponse) map[string]*tsdb.QueryResult { responsesByRefID := make(map[string][]*cloudwatchResponse) for _, res := range cloudwatchResponses { responsesByRefID[res.RefId] = append(responsesByRefID[res.RefId], res) diff --git a/pkg/tsdb/cloudwatch/query_transformer_test.go b/pkg/tsdb/cloudwatch/query_transformer_test.go index 9ad7d1ca861..71794dfabee 100644 --- a/pkg/tsdb/cloudwatch/query_transformer_test.go +++ b/pkg/tsdb/cloudwatch/query_transformer_test.go @@ -10,7 +10,7 @@ import ( func TestQueryTransformer(t *testing.T) { Convey("TestQueryTransformer", t, func() { Convey("when transforming queries", func() { - executor := &CloudWatchExecutor{} + executor := &cloudWatchExecutor{} Convey("one cloudwatchQuery is generated when its request query has one stat", func() { requestQueries := []*requestQuery{ { diff --git a/pkg/tsdb/cloudwatch/request_parser.go b/pkg/tsdb/cloudwatch/request_parser.go index cc0efb07984..f0a497c19d0 100644 --- a/pkg/tsdb/cloudwatch/request_parser.go +++ b/pkg/tsdb/cloudwatch/request_parser.go @@ -15,7 +15,7 @@ import ( ) // Parses the json queries and returns a requestQuery. The requestQuery has a 1 to 1 mapping to a query editor row -func (e *CloudWatchExecutor) parseQueries(queryContext *tsdb.TsdbQuery, startTime time.Time, endTime time.Time) (map[string][]*requestQuery, error) { +func (e *cloudWatchExecutor) parseQueries(queryContext *tsdb.TsdbQuery, startTime time.Time, endTime time.Time) (map[string][]*requestQuery, error) { requestQueries := make(map[string][]*requestQuery) for i, model := range queryContext.Queries { queryType := model.Model.Get("type").MustString() diff --git a/pkg/tsdb/cloudwatch/response_parser.go b/pkg/tsdb/cloudwatch/response_parser.go index cc3dbdca70b..95be4e45795 100644 --- a/pkg/tsdb/cloudwatch/response_parser.go +++ b/pkg/tsdb/cloudwatch/response_parser.go @@ -12,7 +12,7 @@ import ( "github.com/grafana/grafana/pkg/tsdb" ) -func (e *CloudWatchExecutor) parseResponse(metricDataOutputs []*cloudwatch.GetMetricDataOutput, queries map[string]*cloudWatchQuery) ([]*cloudwatchResponse, error) { +func (e *cloudWatchExecutor) parseResponse(metricDataOutputs []*cloudwatch.GetMetricDataOutput, queries map[string]*cloudWatchQuery) ([]*cloudwatchResponse, error) { mdr := make(map[string]map[string]*cloudwatch.MetricDataResult) for _, mdo := range metricDataOutputs { requestExceededMaxLimit := false diff --git a/pkg/tsdb/cloudwatch/time_series_query.go b/pkg/tsdb/cloudwatch/time_series_query.go index 23725f049ec..58fe7f2e37b 100644 --- a/pkg/tsdb/cloudwatch/time_series_query.go +++ b/pkg/tsdb/cloudwatch/time_series_query.go @@ -9,7 +9,7 @@ import ( "golang.org/x/sync/errgroup" ) -func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { +func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { startTime, err := queryContext.TimeRange.ParseFrom() if err != nil { return nil, err @@ -50,7 +50,7 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo } }() - client, err := e.getClient(region) + client, err := e.getCWClient(region) if err != nil { return err } diff --git a/pkg/tsdb/cloudwatch/time_series_query_test.go b/pkg/tsdb/cloudwatch/time_series_query_test.go index f22f5f9bfbb..4af80a8804c 100644 --- a/pkg/tsdb/cloudwatch/time_series_query_test.go +++ b/pkg/tsdb/cloudwatch/time_series_query_test.go @@ -9,7 +9,7 @@ import ( ) func TestTimeSeriesQuery(t *testing.T) { - executor := &CloudWatchExecutor{} + executor := &cloudWatchExecutor{} t.Run("End time before start time should result in error", func(t *testing.T) { _, err := executor.executeTimeSeriesQuery(context.TODO(), &tsdb.TsdbQuery{TimeRange: tsdb.NewTimeRange("now-1h", "now-2h")})