diff --git a/pkg/services/ngalert/schedule/alert_rule.go b/pkg/services/ngalert/schedule/alert_rule.go
index 396d45bd498..88642aaf157 100644
--- a/pkg/services/ngalert/schedule/alert_rule.go
+++ b/pkg/services/ngalert/schedule/alert_rule.go
@@ -180,127 +180,11 @@ func (a *alertRuleInfo) stop(reason error) {
 	a.stopFn(reason)
 }
 
-//nolint:gocyclo
 func (a *alertRuleInfo) run(key ngmodels.AlertRuleKey) error {
 	grafanaCtx := ngmodels.WithRuleKey(a.ctx, key)
 	logger := a.logger.FromContext(grafanaCtx)
 	logger.Debug("Alert rule routine started")
 
-	orgID := fmt.Sprint(key.OrgID)
-	evalTotal := a.metrics.EvalTotal.WithLabelValues(orgID)
-	evalDuration := a.metrics.EvalDuration.WithLabelValues(orgID)
-	evalTotalFailures := a.metrics.EvalFailures.WithLabelValues(orgID)
-	processDuration := a.metrics.ProcessDuration.WithLabelValues(orgID)
-	sendDuration := a.metrics.SendDuration.WithLabelValues(orgID)
-
-	notify := func(states []state.StateTransition) {
-		expiredAlerts := state.FromAlertsStateToStoppedAlert(states, a.appURL, a.clock)
-		if len(expiredAlerts.PostableAlerts) > 0 {
-			a.sender.Send(grafanaCtx, key, expiredAlerts)
-		}
-	}
-
-	resetState := func(ctx context.Context, isPaused bool) {
-		rule := a.ruleProvider.get(key)
-		reason := ngmodels.StateReasonUpdated
-		if isPaused {
-			reason = ngmodels.StateReasonPaused
-		}
-		states := a.stateManager.ResetStateByRuleUID(ctx, rule, reason)
-		notify(states)
-	}
-
-	evaluate := func(ctx context.Context, f fingerprint, attempt int64, e *evaluation, span trace.Span, retry bool) error {
-		logger := logger.New("version", e.rule.Version, "fingerprint", f, "attempt", attempt, "now", e.scheduledAt).FromContext(ctx)
-		start := a.clock.Now()
-
-		evalCtx := eval.NewContextWithPreviousResults(ctx, SchedulerUserFor(e.rule.OrgID), a.newLoadedMetricsReader(e.rule))
-		ruleEval, err := a.evalFactory.Create(evalCtx, e.rule.GetEvalCondition())
-		var results eval.Results
-		var dur time.Duration
-		if err != nil {
-			dur = a.clock.Now().Sub(start)
-			logger.Error("Failed to build rule evaluator", "error", err)
-		} else {
-			results, err = ruleEval.Evaluate(ctx, e.scheduledAt)
-			dur = a.clock.Now().Sub(start)
-			if err != nil {
-				logger.Error("Failed to evaluate rule", "error", err, "duration", dur)
-			}
-		}
-
-		evalTotal.Inc()
-		evalDuration.Observe(dur.Seconds())
-
-		if ctx.Err() != nil { // check if the context is not cancelled. The evaluation can be a long-running task.
-			span.SetStatus(codes.Error, "rule evaluation cancelled")
-			logger.Debug("Skip updating the state because the context has been cancelled")
-			return nil
-		}
-
-		if err != nil || results.HasErrors() {
-			evalTotalFailures.Inc()
-
-			// Only retry (return errors) if this isn't the last attempt, otherwise skip these return operations.
-			if retry {
-				// The only thing that can return non-nil `err` from ruleEval.Evaluate is the server side expression pipeline.
-				// This includes transport errors such as transient network errors.
-				if err != nil {
-					span.SetStatus(codes.Error, "rule evaluation failed")
-					span.RecordError(err)
-					return fmt.Errorf("server side expressions pipeline returned an error: %w", err)
-				}
-
-				// If the pipeline executed successfully but have other types of errors that can be retryable, we should do so.
-				if !results.HasNonRetryableErrors() {
-					span.SetStatus(codes.Error, "rule evaluation failed")
-					span.RecordError(err)
-					return fmt.Errorf("the result-set has errors that can be retried: %w", results.Error())
-				}
-			}
-
-			// If results is nil, we assume that the error must be from the SSE pipeline (ruleEval.Evaluate) which is the only code that can actually return an `err`.
-			if results == nil {
-				results = append(results, eval.NewResultFromError(err, e.scheduledAt, dur))
-			}
-
-			// If err is nil, we assume that the SSS pipeline succeeded and that the error must be embedded in the results.
-			if err == nil {
-				err = results.Error()
-			}
-
-			span.SetStatus(codes.Error, "rule evaluation failed")
-			span.RecordError(err)
-		} else {
-			logger.Debug("Alert rule evaluated", "results", results, "duration", dur)
-			span.AddEvent("rule evaluated", trace.WithAttributes(
-				attribute.Int64("results", int64(len(results))),
-			))
-		}
-		start = a.clock.Now()
-		processedStates := a.stateManager.ProcessEvalResults(
-			ctx,
-			e.scheduledAt,
-			e.rule,
-			results,
-			state.GetRuleExtraLabels(logger, e.rule, e.folderTitle, !a.disableGrafanaFolder),
-		)
-		processDuration.Observe(a.clock.Now().Sub(start).Seconds())
-
-		start = a.clock.Now()
-		alerts := state.FromStateTransitionToPostableAlerts(processedStates, a.stateManager, a.appURL)
-		span.AddEvent("results processed", trace.WithAttributes(
-			attribute.Int64("state_transitions", int64(len(processedStates))),
-			attribute.Int64("alerts_to_send", int64(len(alerts.PostableAlerts))),
-		))
-		if len(alerts.PostableAlerts) > 0 {
-			a.sender.Send(ctx, key, alerts)
-		}
-		sendDuration.Observe(a.clock.Now().Sub(start).Seconds())
-
-		return nil
-	}
-
 	evalRunning := false
 	var currentFingerprint fingerprint
 	defer a.stopApplied(key)
@@ -315,7 +199,7 @@ func (a *alertRuleInfo) run(key ngmodels.AlertRuleKey) error {
 
 			logger.Info("Clearing the state of the rule because it was updated", "isPaused", ctx.IsPaused, "fingerprint", ctx.Fingerprint)
 			// clear the state. So the next evaluation will start from the scratch.
-			resetState(grafanaCtx, ctx.IsPaused)
+			a.resetState(grafanaCtx, key, ctx.IsPaused)
 			currentFingerprint = ctx.Fingerprint
 		// evalCh - used by the scheduler to signal that evaluation is needed.
 		case ctx, ok := <-a.evalCh:
@@ -348,7 +232,7 @@ func (a *alertRuleInfo) run(key ngmodels.AlertRuleKey) error {
 					// lingers in DB and won't be cleaned up until next alert rule update.
 					needReset = needReset || (currentFingerprint == 0 && isPaused)
 					if needReset {
-						resetState(grafanaCtx, isPaused)
+						a.resetState(grafanaCtx, key, isPaused)
 					}
 					currentFingerprint = f
 					if isPaused {
@@ -375,7 +259,7 @@ func (a *alertRuleInfo) run(key ngmodels.AlertRuleKey) error {
 					}
 
 					retry := attempt < a.maxAttempts
-					err := evaluate(tracingCtx, f, attempt, ctx, span, retry)
+					err := a.evaluate(tracingCtx, key, f, attempt, ctx, span, retry)
 					// This is extremely confusing - when we exhaust all retry attempts, or we have no retryable errors
 					// we return nil - so technically, this is meaningless to know whether the evaluation has errors or not.
 					span.End()
@@ -403,7 +287,7 @@ func (a *alertRuleInfo) run(key ngmodels.AlertRuleKey) error {
 				ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
 				defer cancelFunc()
 				states := a.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, key), key, ngmodels.StateReasonRuleDeleted)
-				notify(states)
+				a.notify(grafanaCtx, key, states)
 			}
 			logger.Debug("Stopping alert rule routine")
 			return nil
@@ -411,6 +295,121 @@ func (a *alertRuleInfo) run(key ngmodels.AlertRuleKey) error {
 	}
 }
 
+func (a *alertRuleInfo) evaluate(ctx context.Context, key ngmodels.AlertRuleKey, f fingerprint, attempt int64, e *evaluation, span trace.Span, retry bool) error {
+	orgID := fmt.Sprint(key.OrgID)
+	evalTotal := a.metrics.EvalTotal.WithLabelValues(orgID)
+	evalDuration := a.metrics.EvalDuration.WithLabelValues(orgID)
+	evalTotalFailures := a.metrics.EvalFailures.WithLabelValues(orgID)
+	processDuration := a.metrics.ProcessDuration.WithLabelValues(orgID)
+	sendDuration := a.metrics.SendDuration.WithLabelValues(orgID)
+
+	logger := a.logger.FromContext(ctx).New("version", e.rule.Version, "fingerprint", f, "attempt", attempt, "now", e.scheduledAt).FromContext(ctx)
+	start := a.clock.Now()
+
+	evalCtx := eval.NewContextWithPreviousResults(ctx, SchedulerUserFor(e.rule.OrgID), a.newLoadedMetricsReader(e.rule))
+	ruleEval, err := a.evalFactory.Create(evalCtx, e.rule.GetEvalCondition())
+	var results eval.Results
+	var dur time.Duration
+	if err != nil {
+		dur = a.clock.Now().Sub(start)
+		logger.Error("Failed to build rule evaluator", "error", err)
+	} else {
+		results, err = ruleEval.Evaluate(ctx, e.scheduledAt)
+		dur = a.clock.Now().Sub(start)
+		if err != nil {
+			logger.Error("Failed to evaluate rule", "error", err, "duration", dur)
+		}
+	}
+
+	evalTotal.Inc()
+	evalDuration.Observe(dur.Seconds())
+
+	if ctx.Err() != nil { // check if the context is not cancelled. The evaluation can be a long-running task.
+		span.SetStatus(codes.Error, "rule evaluation cancelled")
+		logger.Debug("Skip updating the state because the context has been cancelled")
+		return nil
+	}
+
+	if err != nil || results.HasErrors() {
+		evalTotalFailures.Inc()
+
+		// Only retry (return errors) if this isn't the last attempt, otherwise skip these return operations.
+		if retry {
+			// The only thing that can return non-nil `err` from ruleEval.Evaluate is the server side expression pipeline.
+			// This includes transport errors such as transient network errors.
+			if err != nil {
+				span.SetStatus(codes.Error, "rule evaluation failed")
+				span.RecordError(err)
+				return fmt.Errorf("server side expressions pipeline returned an error: %w", err)
+			}
+
+			// If the pipeline executed successfully but the results contain errors that are retryable, we should retry.
+			if !results.HasNonRetryableErrors() {
+				span.SetStatus(codes.Error, "rule evaluation failed")
+				span.RecordError(err)
+				return fmt.Errorf("the result-set has errors that can be retried: %w", results.Error())
+			}
+		}
+
+		// If results is nil, we assume that the error must be from the SSE pipeline (ruleEval.Evaluate) which is the only code that can actually return an `err`.
+		if results == nil {
+			results = append(results, eval.NewResultFromError(err, e.scheduledAt, dur))
+		}
+
+		// If err is nil, we assume that the SSE pipeline succeeded and that the error must be embedded in the results.
+		if err == nil {
+			err = results.Error()
+		}
+
+		span.SetStatus(codes.Error, "rule evaluation failed")
+		span.RecordError(err)
+	} else {
+		logger.Debug("Alert rule evaluated", "results", results, "duration", dur)
+		span.AddEvent("rule evaluated", trace.WithAttributes(
+			attribute.Int64("results", int64(len(results))),
+		))
+	}
+	start = a.clock.Now()
+	processedStates := a.stateManager.ProcessEvalResults(
+		ctx,
+		e.scheduledAt,
+		e.rule,
+		results,
+		state.GetRuleExtraLabels(logger, e.rule, e.folderTitle, !a.disableGrafanaFolder),
+	)
+	processDuration.Observe(a.clock.Now().Sub(start).Seconds())
+
+	start = a.clock.Now()
+	alerts := state.FromStateTransitionToPostableAlerts(processedStates, a.stateManager, a.appURL)
+	span.AddEvent("results processed", trace.WithAttributes(
+		attribute.Int64("state_transitions", int64(len(processedStates))),
+		attribute.Int64("alerts_to_send", int64(len(alerts.PostableAlerts))),
+	))
+	if len(alerts.PostableAlerts) > 0 {
+		a.sender.Send(ctx, key, alerts)
+	}
+	sendDuration.Observe(a.clock.Now().Sub(start).Seconds())
+
+	return nil
+}
+
+func (a *alertRuleInfo) notify(ctx context.Context, key ngmodels.AlertRuleKey, states []state.StateTransition) {
+	expiredAlerts := state.FromAlertsStateToStoppedAlert(states, a.appURL, a.clock)
+	if len(expiredAlerts.PostableAlerts) > 0 {
+		a.sender.Send(ctx, key, expiredAlerts)
+	}
+}
+
+func (a *alertRuleInfo) resetState(ctx context.Context, key ngmodels.AlertRuleKey, isPaused bool) {
+	rule := a.ruleProvider.get(key)
+	reason := ngmodels.StateReasonUpdated
+	if isPaused {
+		reason = ngmodels.StateReasonPaused
+	}
+	states := a.stateManager.ResetStateByRuleUID(ctx, rule, reason)
+	a.notify(ctx, key, states)
+}
+
 // evalApplied is only used on tests.
 func (a *alertRuleInfo) evalApplied(alertDefKey ngmodels.AlertRuleKey, now time.Time) {
 	if a.evalAppliedHook == nil {