diff --git a/pkg/tsdb/cloudwatch/cloudwatch.go b/pkg/tsdb/cloudwatch/cloudwatch.go index 49d71b1399d..1cfc63aa3d7 100644 --- a/pkg/tsdb/cloudwatch/cloudwatch.go +++ b/pkg/tsdb/cloudwatch/cloudwatch.go @@ -147,13 +147,6 @@ func (e *cloudWatchExecutor) CallResource(ctx context.Context, req *backend.Call func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { logger := logger.FromContext(ctx) - /* - Unlike many other data sources, with Cloudwatch Logs query requests don't receive the results as the response - to the query, but rather an ID is first returned. Following this, a client is expected to send requests along - with the ID until the status of the query is complete, receiving (possibly partial) results each time. For - queries made via dashboards and Explore, the logic of making these repeated queries is handled on the - frontend, but because alerts and expressions are executed on the backend the logic needs to be reimplemented here. - */ q := req.Queries[0] var model DataQueryJson err := json.Unmarshal(q.JSON, &model) diff --git a/pkg/tsdb/cloudwatch/log_sync_query.go b/pkg/tsdb/cloudwatch/log_sync_query.go index d9949f3d7ac..5d6fbd08774 100644 --- a/pkg/tsdb/cloudwatch/log_sync_query.go +++ b/pkg/tsdb/cloudwatch/log_sync_query.go @@ -14,14 +14,16 @@ import ( "github.com/grafana/grafana/pkg/tsdb/cloudwatch/models" ) -const ( - alertMaxAttempts = 8 - alertPollPeriod = time.Second -) +const initialAlertPollPeriod = time.Second var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { resp := backend.NewQueryDataResponse() + instance, err := e.getInstance(ctx, req.PluginContext) + if err != nil { + return nil, err + } + for _, q := range req.Queries { var logsQuery models.LogsQuery err := json.Unmarshal(q.JSON, &logsQuery) @@ -36,10 +38,6 @@ var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req * region := logsQuery.Region if logsQuery.Region == "" || region == defaultRegion { - instance, err := e.getInstance(ctx, req.PluginContext) - if err != nil { - return nil, err - } logsQuery.Region = instance.Settings.Region } @@ -48,7 +46,7 @@ var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req * return nil, err } - getQueryResultsOutput, err := e.syncQuery(ctx, logsClient, q, logsQuery) + getQueryResultsOutput, err := e.syncQuery(ctx, logsClient, q, logsQuery, instance.Settings.LogsTimeout.Duration) if err != nil { return nil, err } @@ -82,7 +80,7 @@ var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req * } func (e *cloudWatchExecutor) syncQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, - queryContext backend.DataQuery, logsQuery models.LogsQuery) (*cloudwatchlogs.GetQueryResultsOutput, error) { + queryContext backend.DataQuery, logsQuery models.LogsQuery, logsTimeout time.Duration) (*cloudwatchlogs.GetQueryResultsOutput, error) { startQueryOutput, err := e.executeStartQuery(ctx, logsClient, logsQuery, queryContext.TimeRange) if err != nil { return nil, err @@ -95,7 +93,15 @@ func (e *cloudWatchExecutor) syncQuery(ctx context.Context, logsClient cloudwatc QueryId: *startQueryOutput.QueryId, } - ticker := time.NewTicker(alertPollPeriod) + /* + Unlike many other data sources, with Cloudwatch Logs query requests don't receive the results as the response + to the query, but rather an ID is first returned. Following this, a client is expected to send requests along + with the ID until the status of the query is complete, receiving (possibly partial) results each time. For + queries made via dashboards and Explore, the logic of making these repeated queries is handled on the + frontend, but because alerts and expressions are executed on the backend the logic needs to be reimplemented here. + */ + + ticker := time.NewTicker(initialAlertPollPeriod) defer ticker.Stop() attemptCount := 1 @@ -107,8 +113,8 @@ func (e *cloudWatchExecutor) syncQuery(ctx context.Context, logsClient cloudwatc if isTerminated(*res.Status) { return res, err } - if attemptCount >= alertMaxAttempts { - return res, fmt.Errorf("fetching of query results exceeded max number of attempts") + if time.Duration(attemptCount)*time.Second >= logsTimeout { + return res, fmt.Errorf("time to fetch query results exceeded logs timeout") } attemptCount++ diff --git a/pkg/tsdb/cloudwatch/log_sync_query_test.go b/pkg/tsdb/cloudwatch/log_sync_query_test.go index 53cae18516a..e43b616f386 100644 --- a/pkg/tsdb/cloudwatch/log_sync_query_test.go +++ b/pkg/tsdb/cloudwatch/log_sync_query_test.go @@ -307,4 +307,33 @@ func Test_executeSyncLogQuery_handles_RefId_from_input_queries(t *testing.T) { require.True(t, ok) assert.Equal(t, []*data.Field{expectedLogFieldFromSecondCall}, respB.Frames[0].Fields) }) + t.Run("when logsTimeout setting is defined, the polling period will be set to that variable", func(t *testing.T) { + cli = &mockLogsSyncClient{} + cli.On("StartQueryWithContext", mock.Anything, mock.Anything, mock.Anything).Return(&cloudwatchlogs.StartQueryOutput{ + QueryId: aws.String("abcd-efgh-ijkl-mnop"), + }, nil) + cli.On("GetQueryResultsWithContext", mock.Anything, mock.Anything, mock.Anything).Return(&cloudwatchlogs.GetQueryResultsOutput{Status: aws.String("Running")}, nil) + im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { + return DataSource{Settings: models.CloudWatchSettings{LogsTimeout: models.Duration{Duration: time.Millisecond}}}, nil + }) + + executor := newExecutor(im, newTestConfig(), &fakeSessionCache{}, featuremgmt.WithFeatures()) + + _, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{ + Headers: map[string]string{ngalertmodels.FromAlertHeaderName: "some value"}, + PluginContext: backend.PluginContext{DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}}, + Queries: []backend.DataQuery{ + { + RefID: "A", + TimeRange: backend.TimeRange{From: time.Unix(0, 0), To: time.Unix(1, 0)}, + JSON: json.RawMessage(`{ + "queryMode": "Logs", + "expression": "query string for A" + }`), + }, + }, + }) + assert.Error(t, err) + cli.AssertNumberOfCalls(t, "GetQueryResultsWithContext", 1) + }) } diff --git a/pkg/tsdb/cloudwatch/models/settings.go b/pkg/tsdb/cloudwatch/models/settings.go index e02ea2f2c4a..99ab194fcbb 100644 --- a/pkg/tsdb/cloudwatch/models/settings.go +++ b/pkg/tsdb/cloudwatch/models/settings.go @@ -3,15 +3,20 @@ package models import ( "encoding/json" "fmt" + "time" "github.com/grafana/grafana-aws-sdk/pkg/awsds" "github.com/grafana/grafana-plugin-sdk-go/backend" ) +type Duration struct { + time.Duration +} type CloudWatchSettings struct { awsds.AWSDatasourceSettings - Namespace string `json:"customMetricsNamespaces"` - SecureSocksProxyEnabled bool `json:"enableSecureSocksProxy"` // this can be removed when https://github.com/grafana/grafana/issues/39089 is implemented + Namespace string `json:"customMetricsNamespaces"` + SecureSocksProxyEnabled bool `json:"enableSecureSocksProxy"` // this can be removed when https://github.com/grafana/grafana/issues/39089 is implemented + LogsTimeout Duration `json:"logsTimeout"` } func LoadCloudWatchSettings(config backend.DataSourceInstanceSettings) (CloudWatchSettings, error) { @@ -30,8 +35,38 @@ func LoadCloudWatchSettings(config backend.DataSourceInstanceSettings) (CloudWat instance.Profile = config.Database } + // logs timeout default is 30 minutes, the same as timeout in frontend logs query + // note: for alerting queries, the context will be cancelled before that unless evaluation_timeout_seconds in defaults.ini is increased (default: 30s) + if instance.LogsTimeout.Duration == 0 { + instance.LogsTimeout = Duration{30 * time.Minute} + } + instance.AccessKey = config.DecryptedSecureJSONData["accessKey"] instance.SecretKey = config.DecryptedSecureJSONData["secretKey"] return instance, nil } + +func (duration *Duration) UnmarshalJSON(b []byte) error { + var unmarshalledJson interface{} + + err := json.Unmarshal(b, &unmarshalledJson) + if err != nil { + return err + } + + switch value := unmarshalledJson.(type) { + case float64: + *duration = Duration{time.Duration(value)} + case string: + dur, err := time.ParseDuration(value) + if err != nil { + return err + } + *duration = Duration{dur} + default: + return fmt.Errorf("invalid duration: %#v", unmarshalledJson) + } + + return nil +} diff --git a/pkg/tsdb/cloudwatch/models/settings_test.go b/pkg/tsdb/cloudwatch/models/settings_test.go index 55ea1ddac59..5b011b0e491 100644 --- a/pkg/tsdb/cloudwatch/models/settings_test.go +++ b/pkg/tsdb/cloudwatch/models/settings_test.go @@ -2,6 +2,7 @@ package models import ( "testing" + "time" "github.com/grafana/grafana-aws-sdk/pkg/awsds" "github.com/grafana/grafana-plugin-sdk-go/backend" @@ -10,6 +11,24 @@ import ( ) func Test_Settings_LoadCloudWatchSettings(t *testing.T) { + t.Run("Should return error for invalid json", func(t *testing.T) { + settings := backend.DataSourceInstanceSettings{ + ID: 33, + JSONData: []byte(`{ + "authType": fluffles^.^, + "assumeRoleArn": "arn:aws:iam::123456789012:role/grafana", + "logsTimeout": "10m" + }`), + DecryptedSecureJSONData: map[string]string{ + "accessKey": "AKIAIOSFODNN7EXAMPLE", + "secretKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + }, + } + + _, err := LoadCloudWatchSettings(settings) + + assert.Error(t, err) + }) t.Run("Should parse keys query type", func(t *testing.T) { settings := backend.DataSourceInstanceSettings{ ID: 33, @@ -71,4 +90,109 @@ func Test_Settings_LoadCloudWatchSettings(t *testing.T) { assert.Equal(t, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", s.SecretKey) assert.Equal(t, "AKIAIOSFODNN7EXAMPLE", s.AccessKey) }) + t.Run("Should set logsTimeout to default duration if it is not defined", func(t *testing.T) { + settings := backend.DataSourceInstanceSettings{ + ID: 33, + JSONData: []byte(`{ + "authType": "arn", + "assumeRoleArn": "arn:aws:iam::123456789012:role/grafana" + }`), + DecryptedSecureJSONData: map[string]string{ + "accessKey": "AKIAIOSFODNN7EXAMPLE", + "secretKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + }, + } + + s, err := LoadCloudWatchSettings(settings) + require.NoError(t, err) + assert.Equal(t, time.Minute*30, s.LogsTimeout.Duration) + }) + t.Run("Should correctly parse logsTimeout duration string", func(t *testing.T) { + settings := backend.DataSourceInstanceSettings{ + ID: 33, + JSONData: []byte(`{ + "authType": "arn", + "assumeRoleArn": "arn:aws:iam::123456789012:role/grafana", + "logsTimeout": "10m" + }`), + DecryptedSecureJSONData: map[string]string{ + "accessKey": "AKIAIOSFODNN7EXAMPLE", + "secretKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + }, + } + + s, err := LoadCloudWatchSettings(settings) + require.NoError(t, err) + assert.Equal(t, time.Minute*10, s.LogsTimeout.Duration) + }) + t.Run("Should correctly parse logsTimeout string with float number", func(t *testing.T) { + settings := backend.DataSourceInstanceSettings{ + ID: 33, + JSONData: []byte(`{ + "authType": "arn", + "assumeRoleArn": "arn:aws:iam::123456789012:role/grafana", + "logsTimeout": "1.5s" + }`), + DecryptedSecureJSONData: map[string]string{ + "accessKey": "AKIAIOSFODNN7EXAMPLE", + "secretKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + }, + } + + s, err := LoadCloudWatchSettings(settings) + require.NoError(t, err) + assert.Equal(t, time.Duration(1500000000), s.LogsTimeout.Duration) + }) + t.Run("Should correctly parse logsTimeout duration in nanoseconds", func(t *testing.T) { + settings := backend.DataSourceInstanceSettings{ + ID: 33, + JSONData: []byte(`{ + "authType": "arn", + "assumeRoleArn": "arn:aws:iam::123456789012:role/grafana", + "logsTimeout": 1500000000 + }`), + DecryptedSecureJSONData: map[string]string{ + "accessKey": "AKIAIOSFODNN7EXAMPLE", + "secretKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + }, + } + + s, err := LoadCloudWatchSettings(settings) + require.NoError(t, err) + assert.Equal(t, 1500*time.Millisecond, s.LogsTimeout.Duration) + }) + t.Run("Should throw error if logsTimeout is an invalid duration format", func(t *testing.T) { + settings := backend.DataSourceInstanceSettings{ + ID: 33, + JSONData: []byte(`{ + "authType": "arn", + "assumeRoleArn": "arn:aws:iam::123456789012:role/grafana", + "logsTimeout": "10mm" + }`), + DecryptedSecureJSONData: map[string]string{ + "accessKey": "AKIAIOSFODNN7EXAMPLE", + "secretKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + }, + } + + _, err := LoadCloudWatchSettings(settings) + require.Error(t, err) + }) + t.Run("Should throw error if logsTimeout is an invalid type", func(t *testing.T) { + settings := backend.DataSourceInstanceSettings{ + ID: 33, + JSONData: []byte(`{ + "authType": "arn", + "assumeRoleArn": "arn:aws:iam::123456789012:role/grafana", + "logsTimeout": true + }`), + DecryptedSecureJSONData: map[string]string{ + "accessKey": "AKIAIOSFODNN7EXAMPLE", + "secretKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + }, + } + + _, err := LoadCloudWatchSettings(settings) + require.Error(t, err) + }) } diff --git a/public/app/plugins/datasource/cloudwatch/components/ConfigEditor.tsx b/public/app/plugins/datasource/cloudwatch/components/ConfigEditor.tsx index 55861707828..0263c9117c2 100644 --- a/public/app/plugins/datasource/cloudwatch/components/ConfigEditor.tsx +++ b/public/app/plugins/datasource/cloudwatch/components/ConfigEditor.tsx @@ -74,9 +74,9 @@ export const ConfigEditor = (props: Props) => {