Alerting: Ensure we update State.LastSentAt before persisting (#89427)

This commit is contained in:
Matthew Jacobson 2024-06-25 13:01:26 -04:00 committed by GitHub
parent 0f01db4025
commit 47c9259d75
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 237 additions and 75 deletions

View File

@ -112,6 +112,7 @@ func (srv TestingApiSrv) RouteTestGrafanaRuleConfig(c *contextmodel.ReqContext,
rule,
results,
state.GetRuleExtraLabels(log.New("testing"), rule, folder.Fullpath, includeFolder),
nil,
)
alerts := make([]*amv2.PostableAlert, 0, len(transitions))

View File

@ -35,7 +35,7 @@ type backtestingEvaluator interface {
}
type stateManager interface {
ProcessEvalResults(ctx context.Context, evaluatedAt time.Time, alertRule *models.AlertRule, results eval.Results, extraLabels data.Labels) []state.StateTransition
ProcessEvalResults(context.Context, time.Time, *models.AlertRule, eval.Results, data.Labels, state.Sender) state.StateTransitions
schedule.RuleStateProvider
}
@ -97,7 +97,7 @@ func (e *Engine) Test(ctx context.Context, user identity.Requester, rule *models
logger.Info("Unexpected evaluation. Skipping", "from", from, "to", to, "interval", rule.IntervalSeconds, "evaluationTime", currentTime, "evaluationIndex", idx, "expectedEvaluations", length)
return nil
}
states := stateManager.ProcessEvalResults(ruleCtx, currentTime, rule, results, nil)
states := stateManager.ProcessEvalResults(ruleCtx, currentTime, rule, results, nil, nil)
tsField.Set(idx, currentTime)
for _, s := range states {
field, ok := valueFields[s.CacheID]

View File

@ -386,7 +386,7 @@ type fakeStateManager struct {
stateCallback func(now time.Time) []state.StateTransition
}
func (f *fakeStateManager) ProcessEvalResults(_ context.Context, evaluatedAt time.Time, _ *models.AlertRule, _ eval.Results, _ data.Labels) []state.StateTransition {
func (f *fakeStateManager) ProcessEvalResults(_ context.Context, evaluatedAt time.Time, _ *models.AlertRule, _ eval.Results, _ data.Labels, _ state.Sender) state.StateTransitions {
return f.stateCallback(evaluatedAt)
}

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/benbjohnson/clock"
"github.com/prometheus/alertmanager/api/v2/models"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
@ -16,6 +17,7 @@ import (
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
@ -324,7 +326,7 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
defer cancelFunc()
states := a.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, key), key, ngmodels.StateReasonRuleDeleted)
a.notify(grafanaCtx, key, states)
a.expireAndSend(grafanaCtx, key, states)
}
logger.Debug("Stopping alert rule routine")
return nil
@ -409,30 +411,41 @@ func (a *alertRule) evaluate(ctx context.Context, key ngmodels.AlertRuleKey, f f
))
}
start = a.clock.Now()
processedStates := a.stateManager.ProcessEvalResults(
_ = a.stateManager.ProcessEvalResults(
ctx,
e.scheduledAt,
e.rule,
results,
state.GetRuleExtraLabels(logger, e.rule, e.folderTitle, !a.disableGrafanaFolder),
func(ctx context.Context, statesToSend state.StateTransitions) {
start := a.clock.Now()
alerts := a.send(ctx, key, statesToSend)
span.AddEvent("results sent", trace.WithAttributes(
attribute.Int64("alerts_sent", int64(len(alerts.PostableAlerts))),
))
sendDuration.Observe(a.clock.Now().Sub(start).Seconds())
},
)
processDuration.Observe(a.clock.Now().Sub(start).Seconds())
start = a.clock.Now()
alerts := state.FromStateTransitionToPostableAlerts(e.scheduledAt, processedStates, a.stateManager, a.appURL)
span.AddEvent("results processed", trace.WithAttributes(
attribute.Int64("state_transitions", int64(len(processedStates))),
attribute.Int64("alerts_to_send", int64(len(alerts.PostableAlerts))),
))
if len(alerts.PostableAlerts) > 0 {
a.sender.Send(ctx, key, alerts)
}
sendDuration.Observe(a.clock.Now().Sub(start).Seconds())
return nil
}
func (a *alertRule) notify(ctx context.Context, key ngmodels.AlertRuleKey, states []state.StateTransition) {
// send converts each of the given state transitions into a postable alert and
// hands the resulting batch to the configured sender. The sender is not invoked
// when the batch is empty. The batch is returned in either case.
func (a *alertRule) send(ctx context.Context, key ngmodels.AlertRuleKey, states state.StateTransitions) definitions.PostableAlerts {
	postable := make([]models.PostableAlert, len(states))
	for i, transition := range states {
		postable[i] = *state.StateToPostableAlert(transition, a.appURL)
	}
	batch := definitions.PostableAlerts{PostableAlerts: postable}
	if len(postable) == 0 {
		// Nothing to deliver; skip the sender entirely.
		return batch
	}
	a.sender.Send(ctx, key, batch)
	return batch
}
// expireAndSend sends alerts to expire all previously firing alerts in the provided state transitions.
func (a *alertRule) expireAndSend(ctx context.Context, key ngmodels.AlertRuleKey, states []state.StateTransition) {
expiredAlerts := state.FromAlertsStateToStoppedAlert(states, a.appURL, a.clock)
if len(expiredAlerts.PostableAlerts) > 0 {
a.sender.Send(ctx, key, expiredAlerts)
@ -446,7 +459,7 @@ func (a *alertRule) resetState(ctx context.Context, key ngmodels.AlertRuleKey, i
reason = ngmodels.StateReasonPaused
}
states := a.stateManager.ResetStateByRuleUID(ctx, rule, reason)
a.notify(ctx, key, states)
a.expireAndSend(ctx, key, states)
}
// evalApplied is only used on tests.

View File

@ -456,7 +456,7 @@ func TestRuleRoutine(t *testing.T) {
sch, _, _, _ := createSchedule(make(chan time.Time), nil)
rule := gen.GenerateRef()
_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil)
_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil, nil)
expectedStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
require.NotEmpty(t, expectedStates)
@ -478,7 +478,7 @@ func TestRuleRoutine(t *testing.T) {
sch, _, _, _ := createSchedule(make(chan time.Time), nil)
rule := gen.GenerateRef()
_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil)
_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil, nil)
require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
factory := ruleFactoryFromScheduler(sch)

View File

@ -6,7 +6,6 @@ import (
"net/url"
"path"
"strconv"
"time"
"github.com/benbjohnson/clock"
"github.com/go-openapi/strfmt"
@ -18,7 +17,6 @@ import (
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
)
const (
@ -140,26 +138,6 @@ func errorAlert(labels, annotations data.Labels, alertState *State, urlStr strin
}
}
// FromStateTransitionToPostableAlerts converts the transitions that need sending into postable alerts.
// A transition is skipped unless NeedsSending reports true for the manager's ResendDelay and
// ResolvedRetention. Each selected state gets LastSentAt set to evaluatedAt and is written back
// through stateManager.Put, except states whose reason is StateReasonMissingSeries: those are
// still included in the returned alerts but are neither stamped nor put back.
func FromStateTransitionToPostableAlerts(evaluatedAt time.Time, firingStates []StateTransition, stateManager *Manager, appURL *url.URL) apimodels.PostableAlerts {
alerts := apimodels.PostableAlerts{PostableAlerts: make([]models.PostableAlert, 0, len(firingStates))}
// States collected here are handed to stateManager.Put once the loop is done.
sentAlerts := make([]*State, 0, len(firingStates))
for _, alertState := range firingStates {
if !alertState.NeedsSending(stateManager.ResendDelay, stateManager.ResolvedRetention) {
continue
}
alert := StateToPostableAlert(alertState, appURL)
// The alert is appended before the missing-series check, so stale states are still sent.
alerts.PostableAlerts = append(alerts.PostableAlerts, *alert)
if alertState.StateReason == ngModels.StateReasonMissingSeries { // do not put stale state back to state manager
continue
}
// Every stamped state points at the same evaluatedAt value.
alertState.LastSentAt = &evaluatedAt
sentAlerts = append(sentAlerts, alertState.State)
}
stateManager.Put(sentAlerts)
return alerts
}
// FromAlertsStateToStoppedAlert selects only transitions from firing states (states eval.Alerting, eval.NoData, eval.Error)
// and converts them to models.PostableAlert with EndsAt set to time.Now
func FromAlertsStateToStoppedAlert(firingStates []StateTransition, appURL *url.URL, clock clock.Clock) apimodels.PostableAlerts {

View File

@ -31,9 +31,12 @@ type AlertInstanceManager interface {
type StatePersister interface {
Async(ctx context.Context, cache *cache)
Sync(ctx context.Context, span trace.Span, states, staleStates []StateTransition)
Sync(ctx context.Context, span trace.Span, states StateTransitions)
}
// Sender is an optional callback intended for sending the states to an alertmanager.
type Sender func(context.Context, StateTransitions)
type Manager struct {
log log.Logger
metrics *metrics.State
@ -298,9 +301,17 @@ func (st *Manager) ResetStateByRuleUID(ctx context.Context, rule *ngModels.Alert
// ProcessEvalResults updates the current states that belong to a rule with the evaluation results.
// if extraLabels is not empty, those labels will be added to every state. The extraLabels take precedence over rule labels and result labels
func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time, alertRule *ngModels.AlertRule, results eval.Results, extraLabels data.Labels) []StateTransition {
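// This will update the states in cache/store and return all resulting state transitions, including stale ones.
// The subset of transitions that need to be sent to the alertmanager is passed to the optional send callback.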
func (st *Manager) ProcessEvalResults(
ctx context.Context,
evaluatedAt time.Time,
alertRule *ngModels.AlertRule,
results eval.Results,
extraLabels data.Labels,
send Sender,
) StateTransitions {
utcTick := evaluatedAt.UTC().Format(time.RFC3339Nano)
tracingCtx, span := st.tracer.Start(ctx, "alert rule state calculation", trace.WithAttributes(
ctx, span := st.tracer.Start(ctx, "alert rule state calculation", trace.WithAttributes(
attribute.String("rule_uid", alertRule.UID),
attribute.Int64("org_id", alertRule.OrgID),
attribute.Int64("rule_version", alertRule.Version),
@ -308,23 +319,52 @@ func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time
attribute.Int("results", len(results))))
defer span.End()
logger := st.log.FromContext(tracingCtx)
logger := st.log.FromContext(ctx)
logger.Debug("State manager processing evaluation results", "resultCount", len(results))
states := st.setNextStateForRule(tracingCtx, alertRule, results, extraLabels, logger)
span.AddEvent("results processed", trace.WithAttributes(
attribute.Int64("state_transitions", int64(len(states))),
))
states := st.setNextStateForRule(ctx, alertRule, results, extraLabels, logger)
staleStates := st.deleteStaleStatesFromCache(ctx, logger, evaluatedAt, alertRule)
st.persister.Sync(tracingCtx, span, states, staleStates)
span.AddEvent("results processed", trace.WithAttributes(
attribute.Int64("state_transitions", int64(len(states))),
attribute.Int64("stale_states", int64(len(staleStates))),
))
allChanges := append(states, staleStates...)
if st.historian != nil {
st.historian.Record(tracingCtx, history_model.NewRuleMeta(alertRule, logger), allChanges)
allChanges := StateTransitions(append(states, staleStates...))
// It's important that this is done *before* we sync the states to the persister. Otherwise, we will not persist
// the LastSentAt field to the store.
var statesToSend StateTransitions
if send != nil {
statesToSend = st.updateLastSentAt(allChanges, evaluatedAt)
}
st.persister.Sync(ctx, span, allChanges)
if st.historian != nil {
st.historian.Record(ctx, history_model.NewRuleMeta(alertRule, logger), allChanges)
}
// Optional callback intended for sending the states to an alertmanager.
// Some uses, such as backtesting or the testing API, do not send.
if send != nil {
send(ctx, statesToSend)
}
return allChanges
}
// updateLastSentAt picks the transitions for which NeedsSending reports true
// (given the manager's ResendDelay and ResolvedRetention), sets LastSentAt on
// each of them to evaluatedAt, and returns them in their original order.
// Transitions that do not need sending are left untouched and omitted; the
// result is nil when none qualify.
// Note: This is not idempotent, running this twice can (and usually will) return different results.
func (st *Manager) updateLastSentAt(states StateTransitions, evaluatedAt time.Time) StateTransitions {
	var toSend StateTransitions
	for i := range states {
		transition := states[i]
		if !transition.NeedsSending(st.ResendDelay, st.ResolvedRetention) {
			continue
		}
		// All selected transitions share one pointer to the same evaluatedAt value.
		transition.LastSentAt = &evaluatedAt
		toSend = append(toSend, transition)
	}
	return toSend
}
func (st *Manager) setNextStateForRule(ctx context.Context, alertRule *ngModels.AlertRule, results eval.Results, extraLabels data.Labels, logger log.Logger) []StateTransition {
if st.applyNoDataAndErrorToAllStates && results.IsNoData() && (alertRule.NoDataState == ngModels.Alerting || alertRule.NoDataState == ngModels.OK || alertRule.NoDataState == ngModels.KeepLast) { // If it is no data, check the mapping and switch all results to the new state
// TODO aggregate UID of datasources that returned NoData into one and provide as auxiliary info, probably annotation

View File

@ -39,7 +39,7 @@ func BenchmarkProcessEvalResults(b *testing.B) {
var ans []state.StateTransition
for i := 0; i < b.N; i++ {
ans = sut.ProcessEvalResults(context.Background(), now, &rule, results, labels)
ans = sut.ProcessEvalResults(context.Background(), now, &rule, results, labels, nil)
}
b.StopTimer()

View File

@ -120,7 +120,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
evaluationInterval := 10 * time.Second
tN := func(n int) time.Time {
return time.Time{}.Add(time.Duration(n) * evaluationInterval)
return time.Unix(0, 0).UTC().Add(time.Duration(n) * evaluationInterval)
}
t1 := tN(1)
t2 := tN(2)
@ -270,7 +270,15 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
for _, ts := range tss {
results := resultsAtTime[ts]
clk.Set(ts)
actual := st.ProcessEvalResults(context.Background(), ts, alertRule, results, systemLabels)
var statesToSend StateTransitions
actual := st.ProcessEvalResults(context.Background(), ts, alertRule, results, systemLabels, func(_ context.Context, states StateTransitions) {
statesToSend = states
})
// Expect all statesToSend to have a LastSentAt set to the evaluation time.
for _, state := range statesToSend {
assert.Equalf(t, ts, *state.LastSentAt, "LastSentAt should be set to the evaluation time for all ready-to-send transitions.")
}
expectedTransitions, ok := expectedTransitionsAtTime[ts]
if !ok { // skip if nothing to assert
@ -367,6 +375,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t1.Add(ResendDelay * 4),
LastEvaluationTime: t1,
LastSentAt: &t1,
},
},
{
@ -443,6 +452,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
{
@ -510,6 +520,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -564,6 +575,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t2,
LastEvaluationTime: t2,
ResolvedAt: &t2,
LastSentAt: &t2,
},
},
},
@ -623,6 +635,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t3,
LastEvaluationTime: t3,
ResolvedAt: &t3,
LastSentAt: &t3,
},
},
{
@ -682,6 +695,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t1.Add(ResendDelay * 4),
LastEvaluationTime: t1,
LastSentAt: &t1,
},
},
},
@ -733,6 +747,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -819,6 +834,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t1.Add(ResendDelay * 4),
LastEvaluationTime: t1,
LastSentAt: &t1,
},
},
},
@ -835,6 +851,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t1.Add(ResendDelay * 4),
LastEvaluationTime: t1,
LastSentAt: &t1,
},
},
},
@ -895,6 +912,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -911,6 +929,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -961,6 +980,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -1025,6 +1045,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -1052,6 +1073,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t3,
LastEvaluationTime: t3,
ResolvedAt: &t3,
LastSentAt: &t3,
},
},
{
@ -1063,6 +1085,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t2,
},
},
},
@ -1092,6 +1115,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t3,
LastEvaluationTime: t3,
ResolvedAt: &t3,
LastSentAt: &t3,
},
},
{
@ -1105,6 +1129,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t2,
},
},
},
@ -1134,6 +1159,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t3,
LastEvaluationTime: t3,
ResolvedAt: &t3,
LastSentAt: &t3,
},
},
{
@ -1176,6 +1202,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t3,
LastEvaluationTime: t3,
ResolvedAt: &t3,
LastSentAt: &t3,
},
},
{
@ -1207,6 +1234,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
{
@ -1219,6 +1247,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t1,
},
},
},
@ -1234,6 +1263,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t2,
},
},
{
@ -1247,6 +1277,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t1,
},
},
},
@ -1276,6 +1307,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t2,
LastEvaluationTime: t2,
ResolvedAt: &t2,
LastSentAt: &t2,
},
},
},
@ -1305,6 +1337,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t2,
LastEvaluationTime: t3,
ResolvedAt: &t2,
LastSentAt: &t2,
},
},
},
@ -1333,6 +1366,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t1,
},
},
},
@ -1361,6 +1395,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t1,
},
},
},
@ -1394,6 +1429,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -1431,6 +1467,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t2,
},
},
},
@ -1486,6 +1523,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -1598,6 +1636,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -1613,6 +1652,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
{
@ -1626,6 +1666,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t2,
},
},
},
@ -1710,6 +1751,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -1738,6 +1780,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t2,
},
},
},
@ -1770,6 +1813,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -1785,6 +1829,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -1800,6 +1845,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -1815,6 +1861,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -1833,6 +1880,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -1865,6 +1913,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -1908,6 +1957,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t3,
LastEvaluationTime: t3,
LastSentAt: &t1, // We don't bother updating LastSentAt for StateReasonMissingSeries since it's deleted from state.
},
},
},
@ -1937,6 +1987,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t3,
LastEvaluationTime: t3,
ResolvedAt: &t3,
LastSentAt: &t3,
},
},
},
@ -2021,6 +2072,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -2037,6 +2089,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -2087,6 +2140,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -2150,6 +2204,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -2165,6 +2220,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t3,
LastEvaluationTime: t3,
ResolvedAt: &t3,
LastSentAt: &t3,
},
},
{
@ -2176,6 +2232,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t2,
},
},
},
@ -2193,6 +2250,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t3,
LastEvaluationTime: t3,
ResolvedAt: &t3,
LastSentAt: &t3,
},
},
{
@ -2206,6 +2264,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t2,
},
},
},
@ -2223,6 +2282,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t3,
LastEvaluationTime: t3,
ResolvedAt: &t3,
LastSentAt: &t3,
},
},
{
@ -2253,6 +2313,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t3,
LastEvaluationTime: t3,
ResolvedAt: &t3,
LastSentAt: &t3,
},
},
{
@ -2284,6 +2345,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t1,
},
},
},
@ -2299,6 +2361,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t1,
},
},
},
@ -2316,6 +2379,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t2,
LastEvaluationTime: t2,
ResolvedAt: &t2,
LastSentAt: &t2,
},
},
},
@ -2332,6 +2396,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t2,
LastEvaluationTime: t3,
ResolvedAt: &t2,
LastSentAt: &t2,
},
},
},
@ -2348,6 +2413,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t1,
},
},
},
@ -2363,6 +2429,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t1,
},
},
},
@ -2395,6 +2462,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -2408,6 +2476,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -2423,6 +2492,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -2438,6 +2508,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -2453,6 +2524,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -2471,6 +2543,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -2503,6 +2576,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -2588,6 +2662,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t1.Add(ResendDelay * 4),
LastEvaluationTime: t1,
LastSentAt: &t1,
Annotations: mergeLabels(baseRule.Annotations, data.Labels{
"Error": datasourceError.Error(),
}),
@ -2608,6 +2683,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t1.Add(ResendDelay * 4),
LastEvaluationTime: t1,
LastSentAt: &t1,
},
},
},
@ -2666,6 +2742,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t1.Add(ResendDelay * 4),
LastEvaluationTime: t1,
LastSentAt: &t1,
Annotations: mergeLabels(baseRule.Annotations, data.Labels{
"Error": genericError.Error(),
}),
@ -2686,6 +2763,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t1.Add(ResendDelay * 4),
LastEvaluationTime: t1,
LastSentAt: &t1,
},
},
},
@ -2749,6 +2827,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
Annotations: mergeLabels(baseRule.Annotations, data.Labels{
"Error": datasourceError.Error(),
}),
@ -2820,6 +2899,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -2852,6 +2932,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -2882,6 +2963,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
Annotations: mergeLabels(baseRule.Annotations, data.Labels{
"Error": datasourceError.Error(),
}),
@ -2902,6 +2984,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -2953,6 +3036,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -3030,6 +3114,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t1,
EndsAt: t3,
LastEvaluationTime: t3,
LastSentAt: &t1, // We don't bother updating LastSentAt for StateReasonMissingSeries since it's deleted from state.
Annotations: mergeLabels(baseRule.Annotations, data.Labels{
"Error": datasourceError.Error(),
}),
@ -3063,6 +3148,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t3,
LastEvaluationTime: t3,
ResolvedAt: &t3,
LastSentAt: &t3,
},
},
},
@ -3149,6 +3235,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
Annotations: mergeLabels(baseRule.Annotations, data.Labels{
"Error": datasourceError.Error(),
}),
@ -3169,6 +3256,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -3245,6 +3333,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
Annotations: mergeLabels(baseRule.Annotations, data.Labels{
"Error": datasourceError.Error(),
}),
@ -3265,6 +3354,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -3297,6 +3387,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
},
},
},
@ -3331,6 +3422,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2.Add(ResendDelay * 4),
LastEvaluationTime: t2,
LastSentAt: &t2,
Annotations: mergeLabels(baseRule.Annotations, data.Labels{
"Error": datasourceError.Error(),
}),
@ -3347,6 +3439,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t2,
},
},
},
@ -3378,6 +3471,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -3438,6 +3532,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t3,
EndsAt: t3.Add(ResendDelay * 4),
LastEvaluationTime: t3,
LastSentAt: &t3,
},
},
},
@ -3466,6 +3561,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
StartsAt: t2,
EndsAt: t2,
LastEvaluationTime: t2,
LastSentAt: &t1, // TODO: Fix me. This should be t2 since we should be resolving the previous DatasourceError alert.
},
},
},
@ -3483,6 +3579,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
EndsAt: t2,
LastEvaluationTime: t2,
ResolvedAt: &t2,
LastSentAt: &t2,
},
},
},

View File

@ -7,7 +7,6 @@ import (
"fmt"
"math"
"math/rand"
"net/url"
"sort"
"strings"
"testing"
@ -269,7 +268,7 @@ func TestDashboardAnnotations(t *testing.T) {
},
}}, data.Labels{
"alertname": rule.Title,
})
}, nil)
expected := []string{rule.Title + " {alertname=" + rule.Title + ", instance_label=testValue2, test1=testValue1, test2=testValue2} - B=42.000000, C=1.000000"}
sort.Strings(expected)
@ -1415,9 +1414,8 @@ func TestProcessEvalResults(t *testing.T) {
res[i].EvaluatedAt = evalTime
}
clk.Set(evalTime)
processedStates := st.ProcessEvalResults(context.Background(), evalTime, tc.alertRule, res, systemLabels)
_ = st.ProcessEvalResults(context.Background(), evalTime, tc.alertRule, res, systemLabels, state.NoopSender)
results += len(res)
_ = state.FromStateTransitionToPostableAlerts(evalTime, processedStates, st, &url.URL{}) // Set LastSentAt.
}
states := st.GetStatesForRuleUID(tc.alertRule.OrgID, tc.alertRule.UID)
@ -1506,8 +1504,7 @@ func TestProcessEvalResults(t *testing.T) {
rule := models.RuleGen.GenerateRef()
var results = eval.GenerateResults(rand.Intn(4)+1, eval.ResultGen(eval.WithEvaluatedAt(clk.Now())))
states := st.ProcessEvalResults(context.Background(), clk.Now(), rule, results, make(data.Labels))
states := st.ProcessEvalResults(context.Background(), clk.Now(), rule, results, make(data.Labels), nil)
require.NotEmpty(t, states)
savedStates := make(map[data.Fingerprint]models.AlertInstance)
@ -1668,7 +1665,7 @@ func TestStaleResultsHandler(t *testing.T) {
"alertname": rule.Title,
"__alert_rule_namespace_uid__": rule.NamespaceUID,
"__alert_rule_uid__": rule.UID,
})
}, nil)
for _, s := range tc.expectedStates {
setCacheID(s)
cachedState := st.Get(s.OrgID, s.AlertRuleUID, s.CacheID)
@ -1737,7 +1734,7 @@ func TestStaleResults(t *testing.T) {
rule := gen.With(gen.WithFor(0)).GenerateRef()
initResults := eval.Results{
eval.ResultGen(eval.WithEvaluatedAt(clk.Now()))(),
eval.ResultGen(eval.WithState(eval.Alerting), eval.WithEvaluatedAt(clk.Now()))(),
eval.ResultGen(eval.WithState(eval.Alerting), eval.WithEvaluatedAt(clk.Now()))(),
eval.ResultGen(eval.WithState(eval.Normal), eval.WithEvaluatedAt(clk.Now()))(),
}
@ -1753,9 +1750,15 @@ func TestStaleResults(t *testing.T) {
}
// Init
processed := st.ProcessEvalResults(ctx, clk.Now(), rule, initResults, nil)
var statesToSend state.StateTransitions
processed := st.ProcessEvalResults(ctx, clk.Now(), rule, initResults, nil, func(_ context.Context, states state.StateTransitions) {
statesToSend = states
})
checkExpectedStateTransitions(t, processed, initStates)
// Check that it returns just those state transitions that need to be sent.
checkExpectedStateTransitions(t, statesToSend, map[data.Fingerprint]struct{}{state1: {}, state2: {}}) // Does not contain the Normal state3.
currentStates := st.GetStatesForRuleUID(rule.OrgID, rule.UID)
statesMap := checkExpectedStates(t, currentStates, initStates)
require.Equal(t, eval.Alerting, statesMap[state2].State) // make sure the state is alerting because we need it to be resolved later
@ -1770,7 +1773,7 @@ func TestStaleResults(t *testing.T) {
var expectedStaleKeys []models.AlertInstanceKey
t.Run("should mark missing states as stale", func(t *testing.T) {
processed = st.ProcessEvalResults(ctx, clk.Now(), rule, results, nil)
processed = st.ProcessEvalResults(ctx, clk.Now(), rule, results, nil, nil)
checkExpectedStateTransitions(t, processed, initStates)
for _, s := range processed {
if s.CacheID == state1 {

View File

@ -64,6 +64,6 @@ func (a *AsyncStatePersister) fullSync(ctx context.Context, cache *cache) error
return nil
}
func (a *AsyncStatePersister) Sync(_ context.Context, _ trace.Span, _, _ []StateTransition) {
func (a *AsyncStatePersister) Sync(_ context.Context, _ trace.Span, _ StateTransitions) {
a.log.Debug("Sync: No-Op")
}

View File

@ -8,8 +8,8 @@ import (
type NoopPersister struct{}
func (n *NoopPersister) Async(_ context.Context, _ *cache) {}
func (n *NoopPersister) Sync(_ context.Context, _ trace.Span, _, _ []StateTransition) {}
func (n *NoopPersister) Async(_ context.Context, _ *cache) {}
func (n *NoopPersister) Sync(_ context.Context, _ trace.Span, _ StateTransitions) {}
func NewNoopPersister() StatePersister {
return &NoopPersister{}

View File

@ -33,15 +33,18 @@ func NewSyncStatePersisiter(log log.Logger, cfg ManagerCfg) StatePersister {
func (a *SyncStatePersister) Async(_ context.Context, _ *cache) {
a.log.Debug("Async: No-Op")
}
func (a *SyncStatePersister) Sync(ctx context.Context, span trace.Span, states, staleStates []StateTransition) {
a.deleteAlertStates(ctx, staleStates)
// Sync persists the state transitions to the database. It deletes stale states and saves the current states.
func (a *SyncStatePersister) Sync(ctx context.Context, span trace.Span, allStates StateTransitions) {
staleStates := allStates.StaleStates()
if len(staleStates) > 0 {
a.deleteAlertStates(ctx, staleStates)
span.AddEvent("deleted stale states", trace.WithAttributes(
attribute.Int64("state_transitions", int64(len(staleStates))),
))
}
a.saveAlertStates(ctx, states...)
a.saveAlertStates(ctx, allStates...)
span.AddEvent("updated database")
}
@ -75,6 +78,12 @@ func (a *SyncStatePersister) saveAlertStates(ctx context.Context, states ...Stat
saveState := func(ctx context.Context, idx int) error {
s := states[idx]
// Do not save stale state to database.
if s.IsStale() {
return nil
}
// Do not save normal state to database and remove transition to Normal state but keep mapped states
if a.doNotSaveNormalState && IsNormalStateWithNoReason(s.State) && !s.Changed() {
return nil

View File

@ -65,7 +65,7 @@ func TestSyncPersister_saveAlertStates(t *testing.T) {
InstanceStore: st,
MaxStateSaveConcurrency: 1,
})
syncStatePersister.Sync(context.Background(), span, transitions, nil)
syncStatePersister.Sync(context.Background(), span, transitions)
savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{}
for _, op := range st.RecordedOps() {
saved := op.(ngmodels.AlertInstance)
@ -86,7 +86,7 @@ func TestSyncPersister_saveAlertStates(t *testing.T) {
InstanceStore: st,
MaxStateSaveConcurrency: 1,
})
syncStatePersister.Sync(context.Background(), span, transitions, nil)
syncStatePersister.Sync(context.Background(), span, transitions)
savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{}
for _, op := range st.RecordedOps() {

View File

@ -187,6 +187,19 @@ func (c StateTransition) Changed() bool {
return c.PreviousState != c.State.State || c.PreviousStateReason != c.State.StateReason
}
// StateTransitions is a slice of StateTransition values.
type StateTransitions []StateTransition

// StaleStates filters the receiver down to the transitions for which IsStale
// reports true, keeping them in their original order. The result is nil when
// no transition is stale.
func (c StateTransitions) StaleStates() StateTransitions {
	var stale StateTransitions
	for i := range c {
		if !c[i].IsStale() {
			continue
		}
		stale = append(stale, c[i])
	}
	return stale
}
type Evaluation struct {
EvaluationTime time.Time
EvaluationState eval.State
@ -484,6 +497,11 @@ func (a *State) GetLastEvaluationValuesForCondition() map[string]float64 {
return r
}
// IsStale reports whether the state's reason is models.StateReasonMissingSeries.
// A stale state is one that is ready to be evicted from the cache.
func (a *State) IsStale() bool {
	reason := a.StateReason
	return reason == models.StateReasonMissingSeries
}
// shouldTakeImage returns true if the state just has transitioned to alerting from another state,
// transitioned to alerting in a previous evaluation but does not have a screenshot, or has just
// been resolved.

View File

@ -100,3 +100,6 @@ type NoopImageService struct{}
func (s *NoopImageService) NewImage(_ context.Context, _ *models.AlertRule) (*models.Image, error) {
return &models.Image{}, nil
}
// NoopSender is a sender that ignores the transitions it is given. Pass it when the state manager
// should update LastSentAt even though no alerts are actually sent.
var NoopSender = func(context.Context, StateTransitions) {}