CloudWatch: Add errorsource for QueryData (#91085)

This commit is contained in:
Isabella Siu 2024-07-30 09:55:01 -04:00 committed by GitHub
parent 3e138449bb
commit c68feecb6e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 58 additions and 34 deletions

View File

@ -11,6 +11,7 @@ import (
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/kinds/dataquery"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
)
@ -35,7 +36,7 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, pluginC
if model.Period != nil && *model.Period != "" {
p, err := strconv.ParseInt(*model.Period, 10, 64)
if err != nil {
return nil, err
return nil, errorsource.DownstreamError(fmt.Errorf("query period must be an int"), false)
}
period = p
}
@ -80,12 +81,13 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, pluginC
}
resp, err := cli.DescribeAlarms(params)
if err != nil {
return nil, fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarms", err)
result = errorsource.AddDownstreamErrorToResponse(query.RefID, result, fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarms", err))
return result, nil
}
alarmNames = filterAlarms(resp, utils.Depointerizer(model.Namespace), metricName, dimensions, statistic, period)
} else {
if model.Region == nil || model.Namespace == nil || metricName == "" || statistic == "" {
return result, errors.New("invalid annotations query")
return result, errorsource.DownstreamError(errors.New("invalid annotations query"), false)
}
var qd []*cloudwatch.Dimension
@ -110,7 +112,8 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, pluginC
}
resp, err := cli.DescribeAlarmsForMetric(params)
if err != nil {
return nil, fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarmsForMetric", err)
result = errorsource.AddDownstreamErrorToResponse(query.RefID, result, fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarmsForMetric", err))
return result, nil
}
for _, alarm := range resp.MetricAlarms {
alarmNames = append(alarmNames, alarm.AlarmName)
@ -127,7 +130,8 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, pluginC
}
resp, err := cli.DescribeAlarmHistory(params)
if err != nil {
return nil, fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarmHistory", err)
result = errorsource.AddDownstreamErrorToResponse(query.RefID, result, fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarmHistory", err))
return result, nil
}
for _, history := range resp.AlarmHistoryItems {
annotations = append(annotations, &annotationEvent{

View File

@ -7,6 +7,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/features"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
)
@ -27,7 +28,7 @@ func (e *cloudWatchExecutor) executeRequest(ctx context.Context, client cloudwat
resp, err := client.GetMetricDataWithContext(ctx, metricDataInput)
if err != nil {
return mdo, err
return mdo, errorsource.DownstreamError(err, false)
}
mdo = append(mdo, resp)

View File

@ -16,6 +16,7 @@ import (
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/features"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
@ -37,7 +38,7 @@ type AWSError struct {
}
func (e *AWSError) Error() string {
return fmt.Sprintf("%s: %s", e.Code, e.Message)
return fmt.Sprintf("CloudWatch error: %s: %s", e.Code, e.Message)
}
func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
@ -58,7 +59,7 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend
dataframe, err := e.executeLogAction(ectx, logsQuery, query, req.PluginContext)
if err != nil {
resultChan <- backend.Responses{
query.RefID: backend.DataResponse{Frames: data.Frames{}, Error: err},
query.RefID: errorsource.Response(err),
}
return nil
}
@ -84,6 +85,7 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend
respD := resp.Responses[refID]
respD.Frames = response.Frames
respD.Error = response.Error
respD.ErrorSource = response.ErrorSource
resp.Responses[refID] = respD
}
}
@ -138,12 +140,12 @@ func (e *cloudWatchExecutor) handleGetLogEvents(ctx context.Context, logsClient
}
if logsQuery.LogGroupName == "" {
return nil, fmt.Errorf("Error: Parameter 'logGroupName' is required")
return nil, errorsource.DownstreamError(fmt.Errorf("Error: Parameter 'logGroupName' is required"), false)
}
queryRequest.SetLogGroupName(logsQuery.LogGroupName)
if logsQuery.LogStreamName == "" {
return nil, fmt.Errorf("Error: Parameter 'logStreamName' is required")
return nil, errorsource.DownstreamError(fmt.Errorf("Error: Parameter 'logStreamName' is required"), false)
}
queryRequest.SetLogStreamName(logsQuery.LogStreamName)
@ -157,7 +159,7 @@ func (e *cloudWatchExecutor) handleGetLogEvents(ctx context.Context, logsClient
logEvents, err := logsClient.GetLogEventsWithContext(ctx, queryRequest)
if err != nil {
return nil, err
return nil, errorsource.DownstreamError(err, false)
}
messages := make([]*string, 0)
@ -186,7 +188,7 @@ func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient c
endTime := timeRange.To
if !startTime.Before(endTime) {
return nil, fmt.Errorf("invalid time range: start time must be before end time")
return nil, errorsource.DownstreamError(fmt.Errorf("invalid time range: start time must be before end time"), false)
}
// The fields @log and @logStream are always included in the results of a user's query
@ -225,18 +227,22 @@ func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient c
}
e.logger.FromContext(ctx).Debug("Calling startquery with context with input", "input", startQueryInput)
return logsClient.StartQueryWithContext(ctx, startQueryInput)
resp, err := logsClient.StartQueryWithContext(ctx, startQueryInput)
if err != nil {
var awsErr awserr.Error
if errors.As(err, &awsErr) && awsErr.Code() == "LimitExceededException" {
e.logger.FromContext(ctx).Debug("ExecuteStartQuery limit exceeded", "err", awsErr)
err = &AWSError{Code: limitExceededException, Message: err.Error()}
}
err = errorsource.DownstreamError(err, false)
}
return resp, err
}
func (e *cloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
logsQuery models.LogsQuery, timeRange backend.TimeRange, refID string) (*data.Frame, error) {
startQueryResponse, err := e.executeStartQuery(ctx, logsClient, logsQuery, timeRange)
if err != nil {
var awsErr awserr.Error
if errors.As(err, &awsErr) && awsErr.Code() == "LimitExceededException" {
e.logger.FromContext(ctx).Debug("ExecuteStartQuery limit exceeded", "err", awsErr)
return nil, &AWSError{Code: limitExceededException, Message: err.Error()}
}
return nil, err
}
@ -272,6 +278,8 @@ func (e *cloudWatchExecutor) executeStopQuery(ctx context.Context, logsClient cl
if errors.As(err, &awsErr) && awsErr.Code() == "InvalidParameterException" {
response = &cloudwatchlogs.StopQueryOutput{Success: aws.Bool(false)}
err = nil
} else {
err = errorsource.DownstreamError(err, false)
}
}
@ -299,8 +307,9 @@ func (e *cloudWatchExecutor) executeGetQueryResults(ctx context.Context, logsCli
if err != nil {
var awsErr awserr.Error
if errors.As(err, &awsErr) {
return getQueryResultsResponse, &AWSError{Code: awsErr.Code(), Message: err.Error()}
err = &AWSError{Code: awsErr.Code(), Message: err.Error()}
}
err = errorsource.DownstreamError(err, false)
}
return getQueryResultsResponse, err
}

View File

@ -3,6 +3,7 @@ package cloudwatch
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
@ -10,6 +11,7 @@ import (
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/kinds/dataquery"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
@ -47,7 +49,17 @@ var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *
return nil, err
}
refId := "A"
if q.RefID != "" {
refId = q.RefID
}
getQueryResultsOutput, err := e.syncQuery(ctx, logsClient, q, logsQuery, instance.Settings.LogsTimeout.Duration)
var sourceError errorsource.Error
if errors.As(err, &sourceError) {
errorsource.AddErrorToResponse(refId, resp, sourceError)
continue
}
if err != nil {
return nil, err
}
@ -67,11 +79,6 @@ var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req *
frames = data.Frames{dataframe}
}
refId := "A"
if q.RefID != "" {
refId = q.RefID
}
respD := resp.Responses[refId]
respD.Frames = frames
resp.Responses[refId] = respD
@ -109,7 +116,7 @@ func (e *cloudWatchExecutor) syncQuery(ctx context.Context, logsClient cloudwatc
for range ticker.C {
res, err := e.executeGetQueryResults(ctx, logsClient, requestParams)
if err != nil {
return nil, fmt.Errorf("CloudWatch Error: %w", err)
return nil, err
}
if isTerminated(*res.Status) {
return res, err

View File

@ -390,14 +390,17 @@ func Test_executeSyncLogQuery_handles_RefId_from_input_queries(t *testing.T) {
{
TimeRange: backend.TimeRange{From: time.Unix(0, 0), To: time.Unix(1, 0)},
JSON: json.RawMessage(`{
"refId": "A",
"queryMode": "Logs"
}`),
},
},
})
require.Nil(t, res)
require.Error(t, err)
require.Equal(t, "CloudWatch Error: foo: bar", err.Error())
require.NotNil(t, res)
require.NotNil(t, res.Responses["A"])
require.Equal(t, "CloudWatch error: foo: bar", res.Responses["A"].Error.Error())
require.Equal(t, backend.ErrorSourceDownstream, res.Responses["A"].ErrorSource)
require.Nil(t, err)
})
}

View File

@ -16,6 +16,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/kinds/dataquery"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
)
@ -310,7 +311,7 @@ func (q *CloudWatchQuery) migrateLegacyQuery(query metricsDataQuery) {
func (q *CloudWatchQuery) validateAndSetDefaults(refId string, metricsDataQuery metricsDataQuery, startTime, endTime time.Time,
defaultRegionValue string, crossAccountQueryingEnabled bool) error {
if metricsDataQuery.Statistic == nil && metricsDataQuery.Statistics == nil {
return fmt.Errorf("query must have either statistic or statistics field")
return errorsource.DownstreamError(fmt.Errorf("query must have either statistic or statistics field"), false)
}
var err error

View File

@ -8,6 +8,7 @@ import (
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/features"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
@ -23,13 +24,13 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba
resp := backend.NewQueryDataResponse()
if len(req.Queries) == 0 {
return nil, fmt.Errorf("request contains no queries")
return nil, errorsource.DownstreamError(fmt.Errorf("request contains no queries"), false)
}
// startTime and endTime are always the same for all queries
startTime := req.Queries[0].TimeRange.From
endTime := req.Queries[0].TimeRange.To
if !startTime.Before(endTime) {
return nil, fmt.Errorf("invalid time range: start time must be before end time")
return nil, errorsource.DownstreamError(fmt.Errorf("invalid time range: start time must be before end time"), false)
}
instance, err := e.getInstance(ctx, req.PluginContext)
@ -127,9 +128,7 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba
}
if err := eg.Wait(); err != nil {
dataResponse := backend.DataResponse{
Error: fmt.Errorf("metric request error: %q", err),
}
dataResponse := errorsource.Response(fmt.Errorf("metric request error: %w", err))
resultChan <- &responseWrapper{
RefId: getQueryRefIdFromErrorString(err.Error(), requestQueries),
DataResponse: &dataResponse,