Cloudwatch Logs: Set Alerting timeout to datasource config's logsTimeout (#72611)

Ida Štambuk 2023-08-03 19:35:30 +02:00 committed by GitHub
parent b1ef145442
commit abff6e20e9
6 changed files with 211 additions and 24 deletions

View File

@@ -147,13 +147,6 @@ func (e *cloudWatchExecutor) CallResource(ctx context.Context, req *backend.Call
func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
logger := logger.FromContext(ctx)
/*
Unlike many other data sources, with Cloudwatch Logs, query requests don't receive the results as the response
to the query; rather, an ID is returned first. The client is then expected to send requests along
with the ID until the status of the query is complete, receiving (possibly partial) results each time. For
queries made via dashboards and Explore, the logic of making these repeated queries is handled on the
frontend, but because alerts and expressions are executed on the backend, the logic needs to be reimplemented here.
*/
q := req.Queries[0]
var model DataQueryJson
err := json.Unmarshal(q.JSON, &model)

View File

@@ -14,14 +14,16 @@ import (
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
)
const (
alertMaxAttempts = 8
alertPollPeriod = time.Second
)
const initialAlertPollPeriod = time.Second
var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
resp := backend.NewQueryDataResponse()
instance, err := e.getInstance(ctx, req.PluginContext)
if err != nil {
return nil, err
}
for _, q := range req.Queries {
var logsQuery models.LogsQuery
err := json.Unmarshal(q.JSON, &logsQuery)
@@ -36,10 +38,6 @@ var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *
region := logsQuery.Region
if logsQuery.Region == "" || region == defaultRegion {
instance, err := e.getInstance(ctx, req.PluginContext)
if err != nil {
return nil, err
}
logsQuery.Region = instance.Settings.Region
}
@@ -48,7 +46,7 @@ var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *
return nil, err
}
getQueryResultsOutput, err := e.syncQuery(ctx, logsClient, q, logsQuery)
getQueryResultsOutput, err := e.syncQuery(ctx, logsClient, q, logsQuery, instance.Settings.LogsTimeout.Duration)
if err != nil {
return nil, err
}
@@ -82,7 +80,7 @@ var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *
}
func (e *cloudWatchExecutor) syncQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
queryContext backend.DataQuery, logsQuery models.LogsQuery) (*cloudwatchlogs.GetQueryResultsOutput, error) {
queryContext backend.DataQuery, logsQuery models.LogsQuery, logsTimeout time.Duration) (*cloudwatchlogs.GetQueryResultsOutput, error) {
startQueryOutput, err := e.executeStartQuery(ctx, logsClient, logsQuery, queryContext.TimeRange)
if err != nil {
return nil, err
@@ -95,7 +93,15 @@ func (e *cloudWatchExecutor) syncQuery(ctx context.Context, logsClient cloudwatc
QueryId: *startQueryOutput.QueryId,
}
ticker := time.NewTicker(alertPollPeriod)
/*
Unlike many other data sources, with Cloudwatch Logs, query requests don't receive the results as the response
to the query; rather, an ID is returned first. The client is then expected to send requests along
with the ID until the status of the query is complete, receiving (possibly partial) results each time. For
queries made via dashboards and Explore, the logic of making these repeated queries is handled on the
frontend, but because alerts and expressions are executed on the backend, the logic needs to be reimplemented here.
*/
ticker := time.NewTicker(initialAlertPollPeriod)
defer ticker.Stop()
attemptCount := 1
@@ -107,8 +113,8 @@ func (e *cloudWatchExecutor) syncQuery(ctx context.Context, logsClient cloudwatc
if isTerminated(*res.Status) {
return res, err
}
if attemptCount >= alertMaxAttempts {
return res, fmt.Errorf("fetching of query results exceeded max number of attempts")
if time.Duration(attemptCount)*time.Second >= logsTimeout {
return res, fmt.Errorf("time to fetch query results exceeded logs timeout")
}
attemptCount++
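
For readers skimming the hunks above, here is a minimal, self-contained sketch of the poll-until-done loop this commit changes. pollStatus, pollUntilDone, and the fake query in main are hypothetical stand-ins for syncQuery and GetQueryResultsWithContext; only the ticker, attempt counter, and timeout check mirror the diff, which approximates elapsed time as one second per attempt.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pollStatus is a stand-in for GetQueryResultsWithContext: it reports whether
// the query has reached a terminal state.
type pollStatus func(ctx context.Context) (done bool, err error)

// pollUntilDone mirrors the shape of syncQuery's loop: poll once per second
// and give up once logsTimeout has elapsed.
func pollUntilDone(ctx context.Context, poll pollStatus, logsTimeout time.Duration) error {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	attemptCount := 1
	for {
		done, err := poll(ctx)
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		// Elapsed time is approximated as one second per attempt, as in the diff.
		if time.Duration(attemptCount)*time.Second >= logsTimeout {
			return errors.New("time to fetch query results exceeded logs timeout")
		}
		attemptCount++
		<-ticker.C // wait for the next one-second tick before polling again
	}
}

func main() {
	// With a 1ms logsTimeout the very first attempt already exceeds the budget,
	// so exactly one poll is made before the error is returned -- the same
	// behaviour the datasource test below asserts with a mocked logs client.
	calls := 0
	err := pollUntilDone(context.Background(), func(ctx context.Context) (bool, error) {
		calls++
		return false, nil // the query never reaches a terminal state
	}, time.Millisecond)
	fmt.Println(calls, err) // 1 time to fetch query results exceeded logs timeout
}
```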

View File

@@ -307,4 +307,33 @@ func Test_executeSyncLogQuery_handles_RefId_from_input_queries(t *testing.T) {
require.True(t, ok)
assert.Equal(t, []*data.Field{expectedLogFieldFromSecondCall}, respB.Frames[0].Fields)
})
t.Run("when logsTimeout setting is defined, the polling period will be set to that variable", func(t *testing.T) {
cli = &mockLogsSyncClient{}
cli.On("StartQueryWithContext", mock.Anything, mock.Anything, mock.Anything).Return(&cloudwatchlogs.StartQueryOutput{
QueryId: aws.String("abcd-efgh-ijkl-mnop"),
}, nil)
cli.On("GetQueryResultsWithContext", mock.Anything, mock.Anything, mock.Anything).Return(&cloudwatchlogs.GetQueryResultsOutput{Status: aws.String("Running")}, nil)
im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
return DataSource{Settings: models.CloudWatchSettings{LogsTimeout: models.Duration{Duration: time.Millisecond}}}, nil
})
executor := newExecutor(im, newTestConfig(), &fakeSessionCache{}, featuremgmt.WithFeatures())
_, err := executor.QueryData(context.Background(), &backend.QueryDataRequest{
Headers: map[string]string{ngalertmodels.FromAlertHeaderName: "some value"},
PluginContext: backend.PluginContext{DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{}},
Queries: []backend.DataQuery{
{
RefID: "A",
TimeRange: backend.TimeRange{From: time.Unix(0, 0), To: time.Unix(1, 0)},
JSON: json.RawMessage(`{
"queryMode": "Logs",
"expression": "query string for A"
}`),
},
},
})
assert.Error(t, err)
cli.AssertNumberOfCalls(t, "GetQueryResultsWithContext", 1)
})
}

View File

@@ -3,15 +3,20 @@ package models
import (
"encoding/json"
"fmt"
"time"
"github.com/grafana/grafana-aws-sdk/pkg/awsds"
"github.com/grafana/grafana-plugin-sdk-go/backend"
)
type Duration struct {
time.Duration
}
type CloudWatchSettings struct {
awsds.AWSDatasourceSettings
Namespace string `json:"customMetricsNamespaces"`
SecureSocksProxyEnabled bool `json:"enableSecureSocksProxy"` // this can be removed when https://github.com/grafana/grafana/issues/39089 is implemented
Namespace string `json:"customMetricsNamespaces"`
SecureSocksProxyEnabled bool `json:"enableSecureSocksProxy"` // this can be removed when https://github.com/grafana/grafana/issues/39089 is implemented
LogsTimeout Duration `json:"logsTimeout"`
}
func LoadCloudWatchSettings(config backend.DataSourceInstanceSettings) (CloudWatchSettings, error) {
@@ -30,8 +35,38 @@ func LoadCloudWatchSettings(config backend.DataSourceInstanceSettings) (CloudWat
instance.Profile = config.Database
}
// logs timeout default is 30 minutes, the same as timeout in frontend logs query
// note: for alerting queries, the context will be cancelled before that unless evaluation_timeout_seconds in defaults.ini is increased (default: 30s)
if instance.LogsTimeout.Duration == 0 {
instance.LogsTimeout = Duration{30 * time.Minute}
}
instance.AccessKey = config.DecryptedSecureJSONData["accessKey"]
instance.SecretKey = config.DecryptedSecureJSONData["secretKey"]
return instance, nil
}
func (duration *Duration) UnmarshalJSON(b []byte) error {
var unmarshalledJson interface{}
err := json.Unmarshal(b, &unmarshalledJson)
if err != nil {
return err
}
switch value := unmarshalledJson.(type) {
case float64:
*duration = Duration{time.Duration(value)}
case string:
dur, err := time.ParseDuration(value)
if err != nil {
return err
}
*duration = Duration{dur}
default:
return fmt.Errorf("invalid duration: %#v", unmarshalledJson)
}
return nil
}
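
Since logsTimeout accepts either a Go duration string or a raw number of nanoseconds, a short sketch of the round trip may help. The Duration type below is copied from the hunk above so the snippet compiles on its own; the sample values match the settings tests later in this commit.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Duration wraps time.Duration and accepts either a duration string ("10m")
// or a plain number of nanoseconds (1500000000) in JSON. Copied from the
// models package above for the sake of a self-contained example.
type Duration struct {
	time.Duration
}

func (d *Duration) UnmarshalJSON(b []byte) error {
	var v interface{}
	if err := json.Unmarshal(b, &v); err != nil {
		return err
	}
	switch value := v.(type) {
	case float64:
		*d = Duration{time.Duration(value)}
	case string:
		dur, err := time.ParseDuration(value)
		if err != nil {
			return err
		}
		*d = Duration{dur}
	default:
		return fmt.Errorf("invalid duration: %#v", v)
	}
	return nil
}

func main() {
	var fromString, fromNumber struct {
		LogsTimeout Duration `json:"logsTimeout"`
	}
	// A duration string, as entered in the "Query Result Timeout" field.
	_ = json.Unmarshal([]byte(`{"logsTimeout": "10m"}`), &fromString)
	// A raw nanosecond count, as a provisioned jsonData value might contain.
	_ = json.Unmarshal([]byte(`{"logsTimeout": 1500000000}`), &fromNumber)
	fmt.Println(fromString.LogsTimeout.Duration, fromNumber.LogsTimeout.Duration) // 10m0s 1.5s
}
```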

View File

@@ -2,6 +2,7 @@ package models
import (
"testing"
"time"
"github.com/grafana/grafana-aws-sdk/pkg/awsds"
"github.com/grafana/grafana-plugin-sdk-go/backend"
@@ -10,6 +11,24 @@ import (
)
func Test_Settings_LoadCloudWatchSettings(t *testing.T) {
t.Run("Should return error for invalid json", func(t *testing.T) {
settings := backend.DataSourceInstanceSettings{
ID: 33,
JSONData: []byte(`{
"authType": fluffles^.^,
"assumeRoleArn": "arn:aws:iam::123456789012:role/grafana",
"logsTimeout": "10m"
}`),
DecryptedSecureJSONData: map[string]string{
"accessKey": "AKIAIOSFODNN7EXAMPLE",
"secretKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
},
}
_, err := LoadCloudWatchSettings(settings)
assert.Error(t, err)
})
t.Run("Should parse keys query type", func(t *testing.T) {
settings := backend.DataSourceInstanceSettings{
ID: 33,
@@ -71,4 +90,109 @@ func Test_Settings_LoadCloudWatchSettings(t *testing.T) {
assert.Equal(t, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", s.SecretKey)
assert.Equal(t, "AKIAIOSFODNN7EXAMPLE", s.AccessKey)
})
t.Run("Should set logsTimeout to default duration if it is not defined", func(t *testing.T) {
settings := backend.DataSourceInstanceSettings{
ID: 33,
JSONData: []byte(`{
"authType": "arn",
"assumeRoleArn": "arn:aws:iam::123456789012:role/grafana"
}`),
DecryptedSecureJSONData: map[string]string{
"accessKey": "AKIAIOSFODNN7EXAMPLE",
"secretKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
},
}
s, err := LoadCloudWatchSettings(settings)
require.NoError(t, err)
assert.Equal(t, time.Minute*30, s.LogsTimeout.Duration)
})
t.Run("Should correctly parse logsTimeout duration string", func(t *testing.T) {
settings := backend.DataSourceInstanceSettings{
ID: 33,
JSONData: []byte(`{
"authType": "arn",
"assumeRoleArn": "arn:aws:iam::123456789012:role/grafana",
"logsTimeout": "10m"
}`),
DecryptedSecureJSONData: map[string]string{
"accessKey": "AKIAIOSFODNN7EXAMPLE",
"secretKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
},
}
s, err := LoadCloudWatchSettings(settings)
require.NoError(t, err)
assert.Equal(t, time.Minute*10, s.LogsTimeout.Duration)
})
t.Run("Should correctly parse logsTimeout string with float number", func(t *testing.T) {
settings := backend.DataSourceInstanceSettings{
ID: 33,
JSONData: []byte(`{
"authType": "arn",
"assumeRoleArn": "arn:aws:iam::123456789012:role/grafana",
"logsTimeout": "1.5s"
}`),
DecryptedSecureJSONData: map[string]string{
"accessKey": "AKIAIOSFODNN7EXAMPLE",
"secretKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
},
}
s, err := LoadCloudWatchSettings(settings)
require.NoError(t, err)
assert.Equal(t, time.Duration(1500000000), s.LogsTimeout.Duration)
})
t.Run("Should correctly parse logsTimeout duration in nanoseconds", func(t *testing.T) {
settings := backend.DataSourceInstanceSettings{
ID: 33,
JSONData: []byte(`{
"authType": "arn",
"assumeRoleArn": "arn:aws:iam::123456789012:role/grafana",
"logsTimeout": 1500000000
}`),
DecryptedSecureJSONData: map[string]string{
"accessKey": "AKIAIOSFODNN7EXAMPLE",
"secretKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
},
}
s, err := LoadCloudWatchSettings(settings)
require.NoError(t, err)
assert.Equal(t, 1500*time.Millisecond, s.LogsTimeout.Duration)
})
t.Run("Should throw error if logsTimeout is an invalid duration format", func(t *testing.T) {
settings := backend.DataSourceInstanceSettings{
ID: 33,
JSONData: []byte(`{
"authType": "arn",
"assumeRoleArn": "arn:aws:iam::123456789012:role/grafana",
"logsTimeout": "10mm"
}`),
DecryptedSecureJSONData: map[string]string{
"accessKey": "AKIAIOSFODNN7EXAMPLE",
"secretKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
},
}
_, err := LoadCloudWatchSettings(settings)
require.Error(t, err)
})
t.Run("Should throw error if logsTimeout is an invalid type", func(t *testing.T) {
settings := backend.DataSourceInstanceSettings{
ID: 33,
JSONData: []byte(`{
"authType": "arn",
"assumeRoleArn": "arn:aws:iam::123456789012:role/grafana",
"logsTimeout": true
}`),
DecryptedSecureJSONData: map[string]string{
"accessKey": "AKIAIOSFODNN7EXAMPLE",
"secretKey": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
},
}
_, err := LoadCloudWatchSettings(settings)
require.Error(t, err)
})
}

View File

@@ -74,9 +74,9 @@ export const ConfigEditor = (props: Props) => {
<h3 className="page-heading">CloudWatch Logs</h3>
<div className="gf-form-group">
<InlineField
label="Retry Timeout"
label="Query Result Timeout"
labelWidth={28}
tooltip='Cloudwatch Logs allows for a maximum of 30 concurrent queries. If Grafana hits a concurrent max query error from Cloudwatch Logs it will auto-retry requesting a query for up to 30min. This retry timeout strategy is configurable. Must be a valid duration string, such as "15m" "30s" "2000ms" etc.'
tooltip='Grafana will poll for Cloudwatch Logs query results every second until Done status is returned from AWS or timeout is exceeded, in which case Grafana will return an error. The default period is 30 minutes. Note: For Alerting, the timeout defined in the config file will take precedence. Must be a valid duration string, such as "15m" "30s" "2000ms" etc.'
invalid={Boolean(logsTimeoutError)}
>
<Input