Alerting: Clear the state cache when the alert routine stops (#99681)

This commit is contained in:
Alexander Akhmetov 2025-01-28 20:15:19 +01:00 committed by GitHub
parent 97f4a164d1
commit a0bf9202f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 83 additions and 21 deletions

View File

@ -352,16 +352,23 @@ func (a *alertRule) Run() error {
case <-grafanaCtx.Done():
reason := grafanaCtx.Err()
// clean up the state only if the reason for stopping the evaluation loop is that the rule was deleted
// We do not want a context to be unbounded which could potentially cause a go routine running
// indefinitely. 1 minute is an almost randomly chosen timeout, big enough to cover the majority of the
// cases.
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
defer cancelFunc()
if errors.Is(reason, errRuleDeleted) {
// We do not want a context to be unbounded which could potentially cause a go routine running
// indefinitely. 1 minute is an almost randomly chosen timeout, big enough to cover the majority of the
// cases.
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
defer cancelFunc()
states := a.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, a.key.AlertRuleKey), a.key, ngmodels.StateReasonRuleDeleted)
a.expireAndSend(grafanaCtx, states)
// Clean up the state and send resolved notifications for firing alerts only if the reason for stopping
// the evaluation loop is that the rule was deleted.
stateTransitions := a.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, a.key.AlertRuleKey), a.key, ngmodels.StateReasonRuleDeleted)
a.expireAndSend(grafanaCtx, stateTransitions)
} else {
// Otherwise, just clean up the cache.
a.stateManager.ForgetStateByRuleUID(ngmodels.WithRuleKey(ctx, a.key.AlertRuleKey), a.key)
}
a.logger.Debug("Stopping alert rule routine", "reason", reason)
return nil
}

View File

@ -3,6 +3,7 @@ package schedule
import (
"bytes"
"context"
"errors"
"fmt"
"math"
"math/rand"
@ -278,7 +279,12 @@ func TestAlertRuleIdentifier(t *testing.T) {
}
func blankRuleForTests(ctx context.Context, key models.AlertRuleKeyWithGroup) *alertRule {
return newAlertRule(ctx, key, nil, false, 0, nil, nil, nil, nil, nil, nil, log.NewNopLogger(), nil, nil, nil)
managerCfg := state.ManagerCfg{
Historian: &state.FakeHistorian{},
Log: log.NewNopLogger(),
}
st := state.NewManager(managerCfg, state.NewNoopPersister())
return newAlertRule(ctx, key, nil, false, 0, nil, st, nil, nil, nil, nil, log.NewNopLogger(), nil, nil, nil)
}
func TestRuleRoutine(t *testing.T) {
@ -477,12 +483,26 @@ func TestRuleRoutine(t *testing.T) {
}
t.Run("should exit", func(t *testing.T) {
t.Run("and not clear the state if parent context is cancelled", func(t *testing.T) {
stoppedChan := make(chan error)
sch, _, _, _ := createSchedule(make(chan time.Time), nil)
rule := gen.With(withQueryForState(t, eval.Alerting)).GenerateRef()
genEvalResults := func(now time.Time) eval.Results {
return eval.GenerateResults(
rand.Intn(5)+1,
eval.ResultGen(
eval.WithEvaluatedAt(now),
// State should be alerting to test resolved notifications in some cases.
// When the alert rule is firing and is deleted, we should send
// resolved notifications.
eval.WithState(eval.Alerting),
),
)
}
rule := gen.GenerateRef()
_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil, nil)
t.Run("and clean up the state if parent context is cancelled", func(t *testing.T) {
stoppedChan := make(chan error)
sender := NewSyncAlertsSenderMock()
sch, _, _, _ := createSchedule(make(chan time.Time), sender)
_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, genEvalResults(sch.clock.Now()), nil, nil)
expectedStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
require.NotEmpty(t, expectedStates)
@ -497,14 +517,40 @@ func TestRuleRoutine(t *testing.T) {
cancel()
err := waitForErrChannel(t, stoppedChan)
require.NoError(t, err)
require.Equal(t, len(expectedStates), len(sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)))
require.Empty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
sender.AlertsSenderMock.AssertNotCalled(t, "Send")
})
t.Run("and clean up the state if delete is cancellation reason for inner context", func(t *testing.T) {
stoppedChan := make(chan error)
sch, _, _, _ := createSchedule(make(chan time.Time), nil)
rule := gen.GenerateRef()
_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil, nil)
t.Run("and clean up the state but not send anything if the reason is not rule deleted", func(t *testing.T) {
stoppedChan := make(chan error)
sender := NewSyncAlertsSenderMock()
sch, _, _, _ := createSchedule(make(chan time.Time), sender)
_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, genEvalResults(sch.clock.Now()), nil, nil)
require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
factory := ruleFactoryFromScheduler(sch)
ruleInfo := factory.new(context.Background(), rule)
go func() {
err := ruleInfo.Run()
stoppedChan <- err
}()
ruleInfo.Stop(errors.New("some reason"))
err := waitForErrChannel(t, stoppedChan)
require.NoError(t, err)
require.Empty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
sender.AlertsSenderMock.AssertNotCalled(t, "Send")
})
t.Run("and send resolved notifications if errRuleDeleted is the reason for stopping", func(t *testing.T) {
stoppedChan := make(chan error)
sender := NewSyncAlertsSenderMock()
sender.EXPECT().Send(mock.Anything, mock.Anything, mock.Anything).Times(1)
sch, _, _, _ := createSchedule(make(chan time.Time), sender)
_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, genEvalResults(sch.clock.Now()), nil, nil)
require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
factory := ruleFactoryFromScheduler(sch)
@ -519,6 +565,7 @@ func TestRuleRoutine(t *testing.T) {
require.NoError(t, err)
require.Empty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
sender.AlertsSenderMock.AssertExpectations(t)
})
})

View File

@ -242,7 +242,7 @@ func (st *Manager) DeleteStateByRuleUID(ctx context.Context, ruleKey ngModels.Al
logger := st.log.FromContext(ctx)
logger.Debug("Resetting state of the rule")
states := st.cache.removeByRuleUID(ruleKey.OrgID, ruleKey.UID)
states := st.ForgetStateByRuleUID(ctx, ruleKey)
if len(states) == 0 {
return nil
@ -280,11 +280,19 @@ func (st *Manager) DeleteStateByRuleUID(ctx context.Context, ruleKey ngModels.Al
logger.Error("Failed to delete states that belong to a rule from database", "error", err)
}
}
logger.Info("Rules state was reset", "states", len(states))
return transitions
}
func (st *Manager) ForgetStateByRuleUID(ctx context.Context, ruleKey ngModels.AlertRuleKeyWithGroup) []*State {
logger := st.log.FromContext(ctx)
logger.Debug("Removing rule state from cache")
return st.cache.removeByRuleUID(ruleKey.OrgID, ruleKey.UID)
}
// ResetStateByRuleUID removes the rule instances from cache and instanceStore and saves state history. If the state
// history has to be saved, rule must not be nil.
func (st *Manager) ResetStateByRuleUID(ctx context.Context, rule *ngModels.AlertRule, reason string) []StateTransition {