diff --git a/pkg/services/ngalert/api/api_ruler.go b/pkg/services/ngalert/api/api_ruler.go index 5b195599241..635899411b2 100644 --- a/pkg/services/ngalert/api/api_ruler.go +++ b/pkg/services/ngalert/api/api_ruler.go @@ -426,7 +426,7 @@ func (srv RulerSrv) updateAlertRulesInGroup(c *models.ReqContext, groupKey ngmod srv.scheduleService.UpdateAlertRule(ngmodels.AlertRuleKey{ OrgID: c.SignedInUser.OrgID, UID: rule.Existing.UID, - }, rule.Existing.Version+1) + }, rule.Existing.Version+1, rule.New.IsPaused) } if len(finalChanges.Delete) > 0 { diff --git a/pkg/services/ngalert/api/persist.go b/pkg/services/ngalert/api/persist.go index 10c7e3747c7..e7b84c85843 100644 --- a/pkg/services/ngalert/api/persist.go +++ b/pkg/services/ngalert/api/persist.go @@ -22,7 +22,7 @@ type RuleStore interface { DeleteAlertRulesByUID(ctx context.Context, orgID int64, ruleUID ...string) error // IncreaseVersionForAllRulesInNamespace Increases version for all rules that have specified namespace. Returns all rules that belong to the namespace - IncreaseVersionForAllRulesInNamespace(ctx context.Context, orgID int64, namespaceUID string) ([]ngmodels.AlertRuleKeyWithVersion, error) + IncreaseVersionForAllRulesInNamespace(ctx context.Context, orgID int64, namespaceUID string) ([]ngmodels.AlertRuleKeyWithVersionAndPauseStatus, error) Count(ctx context.Context, orgID int64) (int64, error) } diff --git a/pkg/services/ngalert/models/alert_rule.go b/pkg/services/ngalert/models/alert_rule.go index 884b59516fa..0a0cbb4607c 100644 --- a/pkg/services/ngalert/models/alert_rule.go +++ b/pkg/services/ngalert/models/alert_rule.go @@ -109,6 +109,8 @@ const ( const ( StateReasonMissingSeries = "MissingSeries" StateReasonError = "Error" + StateReasonPaused = "Paused" + StateReasonUpdated = "Updated" ) var ( @@ -156,6 +158,7 @@ type AlertRule struct { For time.Duration Annotations map[string]string Labels map[string]string + IsPaused bool } // GetDashboardUID returns the DashboardUID or "". @@ -261,6 +264,11 @@ type AlertRuleKeyWithVersion struct { AlertRuleKey `xorm:"extends"` } +type AlertRuleKeyWithVersionAndPauseStatus struct { + IsPaused bool + AlertRuleKeyWithVersion `xorm:"extends"` +} + // AlertRuleGroupKey is the identifier of a group of alerts type AlertRuleGroupKey struct { OrgID int64 @@ -335,6 +343,7 @@ type AlertRuleVersion struct { For time.Duration Annotations map[string]string Labels map[string]string + IsPaused bool } // GetAlertRuleByUIDQuery is the query for retrieving/deleting an alert rule by UID and organisation ID. diff --git a/pkg/services/ngalert/models/instance.go b/pkg/services/ngalert/models/instance.go index 7a62fcb1fce..e4180109772 100644 --- a/pkg/services/ngalert/models/instance.go +++ b/pkg/services/ngalert/models/instance.go @@ -34,7 +34,7 @@ const ( InstanceStatePending InstanceStateType = "Pending" // InstanceStateNoData is for an alert with no data. InstanceStateNoData InstanceStateType = "NoData" - // InstanceStateError is for a erroring alert. + // InstanceStateError is for an erroring alert. 
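
The API and folder-change paths now hand the scheduler a pause flag along with the bumped version, and the store returns it through the new `AlertRuleKeyWithVersionAndPauseStatus` wrapper. The sketch below is a trimmed, self-contained illustration (stand-in types named after the diff, everything else simplified) of how a caller forwards the flag, and of how Go's embedded-struct promotion keeps the call site `key.AlertRuleKey, key.Version, key.IsPaused` flat, as in `subscribeToFolderChanges`.

```go
package main

import "fmt"

// Simplified stand-ins for the ngalert types touched by this patch; field
// names follow the diff, everything else is trimmed for illustration.
type AlertRuleKey struct {
	OrgID int64
	UID   string
}

type AlertRuleKeyWithVersion struct {
	Version int64
	AlertRuleKey
}

type AlertRuleKeyWithVersionAndPauseStatus struct {
	IsPaused bool
	AlertRuleKeyWithVersion
}

// scheduler abstracts the one method of ScheduleService this example needs.
type scheduler interface {
	UpdateAlertRule(key AlertRuleKey, lastVersion int64, isPaused bool)
}

type logScheduler struct{}

func (logScheduler) UpdateAlertRule(key AlertRuleKey, lastVersion int64, isPaused bool) {
	fmt.Printf("update %s/%d to version %d (paused=%v)\n", key.UID, key.OrgID, lastVersion, isPaused)
}

// notifyScheduler mirrors the loop in subscribeToFolderChanges: every updated
// rule key now carries its pause status alongside the bumped version.
func notifyScheduler(s scheduler, updated []AlertRuleKeyWithVersionAndPauseStatus) {
	for _, key := range updated {
		// Embedded structs promote AlertRuleKey, Version and IsPaused to the top level.
		s.UpdateAlertRule(key.AlertRuleKey, key.Version, key.IsPaused)
	}
}

func main() {
	notifyScheduler(logScheduler{}, []AlertRuleKeyWithVersionAndPauseStatus{
		{IsPaused: true, AlertRuleKeyWithVersion: AlertRuleKeyWithVersion{Version: 3, AlertRuleKey: AlertRuleKey{OrgID: 1, UID: "abc"}}},
	})
}
```

The same three-argument call appears in `updateAlertRulesInGroup`, where the pause flag comes from `rule.New.IsPaused` rather than from the store.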
InstanceStateError InstanceStateType = "Error" ) diff --git a/pkg/services/ngalert/models/instance_test.go b/pkg/services/ngalert/models/instance_test.go new file mode 100644 index 00000000000..82263b1bcd8 --- /dev/null +++ b/pkg/services/ngalert/models/instance_test.go @@ -0,0 +1,105 @@ +package models + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestInstanceStateType_IsValid(t *testing.T) { + testCases := []struct { + instanceType InstanceStateType + expectedValidity bool + }{ + { + instanceType: InstanceStateFiring, + expectedValidity: true, + }, + { + instanceType: InstanceStateNormal, + expectedValidity: true, + }, + { + instanceType: InstanceStatePending, + expectedValidity: true, + }, + { + instanceType: InstanceStateNoData, + expectedValidity: true, + }, + { + instanceType: InstanceStateError, + expectedValidity: true, + }, + { + instanceType: InstanceStateType("notAValidInstanceStateType"), + expectedValidity: false, + }, + } + + for _, tc := range testCases { + t.Run(buildTestInstanceStateTypeIsValidName(tc.instanceType, tc.expectedValidity), func(t *testing.T) { + require.Equal(t, tc.expectedValidity, tc.instanceType.IsValid()) + }) + } +} + +func buildTestInstanceStateTypeIsValidName(instanceType InstanceStateType, expectedValidity bool) string { + if expectedValidity { + return fmt.Sprintf("%q should be valid", instanceType) + } + return fmt.Sprintf("%q should not be valid", instanceType) +} + +func TestValidateAlertInstance(t *testing.T) { + testCases := []struct { + name string + orgId int64 + uid string + currentState InstanceStateType + err error + }{ + { + name: "fails if orgID is empty", + orgId: 0, + uid: "validUid", + currentState: InstanceStateNormal, + err: errors.New("alert instance is invalid due to missing alert rule organisation"), + }, + { + name: "fails if uid is empty", + orgId: 1, + uid: "", + currentState: InstanceStateNormal, + err: errors.New("alert instance is invalid due to missing alert rule uid"), + }, + { + name: "fails if current state is not valid", + orgId: 1, + uid: "validUid", + currentState: InstanceStateType("notAValidType"), + err: errors.New("alert instance is invalid because the state 'notAValidType' is invalid"), + }, + { + name: "ok if validated fields are correct", + orgId: 1, + uid: "validUid", + currentState: InstanceStateNormal, + err: nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + instance := AlertInstanceGen(func(instance *AlertInstance) { + instance.AlertInstanceKey.RuleOrgID = tc.orgId + instance.AlertInstanceKey.RuleUID = tc.uid + instance.CurrentState = tc.currentState + }) + + require.Equal(t, tc.err, ValidateAlertInstance(*instance)) + }) + } +} diff --git a/pkg/services/ngalert/models/testing.go b/pkg/services/ngalert/models/testing.go index 6c1ed0c7897..544c4cb8330 100644 --- a/pkg/services/ngalert/models/testing.go +++ b/pkg/services/ngalert/models/testing.go @@ -337,3 +337,46 @@ func CreateClassicConditionExpression(refID string, inputRefID string, reducer s }`, refID, inputRefID, operation, threshold, reducer, expr.OldDatasourceUID, expr.DatasourceType)), } } + +type AlertInstanceMutator func(*AlertInstance) + +// AlertInstanceGen provides a factory function that generates a random AlertInstance. +// The mutators arguments allows changing fields of the resulting structure. 
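
The new tests exercise `InstanceStateType.IsValid` and `ValidateAlertInstance`, neither of which appears in this diff. The sketch below is only a guess at their shape, reverse-engineered from the error strings the tests assert on; the real implementations in `pkg/services/ngalert/models` may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-ins shaped by the assertions in instance_test.go; the
// concrete state values here are placeholders, not the real constants.
type InstanceStateType string

const (
	InstanceStateFiring  InstanceStateType = "Alerting"
	InstanceStateNormal  InstanceStateType = "Normal"
	InstanceStatePending InstanceStateType = "Pending"
	InstanceStateNoData  InstanceStateType = "NoData"
	InstanceStateError   InstanceStateType = "Error"
)

func (i InstanceStateType) IsValid() bool {
	return i == InstanceStateFiring ||
		i == InstanceStateNormal ||
		i == InstanceStatePending ||
		i == InstanceStateNoData ||
		i == InstanceStateError
}

type AlertInstanceKey struct {
	RuleOrgID int64
	RuleUID   string
}

type AlertInstance struct {
	AlertInstanceKey AlertInstanceKey
	CurrentState     InstanceStateType
}

// ValidateAlertInstance returns the errors the new table-driven test expects.
func ValidateAlertInstance(instance AlertInstance) error {
	if instance.AlertInstanceKey.RuleOrgID == 0 {
		return errors.New("alert instance is invalid due to missing alert rule organisation")
	}
	if instance.AlertInstanceKey.RuleUID == "" {
		return errors.New("alert instance is invalid due to missing alert rule uid")
	}
	if !instance.CurrentState.IsValid() {
		return fmt.Errorf("alert instance is invalid because the state '%v' is invalid", instance.CurrentState)
	}
	return nil
}

func main() {
	fmt.Println(ValidateAlertInstance(AlertInstance{
		AlertInstanceKey: AlertInstanceKey{RuleOrgID: 1, RuleUID: "validUid"},
		CurrentState:     InstanceStateType("notAValidType"),
	}))
}
```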
+func AlertInstanceGen(mutators ...AlertInstanceMutator) *AlertInstance { + var labels map[string]string = nil + if rand.Int63()%2 == 0 { + labels = GenerateAlertLabels(rand.Intn(5), "lbl-") + } + + randState := func() InstanceStateType { + s := [...]InstanceStateType{ + InstanceStateFiring, + InstanceStateNormal, + InstanceStatePending, + InstanceStateNoData, + InstanceStateError, + } + return s[rand.Intn(len(s))] + } + + currentStateSince := time.Now().Add(-time.Duration(rand.Intn(100) + 1)) + + instance := &AlertInstance{ + AlertInstanceKey: AlertInstanceKey{ + RuleOrgID: rand.Int63n(1500), + RuleUID: util.GenerateShortUID(), + LabelsHash: util.GenerateShortUID(), + }, + Labels: labels, + CurrentState: randState(), + CurrentReason: "TEST-REASON-" + util.GenerateShortUID(), + CurrentStateSince: currentStateSince, + CurrentStateEnd: currentStateSince.Add(time.Duration(rand.Intn(100) + 200)), + LastEvalTime: time.Now().Add(-time.Duration(rand.Intn(100) + 50)), + } + + for _, mutator := range mutators { + mutator(instance) + } + return instance +} diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go index e113610b66d..cc92ef97891 100644 --- a/pkg/services/ngalert/ngalert.go +++ b/pkg/services/ngalert/ngalert.go @@ -310,7 +310,7 @@ func subscribeToFolderChanges(ctx context.Context, logger log.Logger, bus bus.Bu if len(updated) > 0 { logger.Info("Rules that belong to the folder have been updated successfully. Clearing their status", "folderUID", evt.UID, "updatedRules", len(updated)) for _, key := range updated { - scheduler.UpdateAlertRule(key.AlertRuleKey, key.Version) + scheduler.UpdateAlertRule(key.AlertRuleKey, key.Version, key.IsPaused) } } else { logger.Debug("No alert rules found in the folder. nothing to update", "folderUID", evt.UID, "folder", evt.Title) diff --git a/pkg/services/ngalert/ngalert_test.go b/pkg/services/ngalert/ngalert_test.go index ceb93ca58ca..a8b7d6c6f39 100644 --- a/pkg/services/ngalert/ngalert_test.go +++ b/pkg/services/ngalert/ngalert_test.go @@ -35,7 +35,7 @@ func Test_subscribeToFolderChanges(t *testing.T) { db.PutRule(context.Background(), rules...) 
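
`AlertInstanceGen` follows the mutator-style factory used by the other helpers in `testing.go`: build a fully populated random value, then apply caller-supplied overrides, as `TestValidateAlertInstance` does above. A stripped-down, self-contained illustration of the same shape (toy type, nothing Grafana-specific):

```go
package main

import "fmt"

// Toy illustration of the mutator-style test factory: generate sensible
// defaults, then let each test pin only the fields it cares about.
type Widget struct {
	Name  string
	Count int
}

type WidgetMutator func(*Widget)

func WidgetGen(mutators ...WidgetMutator) *Widget {
	w := &Widget{Name: "default", Count: 42}
	for _, mutate := range mutators {
		mutate(w)
	}
	return w
}

func main() {
	// As in the new instance_test.go: override only the field under test.
	w := WidgetGen(func(w *Widget) { w.Count = 0 })
	fmt.Println(w.Name, w.Count)
}
```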
scheduler := &schedule.FakeScheduleService{} - scheduler.On("UpdateAlertRule", mock.Anything, mock.Anything).Return() + scheduler.On("UpdateAlertRule", mock.Anything, mock.Anything, mock.Anything).Return() subscribeToFolderChanges(context.Background(), log.New("test"), bus, db, scheduler) @@ -69,6 +69,6 @@ func Test_subscribeToFolderChanges(t *testing.T) { }, time.Second, 10*time.Millisecond, "scheduler was expected to be called %d times but called %d", len(rules), calledTimes) for _, rule := range rules { - scheduler.AssertCalled(t, "UpdateAlertRule", rule.GetKey(), rule.Version) + scheduler.AssertCalled(t, "UpdateAlertRule", rule.GetKey(), rule.Version, false) } } diff --git a/pkg/services/ngalert/schedule/registry.go b/pkg/services/ngalert/schedule/registry.go index e07ee458d0d..d162e3d3957 100644 --- a/pkg/services/ngalert/schedule/registry.go +++ b/pkg/services/ngalert/schedule/registry.go @@ -77,17 +77,21 @@ func (r *alertRuleInfoRegistry) keyMap() map[models.AlertRuleKey]struct{} { } type ruleVersion int64 +type ruleVersionAndPauseStatus struct { + Version ruleVersion + IsPaused bool +} type alertRuleInfo struct { evalCh chan *evaluation - updateCh chan ruleVersion + updateCh chan ruleVersionAndPauseStatus ctx context.Context stop func(reason error) } func newAlertRuleInfo(parent context.Context) *alertRuleInfo { ctx, stop := util.WithCancelCause(parent) - return &alertRuleInfo{evalCh: make(chan *evaluation), updateCh: make(chan ruleVersion), ctx: ctx, stop: stop} + return &alertRuleInfo{evalCh: make(chan *evaluation), updateCh: make(chan ruleVersionAndPauseStatus), ctx: ctx, stop: stop} } // eval signals the rule evaluation routine to perform the evaluation of the rule. Does nothing if the loop is stopped. @@ -114,13 +118,13 @@ func (a *alertRuleInfo) eval(eval *evaluation) (bool, *evaluation) { } // update sends an instruction to the rule evaluation routine to update the scheduled rule to the specified version. The specified version must be later than the current version, otherwise no update will happen. -func (a *alertRuleInfo) update(lastVersion ruleVersion) bool { +func (a *alertRuleInfo) update(lastVersion ruleVersionAndPauseStatus) bool { // check if the channel is not empty. msg := lastVersion select { case v := <-a.updateCh: // if it has a version pick the greatest one. 
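
`alertRuleInfo.update` coalesces concurrent notifications: before sending, it drains any pending message and keeps the greater version, so a busy rule routine only ever sees the most recent update. A self-contained sketch of the same idea follows; it uses a one-slot buffered channel to stay short, whereas the patch uses an unbuffered channel plus a drain-then-send select, but the net effect is the same.

```go
package main

import "fmt"

type update struct {
	Version  int64
	IsPaused bool
}

// coalesce keeps at most one pending update and prefers the greatest version,
// mirroring what alertRuleInfo.update achieves for concurrent senders.
func coalesce(ch chan update, msg update) {
	select {
	case pending := <-ch:
		if pending.Version > msg.Version {
			msg = pending
		}
	default:
	}
	ch <- msg
}

func main() {
	ch := make(chan update, 1)
	coalesce(ch, update{Version: 3})
	coalesce(ch, update{Version: 7, IsPaused: true})
	coalesce(ch, update{Version: 5}) // older version, dropped in favour of 7
	fmt.Println(<-ch)                // {7 true}
}
```

This coalescing is exactly what the adjusted registry tests assert: when two updates race, only the message carrying the greater version remains on the channel.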
- if v > msg { + if v.Version > msg.Version { msg = v } case <-a.ctx.Done(): diff --git a/pkg/services/ngalert/schedule/registry_test.go b/pkg/services/ngalert/schedule/registry_test.go index 2e48d248955..896962af74c 100644 --- a/pkg/services/ngalert/schedule/registry_test.go +++ b/pkg/services/ngalert/schedule/registry_test.go @@ -27,7 +27,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) { r := newAlertRuleInfo(context.Background()) resultCh := make(chan bool) go func() { - resultCh <- r.update(ruleVersion(rand.Int63())) + resultCh <- r.update(ruleVersionAndPauseStatus{ruleVersion(rand.Int63()), false}) }() select { case <-r.updateCh: @@ -45,19 +45,19 @@ func TestSchedule_alertRuleInfo(t *testing.T) { wg.Add(1) go func() { wg.Done() - r.update(version1) + r.update(ruleVersionAndPauseStatus{version1, false}) wg.Done() }() wg.Wait() wg.Add(2) // one when time1 is sent, another when go-routine for time2 has started go func() { wg.Done() - r.update(version2) + r.update(ruleVersionAndPauseStatus{version2, false}) }() wg.Wait() // at this point tick 1 has already been dropped select { case version := <-r.updateCh: - require.Equal(t, version2, version) + require.Equal(t, ruleVersionAndPauseStatus{version2, false}, version) case <-time.After(5 * time.Second): t.Fatal("No message was received on eval channel") } @@ -71,19 +71,19 @@ func TestSchedule_alertRuleInfo(t *testing.T) { wg.Add(1) go func() { wg.Done() - r.update(version2) + r.update(ruleVersionAndPauseStatus{version2, false}) wg.Done() }() wg.Wait() wg.Add(2) // one when time1 is sent, another when go-routine for time2 has started go func() { wg.Done() - r.update(version1) + r.update(ruleVersionAndPauseStatus{version1, false}) }() wg.Wait() // at this point tick 1 has already been dropped select { case version := <-r.updateCh: - require.Equal(t, version2, version) + require.Equal(t, ruleVersionAndPauseStatus{version2, false}, version) case <-time.After(5 * time.Second): t.Fatal("No message was received on eval channel") } @@ -185,7 +185,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) { r := newAlertRuleInfo(context.Background()) r.stop(errRuleDeleted) require.ErrorIs(t, r.ctx.Err(), errRuleDeleted) - require.False(t, r.update(ruleVersion(rand.Int63()))) + require.False(t, r.update(ruleVersionAndPauseStatus{ruleVersion(rand.Int63()), false})) }) t.Run("eval should do nothing", func(t *testing.T) { r := newAlertRuleInfo(context.Background()) @@ -237,7 +237,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) { } switch rand.Intn(max) + 1 { case 1: - r.update(ruleVersion(rand.Int63())) + r.update(ruleVersionAndPauseStatus{ruleVersion(rand.Int63()), false}) case 2: r.eval(&evaluation{ scheduledAt: time.Now(), diff --git a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go index ba7a93bebc5..f8353a23413 100644 --- a/pkg/services/ngalert/schedule/schedule.go +++ b/pkg/services/ngalert/schedule/schedule.go @@ -37,7 +37,7 @@ type ScheduleService interface { // an error. The scheduler is terminated when this function returns. 
Run(context.Context) error // UpdateAlertRule notifies scheduler that a rule has been changed - UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64) + UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64, isPaused bool) // DeleteAlertRule notifies scheduler that rules have been deleted DeleteAlertRule(keys ...ngmodels.AlertRuleKey) } @@ -150,12 +150,12 @@ func (sch *schedule) Run(ctx context.Context) error { } // UpdateAlertRule looks for the active rule evaluation and commands it to update the rule -func (sch *schedule) UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64) { +func (sch *schedule) UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64, isPaused bool) { ruleInfo, err := sch.registry.get(key) if err != nil { return } - ruleInfo.update(ruleVersion(lastVersion)) + ruleInfo.update(ruleVersionAndPauseStatus{ruleVersion(lastVersion), isPaused}) } // DeleteAlertRule stops evaluation of the rule, deletes it from active rules, and cleans up state cache. @@ -314,7 +314,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup. return readyToRun, registeredDefinitions } -func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan ruleVersion) error { +func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan ruleVersionAndPauseStatus) error { grafanaCtx = ngmodels.WithRuleKey(grafanaCtx, key) logger := sch.log.FromContext(grafanaCtx) logger.Debug("Alert rule routine started") @@ -324,14 +324,23 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR evalDuration := sch.metrics.EvalDuration.WithLabelValues(orgID) evalTotalFailures := sch.metrics.EvalFailures.WithLabelValues(orgID) - clearState := func() { - states := sch.stateManager.ResetStateByRuleUID(grafanaCtx, key) + notify := func(states []*state.State) { expiredAlerts := FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock) if len(expiredAlerts.PostableAlerts) > 0 { sch.alertsSender.Send(key, expiredAlerts) } } + resetState := func(ctx context.Context, isPaused bool) { + rule := sch.schedulableAlertRules.get(key) + reason := ngmodels.StateReasonUpdated + if isPaused { + reason = ngmodels.StateReasonPaused + } + states := sch.stateManager.ResetStateByRuleUID(ctx, rule, reason) + notify(states) + } + evaluate := func(ctx context.Context, attempt int64, e *evaluation, span tracing.Span) { logger := logger.New("version", e.rule.Version, "attempt", attempt, "now", e.scheduledAt) start := sch.clock.Now() @@ -431,18 +440,19 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR for { select { // used by external services (API) to notify that rule is updated. - case lastVersion := <-updateCh: + case ctx := <-updateCh: // sometimes it can happen when, for example, the rule evaluation took so long, // and there were two concurrent messages in updateCh and evalCh, and the eval's one got processed first. // therefore, at the time when message from updateCh is processed the current rule will have // at least the same version (or greater) and the state created for the new version of the rule. 
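
The routine now reacts to a pause in two places: an update message resets state with reason `Paused` instead of `Updated`, and the evaluation branch returns early while the rule stays paused. Below is a toy model of that loop. It is a deliberate simplification — pause status here travels only on the update message, whereas the patch also reads `rule.IsPaused` off each evaluation — but the observable behaviour (reset on change, skip evaluation while paused) matches the patch.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type updateMsg struct {
	Version  int64
	IsPaused bool
}

// ruleLoop is a toy model of ruleRoutine: one goroutine per rule reacting to
// update messages (version bumps, pause/unpause) and evaluation ticks. The
// state manager, historian and alert senders are replaced with prints.
func ruleLoop(ctx context.Context, evalCh <-chan time.Time, updateCh <-chan updateMsg) {
	var currentVersion int64
	paused := false
	for {
		select {
		case u := <-updateCh:
			if u.Version <= currentVersion {
				continue // stale update; the routine is already on this version
			}
			currentVersion = u.Version
			paused = u.IsPaused
			reason := "Updated"
			if paused {
				reason = "Paused"
			}
			fmt.Println("reset state, reason:", reason)
		case t := <-evalCh:
			if paused {
				continue // paused rules are not evaluated
			}
			fmt.Println("evaluate at", t.Format(time.RFC3339))
		case <-ctx.Done():
			fmt.Println("stopping")
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	evalCh := make(chan time.Time)
	updateCh := make(chan updateMsg)
	go ruleLoop(ctx, evalCh, updateCh)

	evalCh <- time.Now()                              // evaluated
	updateCh <- updateMsg{Version: 2, IsPaused: true} // state reset with reason Paused
	evalCh <- time.Now()                              // skipped: rule is paused
	updateCh <- updateMsg{Version: 3}                 // state reset with reason Updated
	evalCh <- time.Now()                              // evaluated again
	cancel()
	time.Sleep(50 * time.Millisecond) // give the loop time to print "stopping"
}
```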
- if currentRuleVersion >= int64(lastVersion) { - logger.Info("Skip updating rule because its current version is actual", "version", currentRuleVersion, "newVersion", lastVersion) + if currentRuleVersion >= int64(ctx.Version) { + logger.Info("Skip updating rule because its current version is actual", "version", currentRuleVersion, "newVersion", ctx.Version) continue } - logger.Info("Clearing the state of the rule because version has changed", "version", currentRuleVersion, "newVersion", lastVersion) + + logger.Info("Clearing the state of the rule because it was updated", "version", currentRuleVersion, "newVersion", ctx.Version, "isPaused", ctx.IsPaused) // clear the state. So the next evaluation will start from the scratch. - clearState() + resetState(grafanaCtx, ctx.IsPaused) // evalCh - used by the scheduler to signal that evaluation is needed. case ctx, ok := <-evalCh: if !ok { @@ -462,14 +472,18 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR err := retryIfError(func(attempt int64) error { newVersion := ctx.rule.Version + isPaused := ctx.rule.IsPaused // fetch latest alert rule version if currentRuleVersion != newVersion { if currentRuleVersion > 0 { // do not clean up state if the eval loop has just started. logger.Debug("Got a new version of alert rule. Clear up the state and refresh extra labels", "version", currentRuleVersion, "newVersion", newVersion) - clearState() + resetState(grafanaCtx, isPaused) } currentRuleVersion = newVersion } + if isPaused { + return nil + } tracingCtx, span := sch.tracer.Start(grafanaCtx, "alert rule execution") defer span.End() @@ -489,7 +503,13 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR case <-grafanaCtx.Done(): // clean up the state only if the reason for stopping the evaluation loop is that the rule was deleted if errors.Is(grafanaCtx.Err(), errRuleDeleted) { - clearState() + // We do not want a context to be unbounded which could potentially cause a go routine running + // indefinitely. 1 minute is an almost randomly chosen timeout, big enough to cover the majority of the + // cases. 
+ ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute) + defer cancelFunc() + states := sch.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, key), key) + notify(states) } logger.Debug("Stopping alert rule routine") return nil diff --git a/pkg/services/ngalert/schedule/schedule_mock.go b/pkg/services/ngalert/schedule/schedule_mock.go index 720a7366813..9a950779f32 100644 --- a/pkg/services/ngalert/schedule/schedule_mock.go +++ b/pkg/services/ngalert/schedule/schedule_mock.go @@ -36,8 +36,8 @@ func (_m *FakeScheduleService) Run(_a0 context.Context) error { } // UpdateAlertRule provides a mock function with given fields: key, lastVersion -func (_m *FakeScheduleService) UpdateAlertRule(key models.AlertRuleKey, lastVersion int64) { - _m.Called(key, lastVersion) +func (_m *FakeScheduleService) UpdateAlertRule(key models.AlertRuleKey, lastVersion int64, isPaused bool) { + _m.Called(key, lastVersion, isPaused) } // evalApplied provides a mock function with given fields: _a0, _a1 diff --git a/pkg/services/ngalert/schedule/schedule_unit_test.go b/pkg/services/ngalert/schedule/schedule_unit_test.go index 2cbc59b5319..20e8f58bf79 100644 --- a/pkg/services/ngalert/schedule/schedule_unit_test.go +++ b/pkg/services/ngalert/schedule/schedule_unit_test.go @@ -235,7 +235,7 @@ func TestSchedule_ruleRoutine(t *testing.T) { go func() { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion)) + _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersionAndPauseStatus)) }() expectedTime := time.UnixMicro(rand.Int63()) @@ -347,7 +347,7 @@ func TestSchedule_ruleRoutine(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) go func() { - err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evaluation), make(chan ruleVersion)) + err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evaluation), make(chan ruleVersionAndPauseStatus)) stoppedChan <- err }() @@ -366,7 +366,7 @@ func TestSchedule_ruleRoutine(t *testing.T) { ctx, cancel := util.WithCancelCause(context.Background()) go func() { - err := sch.ruleRoutine(ctx, rule.GetKey(), make(chan *evaluation), make(chan ruleVersion)) + err := sch.ruleRoutine(ctx, rule.GetKey(), make(chan *evaluation), make(chan ruleVersionAndPauseStatus)) stoppedChan <- err }() @@ -383,13 +383,14 @@ func TestSchedule_ruleRoutine(t *testing.T) { evalChan := make(chan *evaluation) evalAppliedChan := make(chan time.Time) - updateChan := make(chan ruleVersion) + updateChan := make(chan ruleVersionAndPauseStatus) sender := AlertsSenderMock{} sender.EXPECT().Send(rule.GetKey(), mock.Anything).Return() sch, ruleStore, _, _ := createSchedule(evalAppliedChan, &sender) ruleStore.PutRule(context.Background(), rule) + sch.schedulableAlertRules.set([]*models.AlertRule{rule}, map[string]string{rule.NamespaceUID: "folderName"}) go func() { ctx, cancel := context.WithCancel(context.Background()) @@ -433,9 +434,9 @@ func TestSchedule_ruleRoutine(t *testing.T) { require.Greaterf(t, expectedToBeSent, 0, "State manager was expected to return at least one state that can be expired") t.Run("should do nothing if version in channel is the same", func(t *testing.T) { - updateChan <- ruleVersion(rule.Version - 1) - updateChan <- ruleVersion(rule.Version) - updateChan <- ruleVersion(rule.Version) // second time just to make sure that previous messages were handled + updateChan <- ruleVersionAndPauseStatus{ruleVersion(rule.Version - 1), false} + 
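
The deletion path above swaps the routine's own (already cancelled) context for a fresh one bounded to a minute, so the final `DeleteStateByRuleUID` call cannot hang forever. Here is that pattern in isolation, with the Grafana calls replaced by a stub:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// cleanup shows the pattern used when the rule routine stops because the rule
// was deleted: the routine's own context is already done, so the final
// cleanup gets a fresh, bounded context instead of inheriting the cancelled one.
func cleanup(parent context.Context, do func(context.Context) error) error {
	_ = parent // typically already cancelled here; deliberately not reused
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	return do(ctx)
}

func main() {
	parent, cancel := context.WithCancel(context.Background())
	cancel() // simulate the routine being told to stop

	err := cleanup(parent, func(ctx context.Context) error {
		// Stand-in for deleting state and notifying expired alerts.
		deadline, _ := ctx.Deadline()
		fmt.Println("cleaning up, deadline in", time.Until(deadline).Round(time.Second))
		return nil
	})
	fmt.Println("cleanup error:", err)
}
```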
updateChan <- ruleVersionAndPauseStatus{ruleVersion(rule.Version), false} + updateChan <- ruleVersionAndPauseStatus{ruleVersion(rule.Version), false} // second time just to make sure that previous messages were handled actualStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID) require.Len(t, actualStates, len(states)) @@ -444,7 +445,7 @@ func TestSchedule_ruleRoutine(t *testing.T) { }) t.Run("should clear the state and expire firing alerts if version in channel is greater", func(t *testing.T) { - updateChan <- ruleVersion(rule.Version + rand.Int63n(1000) + 1) + updateChan <- ruleVersionAndPauseStatus{ruleVersion(rule.Version + rand.Int63n(1000) + 1), false} require.Eventually(t, func() bool { return len(sender.Calls) > 0 @@ -474,7 +475,7 @@ func TestSchedule_ruleRoutine(t *testing.T) { go func() { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion)) + _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersionAndPauseStatus)) }() evalChan <- &evaluation{ @@ -544,7 +545,7 @@ func TestSchedule_ruleRoutine(t *testing.T) { go func() { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion)) + _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersionAndPauseStatus)) }() evalChan <- &evaluation{ @@ -577,7 +578,7 @@ func TestSchedule_ruleRoutine(t *testing.T) { go func() { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion)) + _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersionAndPauseStatus)) }() evalChan <- &evaluation{ @@ -601,12 +602,12 @@ func TestSchedule_UpdateAlertRule(t *testing.T) { info, _ := sch.registry.getOrCreateInfo(context.Background(), key) version := rand.Int63() go func() { - sch.UpdateAlertRule(key, version) + sch.UpdateAlertRule(key, version, false) }() select { case v := <-info.updateCh: - require.Equal(t, ruleVersion(version), v) + require.Equal(t, ruleVersionAndPauseStatus{ruleVersion(version), false}, v) case <-time.After(5 * time.Second): t.Fatal("No message was received on update channel") } @@ -616,14 +617,14 @@ func TestSchedule_UpdateAlertRule(t *testing.T) { key := models.GenerateRuleKey(rand.Int63()) info, _ := sch.registry.getOrCreateInfo(context.Background(), key) info.stop(nil) - sch.UpdateAlertRule(key, rand.Int63()) + sch.UpdateAlertRule(key, rand.Int63(), false) }) }) t.Run("when rule does not exist", func(t *testing.T) { t.Run("should exit", func(t *testing.T) { sch := setupScheduler(t, nil, nil, nil, nil, nil) key := models.GenerateRuleKey(rand.Int63()) - sch.UpdateAlertRule(key, rand.Int63()) + sch.UpdateAlertRule(key, rand.Int63(), false) }) }) } diff --git a/pkg/services/ngalert/state/manager.go b/pkg/services/ngalert/state/manager.go index 779066d64da..9d222fda1f2 100644 --- a/pkg/services/ngalert/state/manager.go +++ b/pkg/services/ngalert/state/manager.go @@ -7,7 +7,6 @@ import ( "github.com/benbjohnson/clock" "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/ngalert/eval" "github.com/grafana/grafana/pkg/services/ngalert/metrics" @@ -165,18 +164,61 @@ func (st *Manager) Get(orgID int64, alertRuleUID, stateId string) *State { return st.cache.get(orgID, alertRuleUID, stateId) } -// ResetStateByRuleUID deletes all entries in the 
state manager that match the given rule UID. -func (st *Manager) ResetStateByRuleUID(ctx context.Context, ruleKey ngModels.AlertRuleKey) []*State { +// DeleteStateByRuleUID removes the rule instances from cache and instanceStore. A closed channel is returned to be able +// to gracefully handle the clear state step in scheduler in case we do not need to use the historian to save state +// history. +func (st *Manager) DeleteStateByRuleUID(ctx context.Context, ruleKey ngModels.AlertRuleKey) []*State { logger := st.log.New(ruleKey.LogContext()...) logger.Debug("Resetting state of the rule") + states := st.cache.removeByRuleUID(ruleKey.OrgID, ruleKey.UID) - if len(states) > 0 && st.instanceStore != nil { + if len(states) == 0 { + return states + } + if st.instanceStore != nil { err := st.instanceStore.DeleteAlertInstancesByRule(ctx, ruleKey) if err != nil { logger.Error("Failed to delete states that belong to a rule from database", "error", err) } } logger.Info("Rules state was reset", "states", len(states)) + + return states +} + +// ResetStateByRuleUID removes the rule instances from cache and instanceStore and saves state history. If the state +// history has to be saved, rule must not be nil. +func (st *Manager) ResetStateByRuleUID(ctx context.Context, rule *ngModels.AlertRule, reason string) []*State { + ruleKey := rule.GetKey() + states := st.DeleteStateByRuleUID(ctx, ruleKey) + + if rule == nil || st.historian == nil { + return states + } + transitions := make([]StateTransition, 0, len(states)) + for _, s := range states { + oldState := s.State + oldReason := s.StateReason + state := *s + now := time.Now() + state.SetNormal(reason, s.StartsAt, now) + state.LastEvaluationTime = now + state.Values = map[string]float64{} + transitions = append(transitions, StateTransition{ + State: &state, + PreviousState: oldState, + PreviousStateReason: oldReason, + }) + } + + ruleMeta := history_model.NewRuleMeta(rule, st.log) + errCh := st.historian.RecordStatesAsync(ctx, ruleMeta, transitions) + go func() { + err := <-errCh + if err != nil { + st.log.FromContext(ctx).Error("Error updating historian state reset transitions", append(ruleKey.LogContext(), "reason", reason, "error", err)...) 
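
The manager now splits the old behaviour into two operations: `DeleteStateByRuleUID` only clears the cache and instance store (used when the rule is deleted), while `ResetStateByRuleUID` additionally records to-Normal transitions in state history with the supplied reason (used when the rule is updated or paused). A schematic sketch of the two call sites, with heavily simplified stand-in signatures:

```go
package main

import "fmt"

// Stand-ins that capture just the split introduced here; the real methods take
// a context plus a rule key or rule and return the removed states.
type stateManager interface {
	DeleteStateByRuleUID(ruleUID string) int
	ResetStateByRuleUID(ruleUID, reason string) int
}

type fakeManager struct{}

func (fakeManager) DeleteStateByRuleUID(ruleUID string) int {
	fmt.Println("delete states for", ruleUID, "(no history written)")
	return 2
}

func (fakeManager) ResetStateByRuleUID(ruleUID, reason string) int {
	fmt.Println("reset states for", ruleUID, "recording history with reason", reason)
	return 2
}

func main() {
	var m stateManager = fakeManager{}

	// Rule deleted: the routine's final cleanup only removes state.
	m.DeleteStateByRuleUID("rule-1")

	// Rule updated or paused: state is reset and the transition is kept in history.
	m.ResetStateByRuleUID("rule-2", "Updated")
	m.ResetStateByRuleUID("rule-3", "Paused")
}
```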
+ } + }() return states } @@ -240,7 +282,7 @@ func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRu logger.Debug("Ignoring set next state as result is pending") } - // Set reason iff: result is different than state, reason is not Alerting or Normal + // Set reason iff: result and state are different, reason is not Alerting or Normal currentState.StateReason = "" if currentState.State != result.State && diff --git a/pkg/services/ngalert/state/manager_test.go b/pkg/services/ngalert/state/manager_test.go index ca96f4f6de9..047c861ca80 100644 --- a/pkg/services/ngalert/state/manager_test.go +++ b/pkg/services/ngalert/state/manager_test.go @@ -2543,3 +2543,245 @@ func TestStaleResults(t *testing.T) { } }) } + +func TestDeleteStateByRuleUID(t *testing.T) { + interval := time.Minute + ctx := context.Background() + _, dbstore := tests.SetupTestEnv(t, 1) + + const mainOrgID int64 = 1 + rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrgID) + + labels1 := models.InstanceLabels{"test1": "testValue1"} + _, hash1, _ := labels1.StringAndHash() + labels2 := models.InstanceLabels{"test2": "testValue2"} + _, hash2, _ := labels2.StringAndHash() + instances := []models.AlertInstance{ + { + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: rule.OrgID, + RuleUID: rule.UID, + LabelsHash: hash1, + }, + CurrentState: models.InstanceStateNormal, + Labels: labels1, + }, + { + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: rule.OrgID, + RuleUID: rule.UID, + LabelsHash: hash2, + }, + CurrentState: models.InstanceStateFiring, + Labels: labels2, + }, + } + + _ = dbstore.SaveAlertInstances(ctx, instances...) + + testCases := []struct { + desc string + instanceStore state.InstanceStore + + expectedStates map[string]*state.State + + startingStateCacheCount int + finalStateCacheCount int + startingInstanceDBCount int + finalInstanceDBCount int + }{ + { + desc: "all states/instances are removed from cache and DB", + instanceStore: dbstore, + expectedStates: map[string]*state.State{ + `[["test1","testValue1"]]`: { + AlertRuleUID: rule.UID, + OrgID: 1, + CacheID: `[["test1","testValue1"]]`, + Labels: data.Labels{"test1": "testValue1"}, + State: eval.Normal, + EvaluationDuration: 0, + Annotations: map[string]string{"testAnnoKey": "testAnnoValue"}, + }, + `[["test2","testValue2"]]`: { + AlertRuleUID: rule.UID, + OrgID: 1, + CacheID: `[["test2","testValue2"]]`, + Labels: data.Labels{"test2": "testValue2"}, + State: eval.Alerting, + EvaluationDuration: 0, + Annotations: map[string]string{"testAnnoKey": "testAnnoValue"}, + }, + }, + startingStateCacheCount: 2, + finalStateCacheCount: 0, + startingInstanceDBCount: 2, + finalInstanceDBCount: 0, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + ctx := context.Background() + cfg := state.ManagerCfg{ + Metrics: testMetrics.GetStateMetrics(), + ExternalURL: nil, + InstanceStore: dbstore, + Images: &state.NoopImageService{}, + Clock: clock.New(), + Historian: &state.FakeHistorian{}, + } + st := state.NewManager(cfg) + st.Warm(ctx, dbstore) + q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID} + _ = dbstore.ListAlertInstances(ctx, q) + existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID) + + // We have loaded the expected number of entries from the db + assert.Equal(t, tc.startingStateCacheCount, len(existingStatesForRule)) + assert.Equal(t, tc.startingInstanceDBCount, len(q.Result)) + + states := st.DeleteStateByRuleUID(ctx, 
rule.GetKey()) + + // Check that the deleted states are the same as the ones that were in cache + assert.Equal(t, tc.startingStateCacheCount, len(states)) + for _, s := range states { + assert.Equal(t, tc.expectedStates[s.CacheID], s) + } + + q = &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID} + _ = dbstore.ListAlertInstances(ctx, q) + existingStatesForRule = st.GetStatesForRuleUID(rule.OrgID, rule.UID) + + // The expected number of state entries remains after states are deleted + assert.Equal(t, tc.finalStateCacheCount, len(existingStatesForRule)) + assert.Equal(t, tc.finalInstanceDBCount, len(q.Result)) + }) + } +} + +func TestResetStateByRuleUID(t *testing.T) { + interval := time.Minute + ctx := context.Background() + _, dbstore := tests.SetupTestEnv(t, 1) + + const mainOrgID int64 = 1 + rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrgID) + + labels1 := models.InstanceLabels{"test1": "testValue1"} + _, hash1, _ := labels1.StringAndHash() + labels2 := models.InstanceLabels{"test2": "testValue2"} + _, hash2, _ := labels2.StringAndHash() + instances := []models.AlertInstance{ + { + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: rule.OrgID, + RuleUID: rule.UID, + LabelsHash: hash1, + }, + CurrentState: models.InstanceStateNormal, + Labels: labels1, + }, + { + AlertInstanceKey: models.AlertInstanceKey{ + RuleOrgID: rule.OrgID, + RuleUID: rule.UID, + LabelsHash: hash2, + }, + CurrentState: models.InstanceStateFiring, + Labels: labels2, + }, + } + + _ = dbstore.SaveAlertInstances(ctx, instances...) + + testCases := []struct { + desc string + instanceStore state.InstanceStore + + expectedStates map[string]*state.State + + startingStateCacheCount int + finalStateCacheCount int + startingInstanceDBCount int + finalInstanceDBCount int + newHistorianEntriesCount int + }{ + { + desc: "all states/instances are removed from cache and DB and saved in historian", + instanceStore: dbstore, + expectedStates: map[string]*state.State{ + `[["test1","testValue1"]]`: { + AlertRuleUID: rule.UID, + OrgID: 1, + CacheID: `[["test1","testValue1"]]`, + Labels: data.Labels{"test1": "testValue1"}, + State: eval.Normal, + EvaluationDuration: 0, + Annotations: map[string]string{"testAnnoKey": "testAnnoValue"}, + }, + `[["test2","testValue2"]]`: { + AlertRuleUID: rule.UID, + OrgID: 1, + CacheID: `[["test2","testValue2"]]`, + Labels: data.Labels{"test2": "testValue2"}, + State: eval.Alerting, + EvaluationDuration: 0, + Annotations: map[string]string{"testAnnoKey": "testAnnoValue"}, + }, + }, + startingStateCacheCount: 2, + finalStateCacheCount: 0, + startingInstanceDBCount: 2, + finalInstanceDBCount: 0, + newHistorianEntriesCount: 2, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + ctx := context.Background() + fakeHistorian := &state.FakeHistorian{StateTransitions: make([]state.StateTransition, 0)} + cfg := state.ManagerCfg{ + Metrics: testMetrics.GetStateMetrics(), + ExternalURL: nil, + InstanceStore: dbstore, + Images: &state.NoopImageService{}, + Clock: clock.New(), + Historian: fakeHistorian, + } + st := state.NewManager(cfg) + st.Warm(ctx, dbstore) + q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID} + _ = dbstore.ListAlertInstances(ctx, q) + existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID) + + // We have loaded the expected number of entries from the db + assert.Equal(t, tc.startingStateCacheCount, len(existingStatesForRule)) + assert.Equal(t, 
tc.startingInstanceDBCount, len(q.Result)) + + states := st.ResetStateByRuleUID(ctx, rule, models.StateReasonPaused) + + // Check that the deleted states are the same as the ones that were in cache + assert.Equal(t, tc.startingStateCacheCount, len(states)) + for _, s := range states { + assert.Equal(t, tc.expectedStates[s.CacheID], s) + } + + // Check if both entries have been added to the historian + assert.Equal(t, tc.newHistorianEntriesCount, len(fakeHistorian.StateTransitions)) + for _, str := range fakeHistorian.StateTransitions { + assert.Equal(t, tc.expectedStates[str.State.CacheID].State, str.PreviousState) + assert.Equal(t, tc.expectedStates[str.State.CacheID].StateReason, str.PreviousStateReason) + } + + q = &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID} + _ = dbstore.ListAlertInstances(ctx, q) + existingStatesForRule = st.GetStatesForRuleUID(rule.OrgID, rule.UID) + + // The expected number of state entries remains after states are deleted + assert.Equal(t, tc.finalStateCacheCount, len(existingStatesForRule)) + assert.Equal(t, tc.finalInstanceDBCount, len(q.Result)) + }) + } +} diff --git a/pkg/services/ngalert/state/testing.go b/pkg/services/ngalert/state/testing.go index dcb1f31e362..c1d60a3fbb3 100644 --- a/pkg/services/ngalert/state/testing.go +++ b/pkg/services/ngalert/state/testing.go @@ -61,9 +61,12 @@ func (f *FakeRuleReader) ListAlertRules(_ context.Context, q *models.ListAlertRu return nil } -type FakeHistorian struct{} +type FakeHistorian struct { + StateTransitions []StateTransition +} func (f *FakeHistorian) RecordStatesAsync(ctx context.Context, rule history_model.RuleMeta, states []StateTransition) <-chan error { + f.StateTransitions = append(f.StateTransitions, states...) errCh := make(chan error) close(errCh) return errCh diff --git a/pkg/services/ngalert/store/alert_rule.go b/pkg/services/ngalert/store/alert_rule.go index 0992b375de0..74e7b2f8387 100644 --- a/pkg/services/ngalert/store/alert_rule.go +++ b/pkg/services/ngalert/store/alert_rule.go @@ -68,8 +68,8 @@ func (st DBstore) DeleteAlertRulesByUID(ctx context.Context, orgID int64, ruleUI } // IncreaseVersionForAllRulesInNamespace Increases version for all rules that have specified namespace. Returns all rules that belong to the namespace -func (st DBstore) IncreaseVersionForAllRulesInNamespace(ctx context.Context, orgID int64, namespaceUID string) ([]ngmodels.AlertRuleKeyWithVersion, error) { - var keys []ngmodels.AlertRuleKeyWithVersion +func (st DBstore) IncreaseVersionForAllRulesInNamespace(ctx context.Context, orgID int64, namespaceUID string) ([]ngmodels.AlertRuleKeyWithVersionAndPauseStatus, error) { + var keys []ngmodels.AlertRuleKeyWithVersionAndPauseStatus err := st.SQLStore.WithTransactionalDbSession(ctx, func(sess *db.Session) error { now := TimeNow() _, err := sess.Exec("UPDATE alert_rule SET version = version + 1, updated = ? WHERE namespace_uid = ? 
AND org_id = ?", now, namespaceUID, orgID) diff --git a/pkg/services/ngalert/tests/fakes/rules.go b/pkg/services/ngalert/tests/fakes/rules.go index f970ed86da8..55128376ecb 100644 --- a/pkg/services/ngalert/tests/fakes/rules.go +++ b/pkg/services/ngalert/tests/fakes/rules.go @@ -316,7 +316,7 @@ func (f *RuleStore) UpdateRuleGroup(ctx context.Context, orgID int64, namespaceU return nil } -func (f *RuleStore) IncreaseVersionForAllRulesInNamespace(_ context.Context, orgID int64, namespaceUID string) ([]models.AlertRuleKeyWithVersion, error) { +func (f *RuleStore) IncreaseVersionForAllRulesInNamespace(_ context.Context, orgID int64, namespaceUID string) ([]models.AlertRuleKeyWithVersionAndPauseStatus, error) { f.mtx.Lock() defer f.mtx.Unlock() @@ -325,15 +325,18 @@ func (f *RuleStore) IncreaseVersionForAllRulesInNamespace(_ context.Context, org Params: []interface{}{orgID, namespaceUID}, }) - var result []models.AlertRuleKeyWithVersion + var result []models.AlertRuleKeyWithVersionAndPauseStatus for _, rule := range f.Rules[orgID] { if rule.NamespaceUID == namespaceUID && rule.OrgID == orgID { rule.Version++ rule.Updated = time.Now() - result = append(result, models.AlertRuleKeyWithVersion{ - Version: rule.Version, - AlertRuleKey: rule.GetKey(), + result = append(result, models.AlertRuleKeyWithVersionAndPauseStatus{ + IsPaused: rule.IsPaused, + AlertRuleKeyWithVersion: models.AlertRuleKeyWithVersion{ + Version: rule.Version, + AlertRuleKey: rule.GetKey(), + }, }) } } diff --git a/pkg/services/sqlstore/migrations/ualert/tables.go b/pkg/services/sqlstore/migrations/ualert/tables.go index 24f12167a29..de7953e7747 100644 --- a/pkg/services/sqlstore/migrations/ualert/tables.go +++ b/pkg/services/sqlstore/migrations/ualert/tables.go @@ -260,6 +260,16 @@ func AddAlertRuleMigrations(mg *migrator.Migrator, defaultIntervalSeconds int64) Default: "1", }, )) + + mg.AddMigration("add is_paused column to alert_rule table", migrator.NewAddColumnMigration( + alertRule, + &migrator.Column{ + Name: "is_paused", + Type: migrator.DB_Bool, + Nullable: false, + Default: "false", + }, + )) } func AddAlertRuleVersionMigrations(mg *migrator.Migrator) { @@ -313,6 +323,16 @@ func AddAlertRuleVersionMigrations(mg *migrator.Migrator) { Default: "1", }, )) + + mg.AddMigration("add is_paused column to alert_rule_versions table", migrator.NewAddColumnMigration( + alertRuleVersion, + &migrator.Column{ + Name: "is_paused", + Type: migrator.DB_Bool, + Nullable: false, + Default: "false", + }, + )) } func AddAlertmanagerConfigMigrations(mg *migrator.Migrator) {