diff --git a/pkg/services/ngalert/api/api_testing.go b/pkg/services/ngalert/api/api_testing.go index 11938f928c2..47e2e4f5924 100644 --- a/pkg/services/ngalert/api/api_testing.go +++ b/pkg/services/ngalert/api/api_testing.go @@ -112,6 +112,7 @@ func (srv TestingApiSrv) RouteTestGrafanaRuleConfig(c *contextmodel.ReqContext, rule, results, state.GetRuleExtraLabels(log.New("testing"), rule, folder.Fullpath, includeFolder), + nil, ) alerts := make([]*amv2.PostableAlert, 0, len(transitions)) diff --git a/pkg/services/ngalert/backtesting/engine.go b/pkg/services/ngalert/backtesting/engine.go index 88685f00134..fa94fbecfb7 100644 --- a/pkg/services/ngalert/backtesting/engine.go +++ b/pkg/services/ngalert/backtesting/engine.go @@ -35,7 +35,7 @@ type backtestingEvaluator interface { } type stateManager interface { - ProcessEvalResults(ctx context.Context, evaluatedAt time.Time, alertRule *models.AlertRule, results eval.Results, extraLabels data.Labels) []state.StateTransition + ProcessEvalResults(context.Context, time.Time, *models.AlertRule, eval.Results, data.Labels, state.Sender) state.StateTransitions schedule.RuleStateProvider } @@ -97,7 +97,7 @@ func (e *Engine) Test(ctx context.Context, user identity.Requester, rule *models logger.Info("Unexpected evaluation. Skipping", "from", from, "to", to, "interval", rule.IntervalSeconds, "evaluationTime", currentTime, "evaluationIndex", idx, "expectedEvaluations", length) return nil } - states := stateManager.ProcessEvalResults(ruleCtx, currentTime, rule, results, nil) + states := stateManager.ProcessEvalResults(ruleCtx, currentTime, rule, results, nil, nil) tsField.Set(idx, currentTime) for _, s := range states { field, ok := valueFields[s.CacheID] diff --git a/pkg/services/ngalert/backtesting/engine_test.go b/pkg/services/ngalert/backtesting/engine_test.go index d38dd34bc3c..d2685e71535 100644 --- a/pkg/services/ngalert/backtesting/engine_test.go +++ b/pkg/services/ngalert/backtesting/engine_test.go @@ -386,7 +386,7 @@ type fakeStateManager struct { stateCallback func(now time.Time) []state.StateTransition } -func (f *fakeStateManager) ProcessEvalResults(_ context.Context, evaluatedAt time.Time, _ *models.AlertRule, _ eval.Results, _ data.Labels) []state.StateTransition { +func (f *fakeStateManager) ProcessEvalResults(_ context.Context, evaluatedAt time.Time, _ *models.AlertRule, _ eval.Results, _ data.Labels, _ state.Sender) state.StateTransitions { return f.stateCallback(evaluatedAt) } diff --git a/pkg/services/ngalert/schedule/alert_rule.go b/pkg/services/ngalert/schedule/alert_rule.go index f033448f10f..545c8573e73 100644 --- a/pkg/services/ngalert/schedule/alert_rule.go +++ b/pkg/services/ngalert/schedule/alert_rule.go @@ -8,6 +8,7 @@ import ( "time" "github.com/benbjohnson/clock" + "github.com/prometheus/alertmanager/api/v2/models" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -16,6 +17,7 @@ import ( "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/datasources" "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions" "github.com/grafana/grafana/pkg/services/ngalert/eval" "github.com/grafana/grafana/pkg/services/ngalert/metrics" ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models" @@ -324,7 +326,7 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error { ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute) defer cancelFunc() states := a.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, key), key, ngmodels.StateReasonRuleDeleted) - a.notify(grafanaCtx, key, states) + a.expireAndSend(grafanaCtx, key, states) } logger.Debug("Stopping alert rule routine") return nil @@ -409,30 +411,41 @@ func (a *alertRule) evaluate(ctx context.Context, key ngmodels.AlertRuleKey, f f )) } start = a.clock.Now() - processedStates := a.stateManager.ProcessEvalResults( + _ = a.stateManager.ProcessEvalResults( ctx, e.scheduledAt, e.rule, results, state.GetRuleExtraLabels(logger, e.rule, e.folderTitle, !a.disableGrafanaFolder), + func(ctx context.Context, statesToSend state.StateTransitions) { + start := a.clock.Now() + alerts := a.send(ctx, key, statesToSend) + span.AddEvent("results sent", trace.WithAttributes( + attribute.Int64("alerts_sent", int64(len(alerts.PostableAlerts))), + )) + sendDuration.Observe(a.clock.Now().Sub(start).Seconds()) + }, ) processDuration.Observe(a.clock.Now().Sub(start).Seconds()) - start = a.clock.Now() - alerts := state.FromStateTransitionToPostableAlerts(e.scheduledAt, processedStates, a.stateManager, a.appURL) - span.AddEvent("results processed", trace.WithAttributes( - attribute.Int64("state_transitions", int64(len(processedStates))), - attribute.Int64("alerts_to_send", int64(len(alerts.PostableAlerts))), - )) - if len(alerts.PostableAlerts) > 0 { - a.sender.Send(ctx, key, alerts) - } - sendDuration.Observe(a.clock.Now().Sub(start).Seconds()) - return nil } -func (a *alertRule) notify(ctx context.Context, key ngmodels.AlertRuleKey, states []state.StateTransition) { +// send sends alerts for the given state transitions. +func (a *alertRule) send(ctx context.Context, key ngmodels.AlertRuleKey, states state.StateTransitions) definitions.PostableAlerts { + alerts := definitions.PostableAlerts{PostableAlerts: make([]models.PostableAlert, 0, len(states))} + for _, alertState := range states { + alerts.PostableAlerts = append(alerts.PostableAlerts, *state.StateToPostableAlert(alertState, a.appURL)) + } + + if len(alerts.PostableAlerts) > 0 { + a.sender.Send(ctx, key, alerts) + } + return alerts +} + +// sendExpire sends alerts to expire all previously firing alerts in the provided state transitions. +func (a *alertRule) expireAndSend(ctx context.Context, key ngmodels.AlertRuleKey, states []state.StateTransition) { expiredAlerts := state.FromAlertsStateToStoppedAlert(states, a.appURL, a.clock) if len(expiredAlerts.PostableAlerts) > 0 { a.sender.Send(ctx, key, expiredAlerts) @@ -446,7 +459,7 @@ func (a *alertRule) resetState(ctx context.Context, key ngmodels.AlertRuleKey, i reason = ngmodels.StateReasonPaused } states := a.stateManager.ResetStateByRuleUID(ctx, rule, reason) - a.notify(ctx, key, states) + a.expireAndSend(ctx, key, states) } // evalApplied is only used on tests. diff --git a/pkg/services/ngalert/schedule/alert_rule_test.go b/pkg/services/ngalert/schedule/alert_rule_test.go index 875c098302e..e2d0122a108 100644 --- a/pkg/services/ngalert/schedule/alert_rule_test.go +++ b/pkg/services/ngalert/schedule/alert_rule_test.go @@ -456,7 +456,7 @@ func TestRuleRoutine(t *testing.T) { sch, _, _, _ := createSchedule(make(chan time.Time), nil) rule := gen.GenerateRef() - _ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil) + _ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil, nil) expectedStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID) require.NotEmpty(t, expectedStates) @@ -478,7 +478,7 @@ func TestRuleRoutine(t *testing.T) { sch, _, _, _ := createSchedule(make(chan time.Time), nil) rule := gen.GenerateRef() - _ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil) + _ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil, nil) require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)) factory := ruleFactoryFromScheduler(sch) diff --git a/pkg/services/ngalert/state/compat.go b/pkg/services/ngalert/state/compat.go index 1a070cfd5ac..55e1fdf76fe 100644 --- a/pkg/services/ngalert/state/compat.go +++ b/pkg/services/ngalert/state/compat.go @@ -6,7 +6,6 @@ import ( "net/url" "path" "strconv" - "time" "github.com/benbjohnson/clock" "github.com/go-openapi/strfmt" @@ -18,7 +17,6 @@ import ( apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions" "github.com/grafana/grafana/pkg/services/ngalert/eval" - ngModels "github.com/grafana/grafana/pkg/services/ngalert/models" ) const ( @@ -140,26 +138,6 @@ func errorAlert(labels, annotations data.Labels, alertState *State, urlStr strin } } -func FromStateTransitionToPostableAlerts(evaluatedAt time.Time, firingStates []StateTransition, stateManager *Manager, appURL *url.URL) apimodels.PostableAlerts { - alerts := apimodels.PostableAlerts{PostableAlerts: make([]models.PostableAlert, 0, len(firingStates))} - - sentAlerts := make([]*State, 0, len(firingStates)) - for _, alertState := range firingStates { - if !alertState.NeedsSending(stateManager.ResendDelay, stateManager.ResolvedRetention) { - continue - } - alert := StateToPostableAlert(alertState, appURL) - alerts.PostableAlerts = append(alerts.PostableAlerts, *alert) - if alertState.StateReason == ngModels.StateReasonMissingSeries { // do not put stale state back to state manager - continue - } - alertState.LastSentAt = &evaluatedAt - sentAlerts = append(sentAlerts, alertState.State) - } - stateManager.Put(sentAlerts) - return alerts -} - // FromAlertsStateToStoppedAlert selects only transitions from firing states (states eval.Alerting, eval.NoData, eval.Error) // and converts them to models.PostableAlert with EndsAt set to time.Now func FromAlertsStateToStoppedAlert(firingStates []StateTransition, appURL *url.URL, clock clock.Clock) apimodels.PostableAlerts { diff --git a/pkg/services/ngalert/state/manager.go b/pkg/services/ngalert/state/manager.go index b4a64d4cb32..d1ecc17828e 100644 --- a/pkg/services/ngalert/state/manager.go +++ b/pkg/services/ngalert/state/manager.go @@ -31,9 +31,12 @@ type AlertInstanceManager interface { type StatePersister interface { Async(ctx context.Context, cache *cache) - Sync(ctx context.Context, span trace.Span, states, staleStates []StateTransition) + Sync(ctx context.Context, span trace.Span, states StateTransitions) } +// Sender is an optional callback intended for sending the states to an alertmanager. +type Sender func(context.Context, StateTransitions) + type Manager struct { log log.Logger metrics *metrics.State @@ -298,9 +301,17 @@ func (st *Manager) ResetStateByRuleUID(ctx context.Context, rule *ngModels.Alert // ProcessEvalResults updates the current states that belong to a rule with the evaluation results. // if extraLabels is not empty, those labels will be added to every state. The extraLabels take precedence over rule labels and result labels -func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time, alertRule *ngModels.AlertRule, results eval.Results, extraLabels data.Labels) []StateTransition { +// This will update the states in cache/store and return the state transitions that need to be sent to the alertmanager. +func (st *Manager) ProcessEvalResults( + ctx context.Context, + evaluatedAt time.Time, + alertRule *ngModels.AlertRule, + results eval.Results, + extraLabels data.Labels, + send Sender, +) StateTransitions { utcTick := evaluatedAt.UTC().Format(time.RFC3339Nano) - tracingCtx, span := st.tracer.Start(ctx, "alert rule state calculation", trace.WithAttributes( + ctx, span := st.tracer.Start(ctx, "alert rule state calculation", trace.WithAttributes( attribute.String("rule_uid", alertRule.UID), attribute.Int64("org_id", alertRule.OrgID), attribute.Int64("rule_version", alertRule.Version), @@ -308,23 +319,52 @@ func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time attribute.Int("results", len(results)))) defer span.End() - logger := st.log.FromContext(tracingCtx) + logger := st.log.FromContext(ctx) logger.Debug("State manager processing evaluation results", "resultCount", len(results)) - states := st.setNextStateForRule(tracingCtx, alertRule, results, extraLabels, logger) - span.AddEvent("results processed", trace.WithAttributes( - attribute.Int64("state_transitions", int64(len(states))), - )) + states := st.setNextStateForRule(ctx, alertRule, results, extraLabels, logger) staleStates := st.deleteStaleStatesFromCache(ctx, logger, evaluatedAt, alertRule) - st.persister.Sync(tracingCtx, span, states, staleStates) + span.AddEvent("results processed", trace.WithAttributes( + attribute.Int64("state_transitions", int64(len(states))), + attribute.Int64("stale_states", int64(len(staleStates))), + )) - allChanges := append(states, staleStates...) - if st.historian != nil { - st.historian.Record(tracingCtx, history_model.NewRuleMeta(alertRule, logger), allChanges) + allChanges := StateTransitions(append(states, staleStates...)) + + // It's important that this is done *before* we sync the states to the persister. Otherwise, we will not persist + // the LastSentAt field to the store. + var statesToSend StateTransitions + if send != nil { + statesToSend = st.updateLastSentAt(allChanges, evaluatedAt) } + + st.persister.Sync(ctx, span, allChanges) + if st.historian != nil { + st.historian.Record(ctx, history_model.NewRuleMeta(alertRule, logger), allChanges) + } + + // Optional callback intended for sending the states to an alertmanager. + // Some uses ,such as backtesting or the testing api, do not send. + if send != nil { + send(ctx, statesToSend) + } + return allChanges } +// updateLastSentAt returns the subset StateTransitions that need sending and updates their LastSentAt field. +// Note: This is not idempotent, running this twice can (and usually will) return different results. +func (st *Manager) updateLastSentAt(states StateTransitions, evaluatedAt time.Time) StateTransitions { + var result StateTransitions + for _, t := range states { + if t.NeedsSending(st.ResendDelay, st.ResolvedRetention) { + t.LastSentAt = &evaluatedAt + result = append(result, t) + } + } + return result +} + func (st *Manager) setNextStateForRule(ctx context.Context, alertRule *ngModels.AlertRule, results eval.Results, extraLabels data.Labels, logger log.Logger) []StateTransition { if st.applyNoDataAndErrorToAllStates && results.IsNoData() && (alertRule.NoDataState == ngModels.Alerting || alertRule.NoDataState == ngModels.OK || alertRule.NoDataState == ngModels.KeepLast) { // If it is no data, check the mapping and switch all results to the new state // TODO aggregate UID of datasources that returned NoData into one and provide as auxiliary info, probably annotation diff --git a/pkg/services/ngalert/state/manager_bench_test.go b/pkg/services/ngalert/state/manager_bench_test.go index 4499b884e01..df9bed60699 100644 --- a/pkg/services/ngalert/state/manager_bench_test.go +++ b/pkg/services/ngalert/state/manager_bench_test.go @@ -39,7 +39,7 @@ func BenchmarkProcessEvalResults(b *testing.B) { var ans []state.StateTransition for i := 0; i < b.N; i++ { - ans = sut.ProcessEvalResults(context.Background(), now, &rule, results, labels) + ans = sut.ProcessEvalResults(context.Background(), now, &rule, results, labels, nil) } b.StopTimer() diff --git a/pkg/services/ngalert/state/manager_private_test.go b/pkg/services/ngalert/state/manager_private_test.go index 322f649bdda..affd34fd299 100644 --- a/pkg/services/ngalert/state/manager_private_test.go +++ b/pkg/services/ngalert/state/manager_private_test.go @@ -120,7 +120,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { evaluationInterval := 10 * time.Second tN := func(n int) time.Time { - return time.Time{}.Add(time.Duration(n) * evaluationInterval) + return time.Unix(0, 0).UTC().Add(time.Duration(n) * evaluationInterval) } t1 := tN(1) t2 := tN(2) @@ -270,7 +270,15 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { for _, ts := range tss { results := resultsAtTime[ts] clk.Set(ts) - actual := st.ProcessEvalResults(context.Background(), ts, alertRule, results, systemLabels) + var statesToSend StateTransitions + actual := st.ProcessEvalResults(context.Background(), ts, alertRule, results, systemLabels, func(_ context.Context, states StateTransitions) { + statesToSend = states + }) + + // Expect all statesToSend to have a LastSentAt set to the evaluation time. + for _, state := range statesToSend { + assert.Equalf(t, ts, *state.LastSentAt, "LastSentAt should be set to the evaluation time for all ready-to-send transitions.") + } expectedTransitions, ok := expectedTransitionsAtTime[ts] if !ok { // skip if nothing to assert @@ -367,6 +375,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t1.Add(ResendDelay * 4), LastEvaluationTime: t1, + LastSentAt: &t1, }, }, { @@ -443,6 +452,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, { @@ -510,6 +520,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -564,6 +575,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t2, LastEvaluationTime: t2, ResolvedAt: &t2, + LastSentAt: &t2, }, }, }, @@ -623,6 +635,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t3, LastEvaluationTime: t3, ResolvedAt: &t3, + LastSentAt: &t3, }, }, { @@ -682,6 +695,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t1.Add(ResendDelay * 4), LastEvaluationTime: t1, + LastSentAt: &t1, }, }, }, @@ -733,6 +747,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -819,6 +834,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t1.Add(ResendDelay * 4), LastEvaluationTime: t1, + LastSentAt: &t1, }, }, }, @@ -835,6 +851,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t1.Add(ResendDelay * 4), LastEvaluationTime: t1, + LastSentAt: &t1, }, }, }, @@ -895,6 +912,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -911,6 +929,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -961,6 +980,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -1025,6 +1045,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -1052,6 +1073,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t3, LastEvaluationTime: t3, ResolvedAt: &t3, + LastSentAt: &t3, }, }, { @@ -1063,6 +1085,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t2, }, }, }, @@ -1092,6 +1115,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t3, LastEvaluationTime: t3, ResolvedAt: &t3, + LastSentAt: &t3, }, }, { @@ -1105,6 +1129,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t2, }, }, }, @@ -1134,6 +1159,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t3, LastEvaluationTime: t3, ResolvedAt: &t3, + LastSentAt: &t3, }, }, { @@ -1176,6 +1202,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t3, LastEvaluationTime: t3, ResolvedAt: &t3, + LastSentAt: &t3, }, }, { @@ -1207,6 +1234,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, { @@ -1219,6 +1247,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t1, }, }, }, @@ -1234,6 +1263,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t2, }, }, { @@ -1247,6 +1277,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t1, }, }, }, @@ -1276,6 +1307,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t2, LastEvaluationTime: t2, ResolvedAt: &t2, + LastSentAt: &t2, }, }, }, @@ -1305,6 +1337,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t2, LastEvaluationTime: t3, ResolvedAt: &t2, + LastSentAt: &t2, }, }, }, @@ -1333,6 +1366,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t1, }, }, }, @@ -1361,6 +1395,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t1, }, }, }, @@ -1394,6 +1429,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -1431,6 +1467,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t2, }, }, }, @@ -1486,6 +1523,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -1598,6 +1636,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -1613,6 +1652,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, { @@ -1626,6 +1666,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t2, }, }, }, @@ -1710,6 +1751,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -1738,6 +1780,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t2, }, }, }, @@ -1770,6 +1813,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -1785,6 +1829,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -1800,6 +1845,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -1815,6 +1861,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -1833,6 +1880,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -1865,6 +1913,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -1908,6 +1957,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t3, LastEvaluationTime: t3, + LastSentAt: &t1, // We don't bother updating LastSentAt for StateReasonMissingSeries since it's deleted from state. }, }, }, @@ -1937,6 +1987,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t3, LastEvaluationTime: t3, ResolvedAt: &t3, + LastSentAt: &t3, }, }, }, @@ -2021,6 +2072,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -2037,6 +2089,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -2087,6 +2140,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -2150,6 +2204,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -2165,6 +2220,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t3, LastEvaluationTime: t3, ResolvedAt: &t3, + LastSentAt: &t3, }, }, { @@ -2176,6 +2232,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t2, }, }, }, @@ -2193,6 +2250,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t3, LastEvaluationTime: t3, ResolvedAt: &t3, + LastSentAt: &t3, }, }, { @@ -2206,6 +2264,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t2, }, }, }, @@ -2223,6 +2282,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t3, LastEvaluationTime: t3, ResolvedAt: &t3, + LastSentAt: &t3, }, }, { @@ -2253,6 +2313,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t3, LastEvaluationTime: t3, ResolvedAt: &t3, + LastSentAt: &t3, }, }, { @@ -2284,6 +2345,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t1, }, }, }, @@ -2299,6 +2361,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t1, }, }, }, @@ -2316,6 +2379,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t2, LastEvaluationTime: t2, ResolvedAt: &t2, + LastSentAt: &t2, }, }, }, @@ -2332,6 +2396,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t2, LastEvaluationTime: t3, ResolvedAt: &t2, + LastSentAt: &t2, }, }, }, @@ -2348,6 +2413,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t1, }, }, }, @@ -2363,6 +2429,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t1, }, }, }, @@ -2395,6 +2462,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -2408,6 +2476,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -2423,6 +2492,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -2438,6 +2508,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -2453,6 +2524,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -2471,6 +2543,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -2503,6 +2576,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -2588,6 +2662,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t1.Add(ResendDelay * 4), LastEvaluationTime: t1, + LastSentAt: &t1, Annotations: mergeLabels(baseRule.Annotations, data.Labels{ "Error": datasourceError.Error(), }), @@ -2608,6 +2683,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t1.Add(ResendDelay * 4), LastEvaluationTime: t1, + LastSentAt: &t1, }, }, }, @@ -2666,6 +2742,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t1.Add(ResendDelay * 4), LastEvaluationTime: t1, + LastSentAt: &t1, Annotations: mergeLabels(baseRule.Annotations, data.Labels{ "Error": genericError.Error(), }), @@ -2686,6 +2763,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t1.Add(ResendDelay * 4), LastEvaluationTime: t1, + LastSentAt: &t1, }, }, }, @@ -2749,6 +2827,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, Annotations: mergeLabels(baseRule.Annotations, data.Labels{ "Error": datasourceError.Error(), }), @@ -2820,6 +2899,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -2852,6 +2932,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -2882,6 +2963,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, Annotations: mergeLabels(baseRule.Annotations, data.Labels{ "Error": datasourceError.Error(), }), @@ -2902,6 +2984,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -2953,6 +3036,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -3030,6 +3114,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t1, EndsAt: t3, LastEvaluationTime: t3, + LastSentAt: &t1, // We don't bother updating LastSentAt for StateReasonMissingSeries since it's deleted from state. Annotations: mergeLabels(baseRule.Annotations, data.Labels{ "Error": datasourceError.Error(), }), @@ -3063,6 +3148,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t3, LastEvaluationTime: t3, ResolvedAt: &t3, + LastSentAt: &t3, }, }, }, @@ -3149,6 +3235,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, Annotations: mergeLabels(baseRule.Annotations, data.Labels{ "Error": datasourceError.Error(), }), @@ -3169,6 +3256,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -3245,6 +3333,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, Annotations: mergeLabels(baseRule.Annotations, data.Labels{ "Error": datasourceError.Error(), }), @@ -3265,6 +3354,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -3297,6 +3387,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, }, }, }, @@ -3331,6 +3422,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2.Add(ResendDelay * 4), LastEvaluationTime: t2, + LastSentAt: &t2, Annotations: mergeLabels(baseRule.Annotations, data.Labels{ "Error": datasourceError.Error(), }), @@ -3347,6 +3439,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t2, }, }, }, @@ -3378,6 +3471,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -3438,6 +3532,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t3, EndsAt: t3.Add(ResendDelay * 4), LastEvaluationTime: t3, + LastSentAt: &t3, }, }, }, @@ -3466,6 +3561,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { StartsAt: t2, EndsAt: t2, LastEvaluationTime: t2, + LastSentAt: &t1, // TODO: Fix me. This should be t2 since we should be resolving the previous DatasourceError alert. }, }, }, @@ -3483,6 +3579,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) { EndsAt: t2, LastEvaluationTime: t2, ResolvedAt: &t2, + LastSentAt: &t2, }, }, }, diff --git a/pkg/services/ngalert/state/manager_test.go b/pkg/services/ngalert/state/manager_test.go index 50954c2dd92..e008edc811e 100644 --- a/pkg/services/ngalert/state/manager_test.go +++ b/pkg/services/ngalert/state/manager_test.go @@ -7,7 +7,6 @@ import ( "fmt" "math" "math/rand" - "net/url" "sort" "strings" "testing" @@ -269,7 +268,7 @@ func TestDashboardAnnotations(t *testing.T) { }, }}, data.Labels{ "alertname": rule.Title, - }) + }, nil) expected := []string{rule.Title + " {alertname=" + rule.Title + ", instance_label=testValue2, test1=testValue1, test2=testValue2} - B=42.000000, C=1.000000"} sort.Strings(expected) @@ -1415,9 +1414,8 @@ func TestProcessEvalResults(t *testing.T) { res[i].EvaluatedAt = evalTime } clk.Set(evalTime) - processedStates := st.ProcessEvalResults(context.Background(), evalTime, tc.alertRule, res, systemLabels) + _ = st.ProcessEvalResults(context.Background(), evalTime, tc.alertRule, res, systemLabels, state.NoopSender) results += len(res) - _ = state.FromStateTransitionToPostableAlerts(evalTime, processedStates, st, &url.URL{}) // Set LastSentAt. } states := st.GetStatesForRuleUID(tc.alertRule.OrgID, tc.alertRule.UID) @@ -1506,8 +1504,7 @@ func TestProcessEvalResults(t *testing.T) { rule := models.RuleGen.GenerateRef() var results = eval.GenerateResults(rand.Intn(4)+1, eval.ResultGen(eval.WithEvaluatedAt(clk.Now()))) - states := st.ProcessEvalResults(context.Background(), clk.Now(), rule, results, make(data.Labels)) - + states := st.ProcessEvalResults(context.Background(), clk.Now(), rule, results, make(data.Labels), nil) require.NotEmpty(t, states) savedStates := make(map[data.Fingerprint]models.AlertInstance) @@ -1668,7 +1665,7 @@ func TestStaleResultsHandler(t *testing.T) { "alertname": rule.Title, "__alert_rule_namespace_uid__": rule.NamespaceUID, "__alert_rule_uid__": rule.UID, - }) + }, nil) for _, s := range tc.expectedStates { setCacheID(s) cachedState := st.Get(s.OrgID, s.AlertRuleUID, s.CacheID) @@ -1737,7 +1734,7 @@ func TestStaleResults(t *testing.T) { rule := gen.With(gen.WithFor(0)).GenerateRef() initResults := eval.Results{ - eval.ResultGen(eval.WithEvaluatedAt(clk.Now()))(), + eval.ResultGen(eval.WithState(eval.Alerting), eval.WithEvaluatedAt(clk.Now()))(), eval.ResultGen(eval.WithState(eval.Alerting), eval.WithEvaluatedAt(clk.Now()))(), eval.ResultGen(eval.WithState(eval.Normal), eval.WithEvaluatedAt(clk.Now()))(), } @@ -1753,9 +1750,15 @@ func TestStaleResults(t *testing.T) { } // Init - processed := st.ProcessEvalResults(ctx, clk.Now(), rule, initResults, nil) + var statesToSend state.StateTransitions + processed := st.ProcessEvalResults(ctx, clk.Now(), rule, initResults, nil, func(_ context.Context, states state.StateTransitions) { + statesToSend = states + }) checkExpectedStateTransitions(t, processed, initStates) + // Check that it returns just those state transitions that needs to be sent. + checkExpectedStateTransitions(t, statesToSend, map[data.Fingerprint]struct{}{state1: {}, state2: {}}) // Does not contain the Normal state3. + currentStates := st.GetStatesForRuleUID(rule.OrgID, rule.UID) statesMap := checkExpectedStates(t, currentStates, initStates) require.Equal(t, eval.Alerting, statesMap[state2].State) // make sure the state is alerting because we need it to be resolved later @@ -1770,7 +1773,7 @@ func TestStaleResults(t *testing.T) { var expectedStaleKeys []models.AlertInstanceKey t.Run("should mark missing states as stale", func(t *testing.T) { - processed = st.ProcessEvalResults(ctx, clk.Now(), rule, results, nil) + processed = st.ProcessEvalResults(ctx, clk.Now(), rule, results, nil, nil) checkExpectedStateTransitions(t, processed, initStates) for _, s := range processed { if s.CacheID == state1 { diff --git a/pkg/services/ngalert/state/persister_async.go b/pkg/services/ngalert/state/persister_async.go index a0118b92df8..91807f26921 100644 --- a/pkg/services/ngalert/state/persister_async.go +++ b/pkg/services/ngalert/state/persister_async.go @@ -64,6 +64,6 @@ func (a *AsyncStatePersister) fullSync(ctx context.Context, cache *cache) error return nil } -func (a *AsyncStatePersister) Sync(_ context.Context, _ trace.Span, _, _ []StateTransition) { +func (a *AsyncStatePersister) Sync(_ context.Context, _ trace.Span, _ StateTransitions) { a.log.Debug("Sync: No-Op") } diff --git a/pkg/services/ngalert/state/persister_noop.go b/pkg/services/ngalert/state/persister_noop.go index a500f34eead..0275bc5f351 100644 --- a/pkg/services/ngalert/state/persister_noop.go +++ b/pkg/services/ngalert/state/persister_noop.go @@ -8,8 +8,8 @@ import ( type NoopPersister struct{} -func (n *NoopPersister) Async(_ context.Context, _ *cache) {} -func (n *NoopPersister) Sync(_ context.Context, _ trace.Span, _, _ []StateTransition) {} +func (n *NoopPersister) Async(_ context.Context, _ *cache) {} +func (n *NoopPersister) Sync(_ context.Context, _ trace.Span, _ StateTransitions) {} func NewNoopPersister() StatePersister { return &NoopPersister{} diff --git a/pkg/services/ngalert/state/persister_sync.go b/pkg/services/ngalert/state/persister_sync.go index 90ddd350498..5a53d6ddd7d 100644 --- a/pkg/services/ngalert/state/persister_sync.go +++ b/pkg/services/ngalert/state/persister_sync.go @@ -33,15 +33,18 @@ func NewSyncStatePersisiter(log log.Logger, cfg ManagerCfg) StatePersister { func (a *SyncStatePersister) Async(_ context.Context, _ *cache) { a.log.Debug("Async: No-Op") } -func (a *SyncStatePersister) Sync(ctx context.Context, span trace.Span, states, staleStates []StateTransition) { - a.deleteAlertStates(ctx, staleStates) + +// Sync persists the state transitions to the database. It deletes stale states and saves the current states. +func (a *SyncStatePersister) Sync(ctx context.Context, span trace.Span, allStates StateTransitions) { + staleStates := allStates.StaleStates() if len(staleStates) > 0 { + a.deleteAlertStates(ctx, staleStates) span.AddEvent("deleted stale states", trace.WithAttributes( attribute.Int64("state_transitions", int64(len(staleStates))), )) } - a.saveAlertStates(ctx, states...) + a.saveAlertStates(ctx, allStates...) span.AddEvent("updated database") } @@ -75,6 +78,12 @@ func (a *SyncStatePersister) saveAlertStates(ctx context.Context, states ...Stat saveState := func(ctx context.Context, idx int) error { s := states[idx] + + // Do not save stale state to database. + if s.IsStale() { + return nil + } + // Do not save normal state to database and remove transition to Normal state but keep mapped states if a.doNotSaveNormalState && IsNormalStateWithNoReason(s.State) && !s.Changed() { return nil diff --git a/pkg/services/ngalert/state/persister_sync_test.go b/pkg/services/ngalert/state/persister_sync_test.go index 4465cebdb1e..58d63c9cc72 100644 --- a/pkg/services/ngalert/state/persister_sync_test.go +++ b/pkg/services/ngalert/state/persister_sync_test.go @@ -65,7 +65,7 @@ func TestSyncPersister_saveAlertStates(t *testing.T) { InstanceStore: st, MaxStateSaveConcurrency: 1, }) - syncStatePersister.Sync(context.Background(), span, transitions, nil) + syncStatePersister.Sync(context.Background(), span, transitions) savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{} for _, op := range st.RecordedOps() { saved := op.(ngmodels.AlertInstance) @@ -86,7 +86,7 @@ func TestSyncPersister_saveAlertStates(t *testing.T) { InstanceStore: st, MaxStateSaveConcurrency: 1, }) - syncStatePersister.Sync(context.Background(), span, transitions, nil) + syncStatePersister.Sync(context.Background(), span, transitions) savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{} for _, op := range st.RecordedOps() { diff --git a/pkg/services/ngalert/state/state.go b/pkg/services/ngalert/state/state.go index ea48dd2f616..8bdc883f0af 100644 --- a/pkg/services/ngalert/state/state.go +++ b/pkg/services/ngalert/state/state.go @@ -187,6 +187,19 @@ func (c StateTransition) Changed() bool { return c.PreviousState != c.State.State || c.PreviousStateReason != c.State.StateReason } +type StateTransitions []StateTransition + +// StaleStates returns the subset of StateTransitions that are stale. +func (c StateTransitions) StaleStates() StateTransitions { + var result StateTransitions + for _, t := range c { + if t.IsStale() { + result = append(result, t) + } + } + return result +} + type Evaluation struct { EvaluationTime time.Time EvaluationState eval.State @@ -484,6 +497,11 @@ func (a *State) GetLastEvaluationValuesForCondition() map[string]float64 { return r } +// IsStale returns true if the state is stale, meaning that the state is ready to be evicted from the cache. +func (a *State) IsStale() bool { + return a.StateReason == models.StateReasonMissingSeries +} + // shouldTakeImage returns true if the state just has transitioned to alerting from another state, // transitioned to alerting in a previous evaluation but does not have a screenshot, or has just // been resolved. diff --git a/pkg/services/ngalert/state/testing.go b/pkg/services/ngalert/state/testing.go index 07c05aa6b35..6ec84d0df8f 100644 --- a/pkg/services/ngalert/state/testing.go +++ b/pkg/services/ngalert/state/testing.go @@ -100,3 +100,6 @@ type NoopImageService struct{} func (s *NoopImageService) NewImage(_ context.Context, _ *models.AlertRule) (*models.Image, error) { return &models.Image{}, nil } + +// NoopSender is a no-op sender. Used when you want state manager to update LastSentAt without sending any alerts. +var NoopSender = func(_ context.Context, _ StateTransitions) {}