From c68feecb6e2c9646701dcd04b37a47b37a964d3a Mon Sep 17 00:00:00 2001 From: Isabella Siu Date: Tue, 30 Jul 2024 09:55:01 -0400 Subject: [PATCH] CloudWatch: Add errorsource for QueryData (#91085) --- pkg/tsdb/cloudwatch/annotation_query.go | 14 +++++--- .../cloudwatch/get_metric_data_executor.go | 3 +- pkg/tsdb/cloudwatch/log_actions.go | 35 ++++++++++++------- pkg/tsdb/cloudwatch/log_sync_query.go | 19 ++++++---- pkg/tsdb/cloudwatch/log_sync_query_test.go | 9 +++-- .../cloudwatch/models/cloudwatch_query.go | 3 +- pkg/tsdb/cloudwatch/time_series_query.go | 9 +++-- 7 files changed, 58 insertions(+), 34 deletions(-) diff --git a/pkg/tsdb/cloudwatch/annotation_query.go b/pkg/tsdb/cloudwatch/annotation_query.go index c65bece8c2e..074f30d5096 100644 --- a/pkg/tsdb/cloudwatch/annotation_query.go +++ b/pkg/tsdb/cloudwatch/annotation_query.go @@ -11,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource" "github.com/grafana/grafana/pkg/tsdb/cloudwatch/kinds/dataquery" "github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils" ) @@ -35,7 +36,7 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, pluginC if model.Period != nil && *model.Period != "" { p, err := strconv.ParseInt(*model.Period, 10, 64) if err != nil { - return nil, err + return nil, errorsource.DownstreamError(fmt.Errorf("query period must be an int"), false) } period = p } @@ -80,12 +81,13 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, pluginC } resp, err := cli.DescribeAlarms(params) if err != nil { - return nil, fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarms", err) + result = errorsource.AddDownstreamErrorToResponse(query.RefID, result, fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarms", err)) + return result, nil } alarmNames = filterAlarms(resp, utils.Depointerizer(model.Namespace), metricName, dimensions, statistic, period) } else { if model.Region == nil || model.Namespace == nil || metricName == "" || statistic == "" { - return result, errors.New("invalid annotations query") + return result, errorsource.DownstreamError(errors.New("invalid annotations query"), false) } var qd []*cloudwatch.Dimension @@ -110,7 +112,8 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, pluginC } resp, err := cli.DescribeAlarmsForMetric(params) if err != nil { - return nil, fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarmsForMetric", err) + result = errorsource.AddDownstreamErrorToResponse(query.RefID, result, fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarmsForMetric", err)) + return result, nil } for _, alarm := range resp.MetricAlarms { alarmNames = append(alarmNames, alarm.AlarmName) @@ -127,7 +130,8 @@ func (e *cloudWatchExecutor) executeAnnotationQuery(ctx context.Context, pluginC } resp, err := cli.DescribeAlarmHistory(params) if err != nil { - return nil, fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarmHistory", err) + result = errorsource.AddDownstreamErrorToResponse(query.RefID, result, fmt.Errorf("%v: %w", "failed to call cloudwatch:DescribeAlarmHistory", err)) + return result, nil } for _, history := range resp.AlarmHistoryItems { annotations = append(annotations, &annotationEvent{ diff --git a/pkg/tsdb/cloudwatch/get_metric_data_executor.go b/pkg/tsdb/cloudwatch/get_metric_data_executor.go index 76c1b40e0e4..002fedd5639 100644 --- a/pkg/tsdb/cloudwatch/get_metric_data_executor.go +++ b/pkg/tsdb/cloudwatch/get_metric_data_executor.go @@ -7,6 +7,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" + "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource" "github.com/grafana/grafana/pkg/tsdb/cloudwatch/features" "github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils" ) @@ -27,7 +28,7 @@ func (e *cloudWatchExecutor) executeRequest(ctx context.Context, client cloudwat resp, err := client.GetMetricDataWithContext(ctx, metricDataInput) if err != nil { - return mdo, err + return mdo, errorsource.DownstreamError(err, false) } mdo = append(mdo, resp) diff --git a/pkg/tsdb/cloudwatch/log_actions.go b/pkg/tsdb/cloudwatch/log_actions.go index 68f8e722d23..970b55fb491 100644 --- a/pkg/tsdb/cloudwatch/log_actions.go +++ b/pkg/tsdb/cloudwatch/log_actions.go @@ -16,6 +16,7 @@ import ( "github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource" "github.com/grafana/grafana/pkg/tsdb/cloudwatch/features" "github.com/grafana/grafana/pkg/tsdb/cloudwatch/models" @@ -37,7 +38,7 @@ type AWSError struct { } func (e *AWSError) Error() string { - return fmt.Sprintf("%s: %s", e.Code, e.Message) + return fmt.Sprintf("CloudWatch error: %s: %s", e.Code, e.Message) } func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { @@ -58,7 +59,7 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend dataframe, err := e.executeLogAction(ectx, logsQuery, query, req.PluginContext) if err != nil { resultChan <- backend.Responses{ - query.RefID: backend.DataResponse{Frames: data.Frames{}, Error: err}, + query.RefID: errorsource.Response(err), } return nil } @@ -84,6 +85,7 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend respD := resp.Responses[refID] respD.Frames = response.Frames respD.Error = response.Error + respD.ErrorSource = response.ErrorSource resp.Responses[refID] = respD } } @@ -138,12 +140,12 @@ func (e *cloudWatchExecutor) handleGetLogEvents(ctx context.Context, logsClient } if logsQuery.LogGroupName == "" { - return nil, fmt.Errorf("Error: Parameter 'logGroupName' is required") + return nil, errorsource.DownstreamError(fmt.Errorf("Error: Parameter 'logGroupName' is required"), false) } queryRequest.SetLogGroupName(logsQuery.LogGroupName) if logsQuery.LogStreamName == "" { - return nil, fmt.Errorf("Error: Parameter 'logStreamName' is required") + return nil, errorsource.DownstreamError(fmt.Errorf("Error: Parameter 'logStreamName' is required"), false) } queryRequest.SetLogStreamName(logsQuery.LogStreamName) @@ -157,7 +159,7 @@ func (e *cloudWatchExecutor) handleGetLogEvents(ctx context.Context, logsClient logEvents, err := logsClient.GetLogEventsWithContext(ctx, queryRequest) if err != nil { - return nil, err + return nil, errorsource.DownstreamError(err, false) } messages := make([]*string, 0) @@ -186,7 +188,7 @@ func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient c endTime := timeRange.To if !startTime.Before(endTime) { - return nil, fmt.Errorf("invalid time range: start time must be before end time") + return nil, errorsource.DownstreamError(fmt.Errorf("invalid time range: start time must be before end time"), false) } // The fields @log and @logStream are always included in the results of a user's query @@ -225,18 +227,22 @@ func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient c } e.logger.FromContext(ctx).Debug("Calling startquery with context with input", "input", startQueryInput) - return logsClient.StartQueryWithContext(ctx, startQueryInput) + resp, err := logsClient.StartQueryWithContext(ctx, startQueryInput) + if err != nil { + var awsErr awserr.Error + if errors.As(err, &awsErr) && awsErr.Code() == "LimitExceededException" { + e.logger.FromContext(ctx).Debug("ExecuteStartQuery limit exceeded", "err", awsErr) + err = &AWSError{Code: limitExceededException, Message: err.Error()} + } + err = errorsource.DownstreamError(err, false) + } + return resp, err } func (e *cloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, logsQuery models.LogsQuery, timeRange backend.TimeRange, refID string) (*data.Frame, error) { startQueryResponse, err := e.executeStartQuery(ctx, logsClient, logsQuery, timeRange) if err != nil { - var awsErr awserr.Error - if errors.As(err, &awsErr) && awsErr.Code() == "LimitExceededException" { - e.logger.FromContext(ctx).Debug("ExecuteStartQuery limit exceeded", "err", awsErr) - return nil, &AWSError{Code: limitExceededException, Message: err.Error()} - } return nil, err } @@ -272,6 +278,8 @@ func (e *cloudWatchExecutor) executeStopQuery(ctx context.Context, logsClient cl if errors.As(err, &awsErr) && awsErr.Code() == "InvalidParameterException" { response = &cloudwatchlogs.StopQueryOutput{Success: aws.Bool(false)} err = nil + } else { + err = errorsource.DownstreamError(err, false) } } @@ -299,8 +307,9 @@ func (e *cloudWatchExecutor) executeGetQueryResults(ctx context.Context, logsCli if err != nil { var awsErr awserr.Error if errors.As(err, &awsErr) { - return getQueryResultsResponse, &AWSError{Code: awsErr.Code(), Message: err.Error()} + err = &AWSError{Code: awsErr.Code(), Message: err.Error()} } + err = errorsource.DownstreamError(err, false) } return getQueryResultsResponse, err } diff --git a/pkg/tsdb/cloudwatch/log_sync_query.go b/pkg/tsdb/cloudwatch/log_sync_query.go index 9243058184b..3baaac9c076 100644 --- a/pkg/tsdb/cloudwatch/log_sync_query.go +++ b/pkg/tsdb/cloudwatch/log_sync_query.go @@ -3,6 +3,7 @@ package cloudwatch import ( "context" "encoding/json" + "errors" "fmt" "time" @@ -10,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource" "github.com/grafana/grafana/pkg/tsdb/cloudwatch/kinds/dataquery" "github.com/grafana/grafana/pkg/tsdb/cloudwatch/models" "github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils" @@ -47,7 +49,17 @@ var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req * return nil, err } + refId := "A" + if q.RefID != "" { + refId = q.RefID + } + getQueryResultsOutput, err := e.syncQuery(ctx, logsClient, q, logsQuery, instance.Settings.LogsTimeout.Duration) + var sourceError errorsource.Error + if errors.As(err, &sourceError) { + errorsource.AddErrorToResponse(refId, resp, sourceError) + continue + } if err != nil { return nil, err } @@ -67,11 +79,6 @@ var executeSyncLogQuery = func(ctx context.Context, e *cloudWatchExecutor, req * frames = data.Frames{dataframe} } - refId := "A" - if q.RefID != "" { - refId = q.RefID - } - respD := resp.Responses[refId] respD.Frames = frames resp.Responses[refId] = respD @@ -109,7 +116,7 @@ func (e *cloudWatchExecutor) syncQuery(ctx context.Context, logsClient cloudwatc for range ticker.C { res, err := e.executeGetQueryResults(ctx, logsClient, requestParams) if err != nil { - return nil, fmt.Errorf("CloudWatch Error: %w", err) + return nil, err } if isTerminated(*res.Status) { return res, err diff --git a/pkg/tsdb/cloudwatch/log_sync_query_test.go b/pkg/tsdb/cloudwatch/log_sync_query_test.go index f1bed59ccfa..cb1f8d0cab3 100644 --- a/pkg/tsdb/cloudwatch/log_sync_query_test.go +++ b/pkg/tsdb/cloudwatch/log_sync_query_test.go @@ -390,14 +390,17 @@ func Test_executeSyncLogQuery_handles_RefId_from_input_queries(t *testing.T) { { TimeRange: backend.TimeRange{From: time.Unix(0, 0), To: time.Unix(1, 0)}, JSON: json.RawMessage(`{ + "refId": "A", "queryMode": "Logs" }`), }, }, }) - require.Nil(t, res) - require.Error(t, err) - require.Equal(t, "CloudWatch Error: foo: bar", err.Error()) + require.NotNil(t, res) + require.NotNil(t, res.Responses["A"]) + require.Equal(t, "CloudWatch error: foo: bar", res.Responses["A"].Error.Error()) + require.Equal(t, backend.ErrorSourceDownstream, res.Responses["A"].ErrorSource) + require.Nil(t, err) }) } diff --git a/pkg/tsdb/cloudwatch/models/cloudwatch_query.go b/pkg/tsdb/cloudwatch/models/cloudwatch_query.go index 56709d8442f..6bc2999e3bf 100644 --- a/pkg/tsdb/cloudwatch/models/cloudwatch_query.go +++ b/pkg/tsdb/cloudwatch/models/cloudwatch_query.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend/log" + "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource" "github.com/grafana/grafana/pkg/tsdb/cloudwatch/kinds/dataquery" "github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils" ) @@ -310,7 +311,7 @@ func (q *CloudWatchQuery) migrateLegacyQuery(query metricsDataQuery) { func (q *CloudWatchQuery) validateAndSetDefaults(refId string, metricsDataQuery metricsDataQuery, startTime, endTime time.Time, defaultRegionValue string, crossAccountQueryingEnabled bool) error { if metricsDataQuery.Statistic == nil && metricsDataQuery.Statistics == nil { - return fmt.Errorf("query must have either statistic or statistics field") + return errorsource.DownstreamError(fmt.Errorf("query must have either statistic or statistics field"), false) } var err error diff --git a/pkg/tsdb/cloudwatch/time_series_query.go b/pkg/tsdb/cloudwatch/time_series_query.go index e7d09de8ef0..e872c319d76 100644 --- a/pkg/tsdb/cloudwatch/time_series_query.go +++ b/pkg/tsdb/cloudwatch/time_series_query.go @@ -8,6 +8,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource" "github.com/grafana/grafana/pkg/tsdb/cloudwatch/features" "github.com/grafana/grafana/pkg/tsdb/cloudwatch/models" "github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils" @@ -23,13 +24,13 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba resp := backend.NewQueryDataResponse() if len(req.Queries) == 0 { - return nil, fmt.Errorf("request contains no queries") + return nil, errorsource.DownstreamError(fmt.Errorf("request contains no queries"), false) } // startTime and endTime are always the same for all queries startTime := req.Queries[0].TimeRange.From endTime := req.Queries[0].TimeRange.To if !startTime.Before(endTime) { - return nil, fmt.Errorf("invalid time range: start time must be before end time") + return nil, errorsource.DownstreamError(fmt.Errorf("invalid time range: start time must be before end time"), false) } instance, err := e.getInstance(ctx, req.PluginContext) @@ -127,9 +128,7 @@ func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, req *ba } if err := eg.Wait(); err != nil { - dataResponse := backend.DataResponse{ - Error: fmt.Errorf("metric request error: %q", err), - } + dataResponse := errorsource.Response(fmt.Errorf("metric request error: %w", err)) resultChan <- &responseWrapper{ RefId: getQueryRefIdFromErrorString(err.Error(), requestQueries), DataResponse: &dataResponse,