Alerting: Refactor ruleRoutine to take an entire ruleInfo instance (#83858)

* Make stop a real method

* ruleRoutine takes a ruleInfo reference directly rather than pieces of it

* Fix whitespace
This commit is contained in:
Alexander Weaver 2024-03-04 15:15:01 -06:00 committed by GitHub
parent 3121fce305
commit f2a9d0a89d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 51 additions and 41 deletions

View File

@ -10,12 +10,17 @@ type alertRuleInfo struct {
evalCh chan *evaluation evalCh chan *evaluation
updateCh chan ruleVersionAndPauseStatus updateCh chan ruleVersionAndPauseStatus
ctx context.Context ctx context.Context
stop func(reason error) stopFn util.CancelCauseFunc
} }
func newAlertRuleInfo(parent context.Context) *alertRuleInfo { func newAlertRuleInfo(parent context.Context) *alertRuleInfo {
ctx, stop := util.WithCancelCause(parent) ctx, stop := util.WithCancelCause(parent)
return &alertRuleInfo{evalCh: make(chan *evaluation), updateCh: make(chan ruleVersionAndPauseStatus), ctx: ctx, stop: stop} return &alertRuleInfo{
evalCh: make(chan *evaluation),
updateCh: make(chan ruleVersionAndPauseStatus),
ctx: ctx,
stopFn: stop,
}
} }
// eval signals the rule evaluation routine to perform the evaluation of the rule. Does nothing if the loop is stopped. // eval signals the rule evaluation routine to perform the evaluation of the rule. Does nothing if the loop is stopped.
@ -58,3 +63,8 @@ func (a *alertRuleInfo) update(lastVersion ruleVersionAndPauseStatus) bool {
return false return false
} }
} }
// stop sends an instruction to the rule evaluation routine to shut down. an optional shutdown reason can be given.
func (a *alertRuleInfo) stop(reason error) {
a.stopFn(reason)
}

View File

