mirror of
https://github.com/grafana/grafana.git
synced 2025-01-27 16:57:14 -06:00
Alerting: Attempt to retry retryable errors (#79161)
* Alerting: Attempt to retry retryable errors Retrying has been broken for a good while now (at least since version 9.4) - this change attempts to re-introduce them in their simplest and safest form possible. I first introduced #79095 to make sure we don't disrupt or put additional load on our customer's data sources with this change in a patch release. Paired with this change, retries can now work as expected. There's two small differences between how retries work now and how they used to work in legacy alerting. Retries only occur for valid alert definitions - if we suspect that that error comes from a malformed alert definition we skip retrying. We have added a constant backoff of 1s in between retries. --------- Signed-off-by: gotjosh <josue.abreu@gmail.com>
This commit is contained in:
parent
ea36336c0a
commit
c631261681
@ -157,6 +157,23 @@ func (evalResults Results) HasErrors() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// HasNonRetryableErrors returns true if we have at least 1 result with:
|
||||
// 1. A `State` of `Error`
|
||||
// 2. The `Error` attribute is not nil
|
||||
// 3. The `Error` type is of `&invalidEvalResultFormatError`
|
||||
// Our thinking with this approach, is that we don't want to retry errors that have relation with invalid alert definition format.
|
||||
func (evalResults Results) HasNonRetryableErrors() bool {
|
||||
for _, r := range evalResults {
|
||||
if r.State == Error && r.Error != nil {
|
||||
var nonRetryableError *invalidEvalResultFormatError
|
||||
if errors.As(r.Error, &nonRetryableError) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// HasErrors returns true when Results contains at least one element and all elements are errors
|
||||
func (evalResults Results) IsError() bool {
|
||||
for _, r := range evalResults {
|
||||
@ -177,6 +194,18 @@ func (evalResults Results) IsNoData() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Error returns the aggregated `error` of all results of which state is `Error`.
|
||||
func (evalResults Results) Error() error {
|
||||
var errs []error
|
||||
for _, result := range evalResults {
|
||||
if result.State == Error && result.Error != nil {
|
||||
errs = append(errs, result.Error)
|
||||
}
|
||||
}
|
||||
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
// Result contains the evaluated State of an alert instance
|
||||
// identified by its labels.
|
||||
type Result struct {
|
||||
|
@ -2,6 +2,7 @@ package eval
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
@ -769,6 +770,70 @@ func TestEvaluateRaw(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestResults_HasNonRetryableErrors(t *testing.T) {
|
||||
tc := []struct {
|
||||
name string
|
||||
eval Results
|
||||
expected bool
|
||||
}{
|
||||
{
|
||||
name: "with non-retryable errors",
|
||||
eval: Results{
|
||||
{
|
||||
State: Error,
|
||||
Error: &invalidEvalResultFormatError{refID: "A", reason: "unable to get frame row length", err: errors.New("weird error")},
|
||||
},
|
||||
},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "with retryable errors",
|
||||
eval: Results{
|
||||
{
|
||||
State: Error,
|
||||
Error: errors.New("some weird error"),
|
||||
},
|
||||
},
|
||||
expected: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tc {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
require.Equal(t, tt.expected, tt.eval.HasNonRetryableErrors())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestResults_Error(t *testing.T) {
|
||||
tc := []struct {
|
||||
name string
|
||||
eval Results
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "with non-retryable errors",
|
||||
eval: Results{
|
||||
{
|
||||
State: Error,
|
||||
Error: &invalidEvalResultFormatError{refID: "A", reason: "unable to get frame row length", err: errors.New("weird error")},
|
||||
},
|
||||
{
|
||||
State: Error,
|
||||
Error: errors.New("unable to get a data frame"),
|
||||
},
|
||||
},
|
||||
expected: "invalid format of evaluation results for the alert definition A: unable to get frame row length: weird error\nunable to get a data frame",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tc {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
require.Equal(t, tt.expected, tt.eval.Error().Error())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type fakeExpressionService struct {
|
||||
hook func(ctx context.Context, now time.Time, pipeline expr.DataPipeline) (*backend.QueryDataResponse, error)
|
||||
}
|
||||
|
@ -34,6 +34,9 @@ type ScheduleService interface {
|
||||
Run(context.Context) error
|
||||
}
|
||||
|
||||
// retryDelay represents how long to wait between each failed rule evaluation.
|
||||
const retryDelay = 1 * time.Second
|
||||
|
||||
// AlertsSender is an interface for a service that is responsible for sending notifications to the end-user.
|
||||
//
|
||||
//go:generate mockery --name AlertsSender --structname AlertsSenderMock --inpackage --filename alerts_sender_mock.go --with-expecter
|
||||
@ -345,6 +348,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
|
||||
return readyToRun, registeredDefinitions, updatedRules
|
||||
}
|
||||
|
||||
//nolint:gocyclo
|
||||
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan ruleVersionAndPauseStatus) error {
|
||||
grafanaCtx = ngmodels.WithRuleKey(grafanaCtx, key)
|
||||
logger := sch.log.FromContext(grafanaCtx)
|
||||
@ -374,7 +378,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
|
||||
notify(states)
|
||||
}
|
||||
|
||||
evaluate := func(ctx context.Context, f fingerprint, attempt int64, e *evaluation, span trace.Span) {
|
||||
evaluate := func(ctx context.Context, f fingerprint, attempt int64, e *evaluation, span trace.Span, retry bool) error {
|
||||
logger := logger.New("version", e.rule.Version, "fingerprint", f, "attempt", attempt, "now", e.scheduledAt).FromContext(ctx)
|
||||
start := sch.clock.Now()
|
||||
|
||||
@ -396,18 +400,43 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
|
||||
evalTotal.Inc()
|
||||
evalDuration.Observe(dur.Seconds())
|
||||
|
||||
if ctx.Err() != nil { // check if the context is not cancelled. The evaluation can be a long-running task.
|
||||
span.SetStatus(codes.Error, "rule evaluation cancelled")
|
||||
logger.Debug("Skip updating the state because the context has been cancelled")
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil || results.HasErrors() {
|
||||
evalTotalFailures.Inc()
|
||||
|
||||
// Only retry (return errors) if this isn't the last attempt, otherwise skip these return operations.
|
||||
if retry {
|
||||
// The only thing that can return non-nil `err` from ruleEval.Evaluate is the server side expression pipeline.
|
||||
// This includes transport errors such as transient network errors.
|
||||
if err != nil {
|
||||
span.SetStatus(codes.Error, "rule evaluation failed")
|
||||
span.RecordError(err)
|
||||
return fmt.Errorf("server side expressions pipeline returned an error: %w", err)
|
||||
}
|
||||
|
||||
// If the pipeline executed successfully but have other types of errors that can be retryable, we should do so.
|
||||
if !results.HasNonRetryableErrors() {
|
||||
span.SetStatus(codes.Error, "rule evaluation failed")
|
||||
span.RecordError(err)
|
||||
return fmt.Errorf("the result-set has errors that can be retried: %w", results.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// If results is nil, we assume that the error must be from the SSE pipeline (ruleEval.Evaluate) which is the only code that can actually return an `err`.
|
||||
if results == nil {
|
||||
results = append(results, eval.NewResultFromError(err, e.scheduledAt, dur))
|
||||
}
|
||||
|
||||
// If err is nil, we assume that the SSS pipeline succeeded and that the error must be embedded in the results.
|
||||
if err == nil {
|
||||
for _, result := range results {
|
||||
if result.Error != nil {
|
||||
err = errors.Join(err, result.Error)
|
||||
}
|
||||
}
|
||||
err = results.Error()
|
||||
}
|
||||
|
||||
span.SetStatus(codes.Error, "rule evaluation failed")
|
||||
span.RecordError(err)
|
||||
} else {
|
||||
@ -416,10 +445,6 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
|
||||
attribute.Int64("results", int64(len(results))),
|
||||
))
|
||||
}
|
||||
if ctx.Err() != nil { // check if the context is not cancelled. The evaluation can be a long-running task.
|
||||
logger.Debug("Skip updating the state because the context has been cancelled")
|
||||
return
|
||||
}
|
||||
start = sch.clock.Now()
|
||||
processedStates := sch.stateManager.ProcessEvalResults(
|
||||
ctx,
|
||||
@ -440,18 +465,8 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
|
||||
sch.alertsSender.Send(ctx, key, alerts)
|
||||
}
|
||||
sendDuration.Observe(sch.clock.Now().Sub(start).Seconds())
|
||||
}
|
||||
|
||||
retryIfError := func(f func(attempt int64) error) error {
|
||||
var attempt int64
|
||||
var err error
|
||||
for attempt = 0; attempt < sch.maxAttempts; attempt++ {
|
||||
err = f(attempt)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
evalRunning := false
|
||||
@ -487,7 +502,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
|
||||
sch.evalApplied(key, ctx.scheduledAt)
|
||||
}()
|
||||
|
||||
err := retryIfError(func(attempt int64) error {
|
||||
for attempt := int64(1); attempt <= sch.maxAttempts; attempt++ {
|
||||
isPaused := ctx.rule.IsPaused
|
||||
f := ruleWithFolder{ctx.rule, ctx.folderTitle}.Fingerprint()
|
||||
// Do not clean up state if the eval loop has just started.
|
||||
@ -506,7 +521,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
|
||||
currentFingerprint = f
|
||||
if isPaused {
|
||||
logger.Debug("Skip rule evaluation because it is paused")
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
fpStr := currentFingerprint.String()
|
||||
@ -518,15 +533,35 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
|
||||
attribute.String("rule_fingerprint", fpStr),
|
||||
attribute.String("tick", utcTick),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
evaluate(tracingCtx, f, attempt, ctx, span)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("Evaluation failed after all retries", "error", err)
|
||||
// Check before any execution if the context was cancelled so that we don't do any evaluations.
|
||||
if tracingCtx.Err() != nil {
|
||||
span.SetStatus(codes.Error, "rule evaluation cancelled")
|
||||
span.End()
|
||||
logger.Error("Skip evaluation and updating the state because the context has been cancelled", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt)
|
||||
return
|
||||
}
|
||||
|
||||
retry := attempt < sch.maxAttempts
|
||||
err := evaluate(tracingCtx, f, attempt, ctx, span, retry)
|
||||
// This is extremely confusing - when we exhaust all retry attempts, or we have no retryable errors
|
||||
// we return nil - so technically, this is meaningless to know whether the evaluation has errors or not.
|
||||
span.End()
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Error("Failed to evaluate rule", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt)
|
||||
select {
|
||||
case <-tracingCtx.Done():
|
||||
logger.Error("Context has been cancelled while backing off", "version", ctx.rule.Version, "fingerprint", f, "attempt", attempt, "now", ctx.scheduledAt)
|
||||
return
|
||||
case <-time.After(retryDelay):
|
||||
continue
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
case <-grafanaCtx.Done():
|
||||
// clean up the state only if the reason for stopping the evaluation loop is that the rule was deleted
|
||||
if errors.Is(grafanaCtx.Err(), errRuleDeleted) {
|
||||
|
@ -662,6 +662,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
|
||||
|
||||
sch, ruleStore, _, reg := createSchedule(evalAppliedChan, &sender)
|
||||
sch.maxAttempts = 3
|
||||
ruleStore.PutRule(context.Background(), rule)
|
||||
|
||||
go func() {
|
||||
@ -682,28 +683,28 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
expectedMetric := fmt.Sprintf(
|
||||
`# HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule.
|
||||
# TYPE grafana_alerting_rule_evaluation_duration_seconds histogram
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 1
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 1
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 1
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 1
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 1
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 1
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 1
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 1
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 1
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 1
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 3
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 3
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 3
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 3
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 3
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 3
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 3
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 3
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 3
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 3
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 3
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 3
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 3
|
||||
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 3
|
||||
grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
|
||||
grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 1
|
||||
grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 3
|
||||
# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures.
|
||||
# TYPE grafana_alerting_rule_evaluation_failures_total counter
|
||||
grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 1
|
||||
grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 3
|
||||
# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
|
||||
# TYPE grafana_alerting_rule_evaluations_total counter
|
||||
grafana_alerting_rule_evaluations_total{org="%[1]d"} 1
|
||||
grafana_alerting_rule_evaluations_total{org="%[1]d"} 3
|
||||
# HELP grafana_alerting_rule_process_evaluation_duration_seconds The time to process the evaluation results for a rule.
|
||||
# TYPE grafana_alerting_rule_process_evaluation_duration_seconds histogram
|
||||
grafana_alerting_rule_process_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
|
||||
|
Loading…
Reference in New Issue
Block a user