Alerting: Extract large closures in ruleRoutine (#84035)
* extract notify
* extract resetState
* move evaluate metrics inside evaluate
* split out evaluate
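Every hunk below follows the same mechanical pattern: a closure defined inside run that captured surrounding state (the rule key, the logger, the metrics) becomes a method on alertRuleInfo, with the captured values passed in explicitly. A minimal sketch of that pattern, using invented types rather than Grafana's:

// Sketch only: "worker" and "notify" are invented names illustrating
// the closure-to-method refactoring this commit applies.
package main

import "fmt"

type worker struct{ name string }

// Before: the logic lives in a closure that captures w and key implicitly.
func (w *worker) runWithClosure(key string) {
	notify := func(msg string) {
		fmt.Printf("%s/%s: %s\n", w.name, key, msg)
	}
	notify("done")
}

// After: the same logic as a method; everything it needs is a parameter.
func (w *worker) notify(key, msg string) {
	fmt.Printf("%s/%s: %s\n", w.name, key, msg)
}

func (w *worker) run(key string) {
	w.notify(key, "done")
}

func main() {
	w := &worker{name: "w1"}
	w.runWithClosure("k1")
	w.run("k1")
}

The trade-off is a longer parameter list in exchange for a much shorter run body and methods that can be exercised in isolation.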
@@ -180,127 +180,11 @@ func (a *alertRuleInfo) stop(reason error) {
 	a.stopFn(reason)
 }
 
 //nolint:gocyclo
 func (a *alertRuleInfo) run(key ngmodels.AlertRuleKey) error {
 	grafanaCtx := ngmodels.WithRuleKey(a.ctx, key)
 	logger := a.logger.FromContext(grafanaCtx)
 	logger.Debug("Alert rule routine started")
 
-	orgID := fmt.Sprint(key.OrgID)
-	evalTotal := a.metrics.EvalTotal.WithLabelValues(orgID)
-	evalDuration := a.metrics.EvalDuration.WithLabelValues(orgID)
-	evalTotalFailures := a.metrics.EvalFailures.WithLabelValues(orgID)
-	processDuration := a.metrics.ProcessDuration.WithLabelValues(orgID)
-	sendDuration := a.metrics.SendDuration.WithLabelValues(orgID)
-
-	notify := func(states []state.StateTransition) {
-		expiredAlerts := state.FromAlertsStateToStoppedAlert(states, a.appURL, a.clock)
-		if len(expiredAlerts.PostableAlerts) > 0 {
-			a.sender.Send(grafanaCtx, key, expiredAlerts)
-		}
-	}
-
-	resetState := func(ctx context.Context, isPaused bool) {
-		rule := a.ruleProvider.get(key)
-		reason := ngmodels.StateReasonUpdated
-		if isPaused {
-			reason = ngmodels.StateReasonPaused
-		}
-		states := a.stateManager.ResetStateByRuleUID(ctx, rule, reason)
-		notify(states)
-	}
-
-	evaluate := func(ctx context.Context, f fingerprint, attempt int64, e *evaluation, span trace.Span, retry bool) error {
-		logger := logger.New("version", e.rule.Version, "fingerprint", f, "attempt", attempt, "now", e.scheduledAt).FromContext(ctx)
-		start := a.clock.Now()
-
-		evalCtx := eval.NewContextWithPreviousResults(ctx, SchedulerUserFor(e.rule.OrgID), a.newLoadedMetricsReader(e.rule))
-		ruleEval, err := a.evalFactory.Create(evalCtx, e.rule.GetEvalCondition())
-		var results eval.Results
-		var dur time.Duration
-		if err != nil {
-			dur = a.clock.Now().Sub(start)
-			logger.Error("Failed to build rule evaluator", "error", err)
-		} else {
-			results, err = ruleEval.Evaluate(ctx, e.scheduledAt)
-			dur = a.clock.Now().Sub(start)
-			if err != nil {
-				logger.Error("Failed to evaluate rule", "error", err, "duration", dur)
-			}
-		}
-
-		evalTotal.Inc()
-		evalDuration.Observe(dur.Seconds())
-
-		if ctx.Err() != nil { // check if the context is not cancelled. The evaluation can be a long-running task.
-			span.SetStatus(codes.Error, "rule evaluation cancelled")
-			logger.Debug("Skip updating the state because the context has been cancelled")
-			return nil
-		}
-
-		if err != nil || results.HasErrors() {
-			evalTotalFailures.Inc()
-
-			// Only retry (return errors) if this isn't the last attempt, otherwise skip these return operations.
-			if retry {
-				// The only thing that can return non-nil `err` from ruleEval.Evaluate is the server side expression pipeline.
-				// This includes transport errors such as transient network errors.
-				if err != nil {
-					span.SetStatus(codes.Error, "rule evaluation failed")
-					span.RecordError(err)
-					return fmt.Errorf("server side expressions pipeline returned an error: %w", err)
-				}
-
-				// If the pipeline executed successfully but the results contain other retryable errors, retry as well.
-				if !results.HasNonRetryableErrors() {
-					span.SetStatus(codes.Error, "rule evaluation failed")
-					span.RecordError(err)
-					return fmt.Errorf("the result-set has errors that can be retried: %w", results.Error())
-				}
-			}
-
-			// If results is nil, we assume that the error must be from the SSE pipeline (ruleEval.Evaluate) which is the only code that can actually return an `err`.
-			if results == nil {
-				results = append(results, eval.NewResultFromError(err, e.scheduledAt, dur))
-			}
-
-			// If err is nil, we assume that the SSE pipeline succeeded and that the error must be embedded in the results.
-			if err == nil {
-				err = results.Error()
-			}
-
-			span.SetStatus(codes.Error, "rule evaluation failed")
-			span.RecordError(err)
-		} else {
-			logger.Debug("Alert rule evaluated", "results", results, "duration", dur)
-			span.AddEvent("rule evaluated", trace.WithAttributes(
-				attribute.Int64("results", int64(len(results))),
-			))
-		}
-		start = a.clock.Now()
-		processedStates := a.stateManager.ProcessEvalResults(
-			ctx,
-			e.scheduledAt,
-			e.rule,
-			results,
-			state.GetRuleExtraLabels(logger, e.rule, e.folderTitle, !a.disableGrafanaFolder),
-		)
-		processDuration.Observe(a.clock.Now().Sub(start).Seconds())
-
-		start = a.clock.Now()
-		alerts := state.FromStateTransitionToPostableAlerts(processedStates, a.stateManager, a.appURL)
-		span.AddEvent("results processed", trace.WithAttributes(
-			attribute.Int64("state_transitions", int64(len(processedStates))),
-			attribute.Int64("alerts_to_send", int64(len(alerts.PostableAlerts))),
-		))
-		if len(alerts.PostableAlerts) > 0 {
-			a.sender.Send(ctx, key, alerts)
-		}
-		sendDuration.Observe(a.clock.Now().Sub(start).Seconds())
-
-		return nil
-	}
-
 	evalRunning := false
 	var currentFingerprint fingerprint
 	defer a.stopApplied(key)
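Before this change, run bound the per-organisation metrics once up front via WithLabelValues; the extracted evaluate method (last hunk below) performs the same binding per call. A hedged sketch of that binding pattern with prometheus/client_golang, using illustrative metric names rather than Grafana's:

// Sketch only: metric names and labels are invented; the pattern of
// binding a label value once and reusing the bound collector is what
// the code above does with a.metrics.
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	evalTotal := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "eval_total", Help: "Evaluations."},
		[]string{"org"},
	)
	evalDuration := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{Name: "eval_duration_seconds", Help: "Evaluation duration."},
		[]string{"org"},
	)

	orgID := fmt.Sprint(int64(1))
	// Bind the label once; the bound Counter/Observer skips the label
	// lookup on every subsequent use.
	total := evalTotal.WithLabelValues(orgID)
	dur := evalDuration.WithLabelValues(orgID)

	start := time.Now()
	total.Inc()
	dur.Observe(time.Since(start).Seconds())
}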
@@ -315,7 +199,7 @@ func (a *alertRuleInfo) run(key ngmodels.AlertRuleKey) error {
 
 				logger.Info("Clearing the state of the rule because it was updated", "isPaused", ctx.IsPaused, "fingerprint", ctx.Fingerprint)
 				// clear the state, so the next evaluation will start from scratch.
-				resetState(grafanaCtx, ctx.IsPaused)
+				a.resetState(grafanaCtx, key, ctx.IsPaused)
 				currentFingerprint = ctx.Fingerprint
 			// evalCh - used by the scheduler to signal that evaluation is needed.
 			case ctx, ok := <-a.evalCh:
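The extracted resetState (shown in full in the final hunk) picks which reason to record when clearing state: updated versus paused. A minimal, self-contained sketch of that selection, with invented string constants standing in for the ngmodels reasons:

// Sketch only: the constants are stand-ins for
// ngmodels.StateReasonUpdated / ngmodels.StateReasonPaused.
package main

import "fmt"

const (
	reasonUpdated = "Updated"
	reasonPaused  = "Paused"
)

// resetReason mirrors the branch inside resetState: a paused rule and
// an updated rule clear state for different recorded reasons.
func resetReason(isPaused bool) string {
	reason := reasonUpdated
	if isPaused {
		reason = reasonPaused
	}
	return reason
}

func main() {
	fmt.Println(resetReason(false)) // Updated
	fmt.Println(resetReason(true))  // Paused
}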
@@ -348,7 +232,7 @@ func (a *alertRuleInfo) run(key ngmodels.AlertRuleKey) error {
 				// lingers in DB and won't be cleaned up until next alert rule update.
 				needReset = needReset || (currentFingerprint == 0 && isPaused)
 				if needReset {
-					resetState(grafanaCtx, isPaused)
+					a.resetState(grafanaCtx, key, isPaused)
 				}
 				currentFingerprint = f
 				if isPaused {
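The reset logic above is driven by a fingerprint of the rule: state is cleared only when the incoming rule differs from the one evaluated last, and a zero fingerprint marks a routine that has not evaluated anything yet. A sketch of that change-detection idea, with an invented hash in place of Grafana's actual fingerprint calculation:

// Sketch only: fingerprintOf is invented; Grafana derives its rule
// fingerprints differently. The zero value means "nothing evaluated yet".
package main

import (
	"fmt"
	"hash/fnv"
)

type fingerprint uint64

func fingerprintOf(ruleDef string) fingerprint {
	h := fnv.New64a()
	h.Write([]byte(ruleDef))
	return fingerprint(h.Sum64())
}

func main() {
	var current fingerprint // zero: no evaluation has happened yet
	for _, def := range []string{"A > 5", "A > 5", "A > 7"} {
		f := fingerprintOf(def)
		if f != current {
			fmt.Println("rule changed, resetting state")
			current = f
		} else {
			fmt.Println("unchanged, keeping state")
		}
	}
}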
@@ -375,7 +259,7 @@ func (a *alertRuleInfo) run(key ngmodels.AlertRuleKey) error {
 					}
 
 					retry := attempt < a.maxAttempts
-					err := evaluate(tracingCtx, f, attempt, ctx, span, retry)
+					err := a.evaluate(tracingCtx, key, f, attempt, ctx, span, retry)
 					// This is extremely confusing - when we exhaust all retry attempts, or we have no retryable errors,
 					// we return nil - so technically it is impossible to tell here whether the evaluation had errors or not.
 					span.End()
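As the in-code comment admits, the retry contract is subtle: evaluate returns an error only while retry is true, so a nil return on the final attempt says nothing about whether the evaluation actually succeeded. A self-contained sketch of the caller's side of that contract (the failure schedule here is invented):

// Sketch only: this evaluate pretends the first two attempts fail, to
// show that the final attempt always returns nil even on failure.
package main

import (
	"errors"
	"fmt"
)

func evaluate(attempt int64, retry bool) error {
	failed := attempt < 3
	if failed && retry {
		return errors.New("retryable failure")
	}
	// Last attempt: errors are swallowed; state is still updated upstream.
	return nil
}

func main() {
	const maxAttempts = int64(3)
	for attempt := int64(1); attempt <= maxAttempts; attempt++ {
		retry := attempt < maxAttempts
		if err := evaluate(attempt, retry); err != nil {
			fmt.Println("attempt", attempt, "failed:", err)
			continue
		}
		fmt.Println("attempt", attempt, "returned nil")
		break
	}
}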
@@ -403,7 +287,7 @@ func (a *alertRuleInfo) run(key ngmodels.AlertRuleKey) error {
 				ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
 				defer cancelFunc()
 				states := a.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, key), key, ngmodels.StateReasonRuleDeleted)
-				notify(states)
+				a.notify(grafanaCtx, key, states)
 			}
 			logger.Debug("Stopping alert rule routine")
 			return nil
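Note that the deletion branch builds a fresh context with a one-minute timeout from context.Background() rather than reusing grafanaCtx, which is presumably already cancelled by the time the routine shuts down. A minimal sketch of that detached-cleanup pattern:

// Sketch only: deleteStates is invented; the point is that cleanup gets
// its own deadline-bounded context, detached from the cancelled one.
package main

import (
	"context"
	"fmt"
	"time"
)

func deleteStates(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Millisecond): // pretend DB work
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	fmt.Println(deleteStates(ctx)) // <nil>
}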
@@ -411,6 +295,121 @@ func (a *alertRuleInfo) run(key ngmodels.AlertRuleKey) error {
 	}
 }
 
+func (a *alertRuleInfo) evaluate(ctx context.Context, key ngmodels.AlertRuleKey, f fingerprint, attempt int64, e *evaluation, span trace.Span, retry bool) error {
+	orgID := fmt.Sprint(key.OrgID)
+	evalTotal := a.metrics.EvalTotal.WithLabelValues(orgID)
+	evalDuration := a.metrics.EvalDuration.WithLabelValues(orgID)
+	evalTotalFailures := a.metrics.EvalFailures.WithLabelValues(orgID)
+	processDuration := a.metrics.ProcessDuration.WithLabelValues(orgID)
+	sendDuration := a.metrics.SendDuration.WithLabelValues(orgID)
+
+	logger := a.logger.FromContext(ctx).New("version", e.rule.Version, "fingerprint", f, "attempt", attempt, "now", e.scheduledAt).FromContext(ctx)
+	start := a.clock.Now()
+
+	evalCtx := eval.NewContextWithPreviousResults(ctx, SchedulerUserFor(e.rule.OrgID), a.newLoadedMetricsReader(e.rule))
+	ruleEval, err := a.evalFactory.Create(evalCtx, e.rule.GetEvalCondition())
+	var results eval.Results
+	var dur time.Duration
+	if err != nil {
+		dur = a.clock.Now().Sub(start)
+		logger.Error("Failed to build rule evaluator", "error", err)
+	} else {
+		results, err = ruleEval.Evaluate(ctx, e.scheduledAt)
+		dur = a.clock.Now().Sub(start)
+		if err != nil {
+			logger.Error("Failed to evaluate rule", "error", err, "duration", dur)
+		}
+	}
+
+	evalTotal.Inc()
+	evalDuration.Observe(dur.Seconds())
+
+	if ctx.Err() != nil { // check if the context is not cancelled. The evaluation can be a long-running task.
+		span.SetStatus(codes.Error, "rule evaluation cancelled")
+		logger.Debug("Skip updating the state because the context has been cancelled")
+		return nil
+	}
+
+	if err != nil || results.HasErrors() {
+		evalTotalFailures.Inc()
+
+		// Only retry (return errors) if this isn't the last attempt, otherwise skip these return operations.
+		if retry {
+			// The only thing that can return non-nil `err` from ruleEval.Evaluate is the server side expression pipeline.
+			// This includes transport errors such as transient network errors.
+			if err != nil {
+				span.SetStatus(codes.Error, "rule evaluation failed")
+				span.RecordError(err)
+				return fmt.Errorf("server side expressions pipeline returned an error: %w", err)
+			}
+
+			// If the pipeline executed successfully but the results contain other retryable errors, retry as well.
+			if !results.HasNonRetryableErrors() {
+				span.SetStatus(codes.Error, "rule evaluation failed")
+				span.RecordError(err)
+				return fmt.Errorf("the result-set has errors that can be retried: %w", results.Error())
+			}
+		}
+
+		// If results is nil, we assume that the error must be from the SSE pipeline (ruleEval.Evaluate) which is the only code that can actually return an `err`.
+		if results == nil {
+			results = append(results, eval.NewResultFromError(err, e.scheduledAt, dur))
+		}
+
+		// If err is nil, we assume that the SSE pipeline succeeded and that the error must be embedded in the results.
+		if err == nil {
+			err = results.Error()
+		}
+
+		span.SetStatus(codes.Error, "rule evaluation failed")
+		span.RecordError(err)
+	} else {
+		logger.Debug("Alert rule evaluated", "results", results, "duration", dur)
+		span.AddEvent("rule evaluated", trace.WithAttributes(
+			attribute.Int64("results", int64(len(results))),
+		))
+	}
+	start = a.clock.Now()
+	processedStates := a.stateManager.ProcessEvalResults(
+		ctx,
+		e.scheduledAt,
+		e.rule,
+		results,
+		state.GetRuleExtraLabels(logger, e.rule, e.folderTitle, !a.disableGrafanaFolder),
+	)
+	processDuration.Observe(a.clock.Now().Sub(start).Seconds())
+
+	start = a.clock.Now()
+	alerts := state.FromStateTransitionToPostableAlerts(processedStates, a.stateManager, a.appURL)
+	span.AddEvent("results processed", trace.WithAttributes(
+		attribute.Int64("state_transitions", int64(len(processedStates))),
+		attribute.Int64("alerts_to_send", int64(len(alerts.PostableAlerts))),
+	))
+	if len(alerts.PostableAlerts) > 0 {
+		a.sender.Send(ctx, key, alerts)
+	}
+	sendDuration.Observe(a.clock.Now().Sub(start).Seconds())
+
+	return nil
+}
+
+func (a *alertRuleInfo) notify(ctx context.Context, key ngmodels.AlertRuleKey, states []state.StateTransition) {
+	expiredAlerts := state.FromAlertsStateToStoppedAlert(states, a.appURL, a.clock)
+	if len(expiredAlerts.PostableAlerts) > 0 {
+		a.sender.Send(ctx, key, expiredAlerts)
+	}
+}
+
+func (a *alertRuleInfo) resetState(ctx context.Context, key ngmodels.AlertRuleKey, isPaused bool) {
+	rule := a.ruleProvider.get(key)
+	reason := ngmodels.StateReasonUpdated
+	if isPaused {
+		reason = ngmodels.StateReasonPaused
+	}
+	states := a.stateManager.ResetStateByRuleUID(ctx, rule, reason)
+	a.notify(ctx, key, states)
+}
+
+// evalApplied is only used in tests.
+func (a *alertRuleInfo) evalApplied(alertDefKey ngmodels.AlertRuleKey, now time.Time) {
+	if a.evalAppliedHook == nil {
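One detail of the extracted evaluate worth calling out: when the failure branch is taken but no retryable error is returned, it normalises the two error channels so that both the results slice and err end up non-nil, whichever side originally carried the failure. A sketch of that normalisation with invented stand-in types:

// Sketch only: "result"/"results" are stand-ins for eval.Results; the
// point is the two symmetric nil checks from evaluate's failure branch.
package main

import (
	"errors"
	"fmt"
)

type result struct{ err error }

type results []result

// Error returns the first embedded error, mirroring results.Error().
func (r results) Error() error {
	for _, res := range r {
		if res.err != nil {
			return res.err
		}
	}
	return nil
}

func normalize(res results, err error) (results, error) {
	// Nil results: the bare err must come from the expression pipeline,
	// so wrap it into a synthetic result.
	if res == nil {
		res = append(res, result{err: err})
	}
	// Nil err: the pipeline succeeded, so the failure is embedded in the results.
	if err == nil {
		err = res.Error()
	}
	return res, err
}

func main() {
	r, err := normalize(nil, errors.New("pipeline down"))
	fmt.Println(len(r), err) // 1 pipeline down
	r, err = normalize(results{{err: errors.New("bad query")}}, nil)
	fmt.Println(len(r), err) // 1 bad query
}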