Alerting: Attempt to retry retryable errors (#79037)

* Alerting: Attempt to retry retryable errors

Currently in a draft state, but this is the minimal diff I could put together to exemplify how we could achieve this.

Signed-off-by: gotjosh <josue.abreu@gmail.com>

---------

Signed-off-by: gotjosh <josue.abreu@gmail.com>
gotjosh 2023-12-06 16:35:22 +00:00 committed by GitHub
parent 7e331c8507
commit 3e51cf0949
4 changed files with 156 additions and 43 deletions

View File

@@ -157,6 +157,23 @@ func (evalResults Results) HasErrors() bool {
return false
}
// HasNonRetryableErrors returns true if we have at least 1 result with:
// 1. A `State` of `Error`
// 2. A non-nil `Error` attribute
// 3. An `Error` of type `*invalidEvalResultFormatError`
// The rationale behind this approach is that we don't want to retry errors caused by an invalid alert definition format.
func (evalResults Results) HasNonRetryableErrors() bool {
for _, r := range evalResults {
if r.State == Error && r.Error != nil {
var nonRetryableError *invalidEvalResultFormatError
if errors.As(r.Error, &nonRetryableError) {
return true
}
}
}
return false
}
// IsError returns true when Results contains at least one element and all elements are errors
func (evalResults Results) IsError() bool {
for _, r := range evalResults {
@@ -177,6 +194,18 @@ func (evalResults Results) IsNoData() bool {
return true
}
// Error returns the aggregated `error` of all results whose state is `Error`.
func (evalResults Results) Error() error {
var errs []error
for _, result := range evalResults {
if result.State == Error && result.Error != nil {
errs = append(errs, result.Error)
}
}
return errors.Join(errs...)
}
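
For readers less familiar with Go's error-wrapping helpers, the two methods above lean on errors.As (to detect a concrete non-retryable error type anywhere in a wrapped chain) and errors.Join (to aggregate all result errors). A small standalone sketch of the same pattern, using a hypothetical invalidFormatError type rather than Grafana's invalidEvalResultFormatError:

package main

import (
	"errors"
	"fmt"
)

// invalidFormatError is a hypothetical stand-in for a non-retryable error type
// such as invalidEvalResultFormatError.
type invalidFormatError struct {
	reason string
	err    error
}

func (e *invalidFormatError) Error() string {
	return fmt.Sprintf("invalid format: %s: %v", e.reason, e.err)
}

func (e *invalidFormatError) Unwrap() error { return e.err }

// hasNonRetryable reports whether any error is, or wraps, an *invalidFormatError,
// mirroring the approach of HasNonRetryableErrors above.
func hasNonRetryable(errs []error) bool {
	for _, err := range errs {
		var target *invalidFormatError
		if errors.As(err, &target) {
			return true
		}
	}
	return false
}

func main() {
	transient := errors.New("transient network error")
	malformed := fmt.Errorf("evaluating rule A: %w",
		&invalidFormatError{reason: "unable to get frame row length", err: errors.New("weird error")})

	fmt.Println(hasNonRetryable([]error{transient})) // false: retrying may help
	fmt.Println(hasNonRetryable([]error{malformed})) // true: retrying will not help

	// errors.Join aggregates every non-nil error, as the Error method above does.
	fmt.Println(errors.Join(transient, malformed))
}

Because errors.As walks the wrapped chain, the check still fires when the format error is wrapped by another error, as in the malformed example.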
// Result contains the evaluated State of an alert instance
// identified by its labels.
type Result struct {

View File

@@ -2,6 +2,7 @@ package eval
import (
"context"
"errors"
"fmt"
"math/rand"
"testing"
@@ -769,6 +770,70 @@ func TestEvaluateRaw(t *testing.T) {
})
}
func TestResults_HasNonRetryableErrors(t *testing.T) {
tc := []struct {
name string
eval Results
expected bool
}{
{
name: "with non-retryable errors",
eval: Results{
{
State: Error,
Error: &invalidEvalResultFormatError{refID: "A", reason: "unable to get frame row length", err: errors.New("weird error")},
},
},
expected: true,
},
{
name: "with retryable errors",
eval: Results{
{
State: Error,
Error: errors.New("some weird error"),
},
},
expected: false,
},
}
for _, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.expected, tt.eval.HasNonRetryableErrors())
})
}
}
func TestResults_Error(t *testing.T) {
tc := []struct {
name string
eval Results
expected string
}{
{
name: "with non-retryable errors",
eval: Results{
{
State: Error,
Error: &invalidEvalResultFormatError{refID: "A", reason: "unable to get frame row length", err: errors.New("weird error")},
},
{
State: Error,
Error: errors.New("unable to get a data frame"),
},
},
expected: "invalid format of evaluation results for the alert definition A: unable to get frame row length: weird error\nunable to get a data frame",
},
}
for _, tt := range tc {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.expected, tt.eval.Error().Error())
})
}
}
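
The expected string in TestResults_Error comes from errors.Join, which joins the messages of its non-nil arguments with a newline between each. A standalone illustration (not Grafana code) of that formatting and of how the joined error still unwraps to its members:

package main

import (
	"errors"
	"fmt"
)

func main() {
	errA := errors.New("invalid format of evaluation results for the alert definition A: unable to get frame row length: weird error")
	errB := errors.New("unable to get a data frame")

	// errors.Join (Go 1.20+) drops nil values and separates the remaining
	// messages with a newline, producing the two-line string the test expects.
	joined := errors.Join(nil, errA, errB)
	fmt.Println(joined)

	// The joined error unwraps to its members, so errors.Is still matches them.
	fmt.Println(errors.Is(joined, errB)) // true
}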
type fakeExpressionService struct {
hook func(ctx context.Context, now time.Time, pipeline expr.DataPipeline) (*backend.QueryDataResponse, error)
}

View File

@@ -34,6 +34,9 @@ type ScheduleService interface {
Run(context.Context) error
}
// retryDelay represents how long to wait between each failed rule evaluation.
const retryDelay = 1 * time.Second
// AlertsSender is an interface for a service that is responsible for sending notifications to the end-user.
//
//go:generate mockery --name AlertsSender --structname AlertsSenderMock --inpackage --filename alerts_sender_mock.go --with-expecter
@@ -374,7 +377,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
notify(states)
}
evaluate := func(ctx context.Context, f fingerprint, attempt int64, e *evaluation, span trace.Span) {
evaluate := func(ctx context.Context, f fingerprint, attempt int64, e *evaluation, span trace.Span, retry bool) error {
logger := logger.New("version", e.rule.Version, "fingerprint", f, "attempt", attempt, "now", e.scheduledAt).FromContext(ctx)
start := sch.clock.Now()
@@ -398,16 +401,35 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
if err != nil || results.HasErrors() {
evalTotalFailures.Inc()
// Only retry (i.e. return an error) if this isn't the last attempt; otherwise skip the early returns below.
if retry {
// The only thing that can return non-nil `err` from ruleEval.Evaluate is the server side expression pipeline.
// This includes transport errors such as transient network errors.
if err != nil {
span.SetStatus(codes.Error, "rule evaluation failed")
span.RecordError(err)
return fmt.Errorf("server side expressions pipeline returned an error: %w", err)
}
// If the pipeline executed successfully but the results contain other types of errors that are retryable, we should retry.
if !results.HasNonRetryableErrors() {
span.SetStatus(codes.Error, "rule evaluation failed")
span.RecordError(err)
return fmt.Errorf("the result-set has errors that can be retried: %w", results.Error())
}
}
// If results is nil, we assume that the error must be from the SSE pipeline (ruleEval.Evaluate) which is the only code that can actually return an `err`.
if results == nil {
results = append(results, eval.NewResultFromError(err, e.scheduledAt, dur))
}
// If err is nil, we assume that the SSE pipeline succeeded and that the error must be embedded in the results.
if err == nil {
for _, result := range results {
if result.Error != nil {
err = errors.Join(err, result.Error)
}
}
err = results.Error()
}
span.SetStatus(codes.Error, "rule evaluation failed")
span.RecordError(err)
} else {
@@ -418,7 +440,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
}
if ctx.Err() != nil { // check if the context is not cancelled. The evaluation can be a long-running task.
logger.Debug("Skip updating the state because the context has been cancelled")
return
return nil
}
start = sch.clock.Now()
processedStates := sch.stateManager.ProcessEvalResults(
@@ -440,18 +462,8 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
sch.alertsSender.Send(ctx, key, alerts)
}
sendDuration.Observe(sch.clock.Now().Sub(start).Seconds())
}
retryIfError := func(f func(attempt int64) error) error {
var attempt int64
var err error
for attempt = 0; attempt < sch.maxAttempts; attempt++ {
err = f(attempt)
if err == nil {
return nil
}
}
return err
return nil
}
evalRunning := false
@@ -487,7 +499,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
sch.evalApplied(key, ctx.scheduledAt)
}()
err := retryIfError(func(attempt int64) error {
for attempt := int64(1); attempt <= sch.maxAttempts; attempt++ {
isPaused := ctx.rule.IsPaused
f := ruleWithFolder{ctx.rule, ctx.folderTitle}.Fingerprint()
// Do not clean up state if the eval loop has just started.
@@ -506,7 +518,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
currentFingerprint = f
if isPaused {
logger.Debug("Skip rule evaluation because it is paused")
return nil
return
}
fpStr := currentFingerprint.String()
@@ -518,15 +530,21 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
attribute.String("rule_fingerprint", fpStr),
attribute.String("tick", utcTick),
))
defer span.End()
evaluate(tracingCtx, f, attempt, ctx, span)
return nil
})
if err != nil {
logger.Error("Evaluation failed after all retries", "error", err)
retry := attempt < sch.maxAttempts
err := evaluate(tracingCtx, f, attempt, ctx, span, retry)
// This is admittedly confusing: when we exhaust all retry attempts, or when the errors are not retryable,
// we return nil, so the returned error alone does not tell us whether the evaluation had errors.
span.End()
if err == nil {
return
}
logger.Error("Failed to evaluate rule", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt)
time.Sleep(retryDelay)
}
}()
case <-grafanaCtx.Done():
// clean up the state only if the reason for stopping the evaluation loop is that the rule was deleted
if errors.Is(grafanaCtx.Err(), errRuleDeleted) {

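Taken together, the scheduler changes replace the retryIfError helper with an inline attempt loop: evaluate only reports an error while retries remain, and each failed attempt sleeps for retryDelay before trying again. A simplified, self-contained sketch of that control flow, using a hypothetical evaluateOnce in place of the scheduler's evaluate closure:

package main

import (
	"errors"
	"fmt"
	"time"
)

const (
	maxAttempts = int64(3)
	retryDelay  = 1 * time.Second
)

// evaluateOnce is a hypothetical stand-in for the scheduler's evaluate closure.
// It returns a non-nil error only when retry is true and the failure is
// considered retryable; on the last attempt the error is folded into the rule
// state instead, and nil is returned.
func evaluateOnce(attempt int64, retry bool) error {
	failed := attempt < 3 // pretend the first two attempts hit a transient error
	if failed && retry {
		return fmt.Errorf("the result-set has errors that can be retried: %w",
			errors.New("transient evaluation failure"))
	}
	return nil
}

func main() {
	for attempt := int64(1); attempt <= maxAttempts; attempt++ {
		retry := attempt < maxAttempts
		err := evaluateOnce(attempt, retry)
		if err == nil {
			break // success, or a final/non-retryable failure already recorded in state
		}
		fmt.Println("failed to evaluate rule", "attempt", attempt, "error", err)
		time.Sleep(retryDelay)
	}
}

As the inline comment in the diff notes, a nil return can mean either success or an exhausted/non-retryable failure, so the loop itself cannot tell the two apart.
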
View File

@@ -662,6 +662,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
sch, ruleStore, _, reg := createSchedule(evalAppliedChan, &sender)
sch.maxAttempts = 3
ruleStore.PutRule(context.Background(), rule)
go func() {
@@ -682,28 +683,28 @@ func TestSchedule_ruleRoutine(t *testing.T) {
expectedMetric := fmt.Sprintf(
`# HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule.
# TYPE grafana_alerting_rule_evaluation_duration_seconds histogram
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 3
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 3
grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 1
grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 3
# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures.
# TYPE grafana_alerting_rule_evaluation_failures_total counter
grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 1
grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 3
# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
# TYPE grafana_alerting_rule_evaluations_total counter
grafana_alerting_rule_evaluations_total{org="%[1]d"} 1
grafana_alerting_rule_evaluations_total{org="%[1]d"} 3
# HELP grafana_alerting_rule_process_evaluation_duration_seconds The time to process the evaluation results for a rule.
# TYPE grafana_alerting_rule_process_evaluation_duration_seconds histogram
grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1