@ -256,7 +256,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
if newRoutine && !invalidInterval { if newRoutine && !invalidInterval {
dispatcherGroup.Go(func() error { dispatcherGroup.Go(func() error {
return sch.ruleRoutine(ruleInfo.ctx, key, ruleInfo.evalCh, ruleInfo.updateCh) return sch.ruleRoutine(key, ruleInfo)
}) })
} }
@ -345,8 +345,8 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
} }
//nolint:gocyclo //nolint:gocyclo
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan ruleVersionAndPauseStatus) error { func (sch *schedule) ruleRoutine(key ngmodels.AlertRuleKey, ruleInfo *alertRuleInfo) error {
grafanaCtx = ngmodels.WithRuleKey(grafanaCtx, key) grafanaCtx := ngmodels.WithRuleKey(ruleInfo.ctx, key)
logger := sch.log.FromContext(grafanaCtx) logger := sch.log.FromContext(grafanaCtx)
logger.Debug("Alert rule routine started") logger.Debug("Alert rule routine started")
@ -474,7 +474,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
for { for {
select { select {
// used by external services (API) to notify that rule is updated. // used by external services (API) to notify that rule is updated.
case ctx := <-updateCh: case ctx := <-ruleInfo.updateCh:
if currentFingerprint == ctx.Fingerprint { if currentFingerprint == ctx.Fingerprint {
logger.Info("Rule's fingerprint has not changed. Skip resetting the state", "currentFingerprint", currentFingerprint) logger.Info("Rule's fingerprint has not changed. Skip resetting the state", "currentFingerprint", currentFingerprint)
continue continue
@ -485,7 +485,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
resetState(grafanaCtx, ctx.IsPaused) resetState(grafanaCtx, ctx.IsPaused)
currentFingerprint = ctx.Fingerprint currentFingerprint = ctx.Fingerprint
// evalCh - used by the scheduler to signal that evaluation is needed. // evalCh - used by the scheduler to signal that evaluation is needed.
case ctx, ok := <-evalCh: case ctx, ok := <-ruleInfo.evalCh:
if !ok { if !ok {
logger.Debug("Evaluation channel has been closed. Exiting") logger.Debug("Evaluation channel has been closed. Exiting")
return nil return nil

View File

@ -384,22 +384,22 @@ func TestSchedule_ruleRoutine(t *testing.T) {
for _, evalState := range normalStates { for _, evalState := range normalStates {
// TODO rewrite when we are able to mock/fake state manager // TODO rewrite when we are able to mock/fake state manager
t.Run(fmt.Sprintf("when rule evaluation happens (evaluation state %s)", evalState), func(t *testing.T) { t.Run(fmt.Sprintf("when rule evaluation happens (evaluation state %s)", evalState), func(t *testing.T) {
evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time) evalAppliedChan := make(chan time.Time)
sch, ruleStore, instanceStore, reg := createSchedule(evalAppliedChan, nil) sch, ruleStore, instanceStore, reg := createSchedule(evalAppliedChan, nil)
rule := models.AlertRuleGen(withQueryForState(t, evalState))() rule := models.AlertRuleGen(withQueryForState(t, evalState))()
ruleStore.PutRule(context.Background(), rule) ruleStore.PutRule(context.Background(), rule)
folderTitle := ruleStore.getNamespaceTitle(rule.NamespaceUID) folderTitle := ruleStore.getNamespaceTitle(rule.NamespaceUID)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ruleInfo := newAlertRuleInfo(ctx)
go func() { go func() {
ctx, cancel := context.WithCancel(context.Background()) _ = sch.ruleRoutine(rule.GetKey(), ruleInfo)
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersionAndPauseStatus))
}() }()
expectedTime := time.UnixMicro(rand.Int63()) expectedTime := time.UnixMicro(rand.Int63())
evalChan <- &evaluation{ ruleInfo.evalCh <- &evaluation{
scheduledAt: expectedTime, scheduledAt: expectedTime,
rule: rule, rule: rule,
folderTitle: folderTitle, folderTitle: folderTitle,
@ -540,8 +540,9 @@ func TestSchedule_ruleRoutine(t *testing.T) {
require.NotEmpty(t, expectedStates) require.NotEmpty(t, expectedStates)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
ruleInfo := newAlertRuleInfo(ctx)
go func() { go func() {
err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evaluation), make(chan ruleVersionAndPauseStatus)) err := sch.ruleRoutine(models.AlertRuleKey{}, ruleInfo)
stoppedChan <- err stoppedChan <- err
}() }()
@ -550,7 +551,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, len(expectedStates), len(sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))) require.Equal(t, len(expectedStates), len(sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)))
}) })
t.Run("and clean up the state if delete is cancellation reason ", func(t *testing.T) { t.Run("and clean up the state if delete is cancellation reason for inner context", func(t *testing.T) {
stoppedChan := make(chan error) stoppedChan := make(chan error)
sch, _, _, _ := createSchedule(make(chan time.Time), nil) sch, _, _, _ := createSchedule(make(chan time.Time), nil)
@ -558,13 +559,13 @@ func TestSchedule_ruleRoutine(t *testing.T) {
_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil) _ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil)
require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)) require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
ctx, cancel := util.WithCancelCause(context.Background()) ruleInfo := newAlertRuleInfo(context.Background())
go func() { go func() {
err := sch.ruleRoutine(ctx, rule.GetKey(), make(chan *evaluation), make(chan ruleVersionAndPauseStatus)) err := sch.ruleRoutine(rule.GetKey(), ruleInfo)
stoppedChan <- err stoppedChan <- err
}() }()
cancel(errRuleDeleted) ruleInfo.stop(errRuleDeleted)
err := waitForErrChannel(t, stoppedChan) err := waitForErrChannel(t, stoppedChan)
require.NoError(t, err) require.NoError(t, err)
@ -577,9 +578,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
folderTitle := "folderName" folderTitle := "folderName"
ruleFp := ruleWithFolder{rule, folderTitle}.Fingerprint() ruleFp := ruleWithFolder{rule, folderTitle}.Fingerprint()
evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time) evalAppliedChan := make(chan time.Time)
updateChan := make(chan ruleVersionAndPauseStatus)
sender := NewSyncAlertsSenderMock() sender := NewSyncAlertsSenderMock()
sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return() sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
@ -587,15 +586,16 @@ func TestSchedule_ruleRoutine(t *testing.T) {
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender) sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender)
ruleStore.PutRule(context.Background(), rule) ruleStore.PutRule(context.Background(), rule)
sch.schedulableAlertRules.set([]*models.AlertRule{rule}, map[models.FolderKey]string{rule.GetFolderKey(): folderTitle}) sch.schedulableAlertRules.set([]*models.AlertRule{rule}, map[models.FolderKey]string{rule.GetFolderKey(): folderTitle})
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ruleInfo := newAlertRuleInfo(ctx)
go func() { go func() {
ctx, cancel := context.WithCancel(context.Background()) _ = sch.ruleRoutine(rule.GetKey(), ruleInfo)
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, updateChan)
}() }()
// init evaluation loop so it got the rule version // init evaluation loop so it got the rule version
evalChan <- &evaluation{ ruleInfo.evalCh <- &evaluation{
scheduledAt: sch.clock.Now(), scheduledAt: sch.clock.Now(),
rule: rule, rule: rule,
folderTitle: folderTitle, folderTitle: folderTitle,
@ -631,8 +631,8 @@ func TestSchedule_ruleRoutine(t *testing.T) {
require.Greaterf(t, expectedToBeSent, 0, "State manager was expected to return at least one state that can be expired") require.Greaterf(t, expectedToBeSent, 0, "State manager was expected to return at least one state that can be expired")
t.Run("should do nothing if version in channel is the same", func(t *testing.T) { t.Run("should do nothing if version in channel is the same", func(t *testing.T) {
updateChan <- ruleVersionAndPauseStatus{ruleFp, false} ruleInfo.updateCh <- ruleVersionAndPauseStatus{ruleFp, false}
updateChan <- ruleVersionAndPauseStatus{ruleFp, false} // second time just to make sure that previous messages were handled ruleInfo.updateCh <- ruleVersionAndPauseStatus{ruleFp, false} // second time just to make sure that previous messages were handled
actualStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID) actualStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
require.Len(t, actualStates, len(states)) require.Len(t, actualStates, len(states))
@ -641,7 +641,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
}) })
t.Run("should clear the state and expire firing alerts if version in channel is greater", func(t *testing.T) { t.Run("should clear the state and expire firing alerts if version in channel is greater", func(t *testing.T) {
updateChan <- ruleVersionAndPauseStatus{ruleFp + 1, false} ruleInfo.updateCh <- ruleVersionAndPauseStatus{ruleFp + 1, false}
require.Eventually(t, func() bool { require.Eventually(t, func() bool {
return len(sender.Calls()) > 0 return len(sender.Calls()) > 0
@ -659,7 +659,6 @@ func TestSchedule_ruleRoutine(t *testing.T) {
rule := models.AlertRuleGen(withQueryForState(t, eval.Error))() rule := models.AlertRuleGen(withQueryForState(t, eval.Error))()
rule.ExecErrState = models.ErrorErrState rule.ExecErrState = models.ErrorErrState
evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time) evalAppliedChan := make(chan time.Time)
sender := NewSyncAlertsSenderMock() sender := NewSyncAlertsSenderMock()
@ -668,14 +667,15 @@ func TestSchedule_ruleRoutine(t *testing.T) {
sch, ruleStore, _, reg := createSchedule(evalAppliedChan, sender) sch, ruleStore, _, reg := createSchedule(evalAppliedChan, sender)
sch.maxAttempts = 3 sch.maxAttempts = 3
ruleStore.PutRule(context.Background(), rule) ruleStore.PutRule(context.Background(), rule)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ruleInfo := newAlertRuleInfo(ctx)
go func() { go func() {
ctx, cancel := context.WithCancel(context.Background()) _ = sch.ruleRoutine(rule.GetKey(), ruleInfo)
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersionAndPauseStatus))
}() }()
evalChan <- &evaluation{ ruleInfo.evalCh <- &evaluation{
scheduledAt: sch.clock.Now(), scheduledAt: sch.clock.Now(),
rule: rule, rule: rule,
} }
@ -765,7 +765,6 @@ func TestSchedule_ruleRoutine(t *testing.T) {
// eval.Alerting makes state manager to create notifications for alertmanagers // eval.Alerting makes state manager to create notifications for alertmanagers
rule := models.AlertRuleGen(withQueryForState(t, eval.Alerting))() rule := models.AlertRuleGen(withQueryForState(t, eval.Alerting))()
evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time) evalAppliedChan := make(chan time.Time)
sender := NewSyncAlertsSenderMock() sender := NewSyncAlertsSenderMock()
@ -773,14 +772,15 @@ func TestSchedule_ruleRoutine(t *testing.T) {
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender) sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender)
ruleStore.PutRule(context.Background(), rule) ruleStore.PutRule(context.Background(), rule)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ruleInfo := newAlertRuleInfo(ctx)
go func() { go func() {
ctx, cancel := context.WithCancel(context.Background()) _ = sch.ruleRoutine(rule.GetKey(), ruleInfo)
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersionAndPauseStatus))
}() }()
evalChan <- &evaluation{ ruleInfo.evalCh <- &evaluation{
scheduledAt: sch.clock.Now(), scheduledAt: sch.clock.Now(),
rule: rule, rule: rule,
} }
@ -798,7 +798,6 @@ func TestSchedule_ruleRoutine(t *testing.T) {
t.Run("when there are no alerts to send it should not call notifiers", func(t *testing.T) { t.Run("when there are no alerts to send it should not call notifiers", func(t *testing.T) {
rule := models.AlertRuleGen(withQueryForState(t, eval.Normal))() rule := models.AlertRuleGen(withQueryForState(t, eval.Normal))()
evalChan := make(chan *evaluation)
evalAppliedChan := make(chan time.Time) evalAppliedChan := make(chan time.Time)
sender := NewSyncAlertsSenderMock() sender := NewSyncAlertsSenderMock()
@ -806,14 +805,15 @@ func TestSchedule_ruleRoutine(t *testing.T) {
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender) sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender)
ruleStore.PutRule(context.Background(), rule) ruleStore.PutRule(context.Background(), rule)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
ruleInfo := newAlertRuleInfo(ctx)
go func() { go func() {
ctx, cancel := context.WithCancel(context.Background()) _ = sch.ruleRoutine(rule.GetKey(), ruleInfo)
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersionAndPauseStatus))
}() }()
evalChan <- &evaluation{ ruleInfo.evalCh <- &evaluation{
scheduledAt: sch.clock.Now(), scheduledAt: sch.clock.Now(),
rule: rule, rule: rule,
} }