diff --git a/pkg/services/ngalert/state/cache.go b/pkg/services/ngalert/state/cache.go
index f962d6c9ebd..360ba0aa490 100644
--- a/pkg/services/ngalert/state/cache.go
+++ b/pkg/services/ngalert/state/cache.go
@@ -263,3 +263,9 @@ func mergeLabels(a, b data.Labels) data.Labels {
 	}
 	return newLbs
 }
+
+func (c *cache) deleteEntry(orgID int64, alertRuleUID, cacheID string) {
+	c.mtxStates.Lock()
+	defer c.mtxStates.Unlock()
+	delete(c.states[orgID][alertRuleUID], cacheID)
+}
diff --git a/pkg/services/ngalert/state/manager.go b/pkg/services/ngalert/state/manager.go
index d3dfbe1886a..7e1d86725b3 100644
--- a/pkg/services/ngalert/state/manager.go
+++ b/pkg/services/ngalert/state/manager.go
@@ -137,11 +137,13 @@ func (st *Manager) RemoveByRuleUID(orgID int64, ruleUID string) {
 
 func (st *Manager) ProcessEvalResults(alertRule *ngModels.AlertRule, results eval.Results) []*State {
 	st.log.Debug("state manager processing evaluation results", "uid", alertRule.UID, "resultCount", len(results))
 	var states []*State
+	processedResults := make(map[string]*State, len(results))
 	for _, result := range results {
 		s := st.setNextState(alertRule, result)
 		states = append(states, s)
+		processedResults[s.CacheId] = s
 	}
-	st.log.Debug("returning changed states to scheduler", "count", len(states))
+	st.staleResultsHandler(alertRule, processedResults)
 	return states
 }
@@ -265,3 +267,28 @@ func (st *Manager) createAlertAnnotation(new eval.State, alertRule *ngModels.AlertRule
 		return
 	}
 }
+
+func (st *Manager) staleResultsHandler(alertRule *ngModels.AlertRule, states map[string]*State) {
+	allStates := st.GetStatesForRuleUID(alertRule.OrgID, alertRule.UID)
+	for _, s := range allStates {
+		_, ok := states[s.CacheId]
+		if !ok && isItStale(s.LastEvaluationTime, alertRule.IntervalSeconds) {
+			st.log.Debug("removing stale state entry", "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID, "cacheID", s.CacheId)
+			st.cache.deleteEntry(s.OrgID, s.AlertRuleUID, s.CacheId)
+			ilbs := ngModels.InstanceLabels(s.Labels)
+			_, labelsHash, err := ilbs.StringAndHash()
+			if err != nil {
+				st.log.Error("unable to get labelsHash", "error", err.Error(), "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID)
+				continue
+			}
+
+			if err = st.instanceStore.DeleteAlertInstance(s.OrgID, s.AlertRuleUID, labelsHash); err != nil {
+				st.log.Error("unable to delete stale instance from database", "error", err.Error(), "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID, "cacheID", s.CacheId)
+			}
+		}
+	}
+}
+
+func isItStale(lastEval time.Time, intervalSeconds int64) bool {
+	return lastEval.Add(2 * time.Duration(intervalSeconds) * time.Second).Before(time.Now())
+}
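For context on the two hunks above: a state entry is treated as stale once more than two evaluation intervals have passed without the rule producing a result for it. A self-contained sketch of that rule, with the helper body copied from the diff and the interval values invented purely for illustration:

package main

import (
	"fmt"
	"time"
)

// isItStale mirrors the helper added to manager.go: an entry is stale once
// more than two evaluation intervals have elapsed since its last evaluation.
func isItStale(lastEval time.Time, intervalSeconds int64) bool {
	return lastEval.Add(2 * time.Duration(intervalSeconds) * time.Second).Before(time.Now())
}

func main() {
	now := time.Now()
	fmt.Println(isItStale(now.Add(-21*time.Second), 10)) // true: 21s ago is past the 20s cutoff
	fmt.Println(isItStale(now.Add(-19*time.Second), 10)) // false: 19s ago is within the 20s cutoff
}

The two-interval grace period means a single slow or missed evaluation does not evict a live entry; only series that have been absent for roughly two consecutive runs are cleaned up.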
diff --git a/pkg/services/ngalert/state/manager_test.go b/pkg/services/ngalert/state/manager_test.go
index 1e6ca5af369..39cfe682607 100644
--- a/pkg/services/ngalert/state/manager_test.go
+++ b/pkg/services/ngalert/state/manager_test.go
@@ -4,6 +4,9 @@ import (
 	"testing"
 	"time"
 
+	"github.com/grafana/grafana/pkg/registry"
+	"github.com/grafana/grafana/pkg/services/ngalert/tests"
+
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
@@ -864,3 +867,107 @@ func TestProcessEvalResults(t *testing.T) {
 		})
 	}
 }
+
+func TestStaleResultsHandler(t *testing.T) {
+	evaluationTime, err := time.Parse("2006-01-02", "2021-03-25")
+	if err != nil {
+		t.Fatalf("error parsing date format: %s", err.Error())
+	}
+
+	dbstore := tests.SetupTestEnv(t, 1)
+
+	rule := tests.CreateTestAlertRule(t, dbstore, 600)
+
+	saveCmd1 := &models.SaveAlertInstanceCommand{
+		RuleOrgID:         rule.OrgID,
+		RuleUID:           rule.UID,
+		Labels:            models.InstanceLabels{"test1": "testValue1"},
+		State:             models.InstanceStateNormal,
+		LastEvalTime:      evaluationTime,
+		CurrentStateSince: evaluationTime.Add(-1 * time.Minute),
+		CurrentStateEnd:   evaluationTime.Add(1 * time.Minute),
+	}
+
+	_ = dbstore.SaveAlertInstance(saveCmd1)
+
+	saveCmd2 := &models.SaveAlertInstanceCommand{
+		RuleOrgID:         rule.OrgID,
+		RuleUID:           rule.UID,
+		Labels:            models.InstanceLabels{"test2": "testValue2"},
+		State:             models.InstanceStateFiring,
+		LastEvalTime:      evaluationTime,
+		CurrentStateSince: evaluationTime.Add(-1 * time.Minute),
+		CurrentStateEnd:   evaluationTime.Add(1 * time.Minute),
+	}
+	_ = dbstore.SaveAlertInstance(saveCmd2)
+
+	t.Cleanup(registry.ClearOverrides)
+
+	testCases := []struct {
+		desc               string
+		evalResults        []eval.Results
+		expectedStates     map[string]*state.State
+		startingStateCount int
+		finalStateCount    int
+	}{
+		{
+			desc: "stale cache entries are removed",
+			evalResults: []eval.Results{
+				{
+					eval.Result{
+						Instance:    data.Labels{"test1": "testValue1"},
+						State:       eval.Normal,
+						EvaluatedAt: evaluationTime.Add(3 * time.Minute),
+					},
+				},
+			},
+			expectedStates: map[string]*state.State{
+				`[["__alert_rule_namespace_uid__","namespace"],["__alert_rule_uid__","` + rule.UID + `"],["alertname","` + rule.Title + `"],["test1","testValue1"]]`: {
+					AlertRuleUID: rule.UID,
+					OrgID:        1,
+					CacheId:      `[["__alert_rule_namespace_uid__","namespace"],["__alert_rule_uid__","` + rule.UID + `"],["alertname","` + rule.Title + `"],["test1","testValue1"]]`,
+					Labels: data.Labels{
+						"__alert_rule_namespace_uid__": "namespace",
+						"__alert_rule_uid__":           rule.UID,
+						"alertname":                    rule.Title,
+						"test1":                        "testValue1",
+					},
+					State: eval.Normal,
+					Results: []state.Evaluation{
+						{
+							EvaluationTime:  evaluationTime.Add(3 * time.Minute),
+							EvaluationState: eval.Normal,
+							Values:          make(map[string]state.EvaluationValue),
+						},
+					},
+					LastEvaluationTime: evaluationTime.Add(3 * time.Minute),
+					EvaluationDuration: 0,
+					Annotations:        map[string]string{"testAnnoKey": "testAnnoValue"},
+				},
+			},
+			startingStateCount: 2,
+			finalStateCount:    1,
+		},
+	}
+
+	for _, tc := range testCases {
+		st := state.NewManager(log.New("test_stale_results_handler"), nilMetrics, dbstore, dbstore)
+		st.Warm()
+		existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)
+
+		// We have loaded the expected number of entries from the db
+		assert.Equal(t, tc.startingStateCount, len(existingStatesForRule))
+		for _, res := range tc.evalResults {
+			st.ProcessEvalResults(rule, res)
+			for _, s := range tc.expectedStates {
+				cachedState, err := st.Get(s.OrgID, s.AlertRuleUID, s.CacheId)
+				require.NoError(t, err)
+				assert.Equal(t, s, cachedState)
+			}
+		}
+		existingStatesForRule = st.GetStatesForRuleUID(rule.OrgID, rule.UID)
+
+		// The expected number of state entries remains after results are processed
+		assert.Equal(t, tc.finalStateCount, len(existingStatesForRule))
+	}
+}
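The test above exercises the full path against a real database via tests.SetupTestEnv. Where a lighter unit test is wanted, a fake store could satisfy just the method the cleanup path calls. Everything below (the narrowed interface, fakeStore, and all names) is a hypothetical sketch, not part of the patch:

package main

import "fmt"

// instanceDeleter is a hypothetical narrowing of the InstanceStore interface
// to the single method staleResultsHandler needs for cleanup.
type instanceDeleter interface {
	DeleteAlertInstance(orgID int64, ruleUID, labelsHash string) error
}

// fakeStore records delete calls instead of touching a database.
type fakeStore struct {
	deleted []string
}

func (f *fakeStore) DeleteAlertInstance(orgID int64, ruleUID, labelsHash string) error {
	f.deleted = append(f.deleted, fmt.Sprintf("%d/%s/%s", orgID, ruleUID, labelsHash))
	return nil
}

func main() {
	f := &fakeStore{}
	var store instanceDeleter = f
	_ = store.DeleteAlertInstance(1, "rule-uid", "labels-hash")
	fmt.Println(f.deleted) // [1/rule-uid/labels-hash]
}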
diff --git a/pkg/services/ngalert/store/instance_database.go b/pkg/services/ngalert/store/instance_database.go
index 5c4bc0d7b1e..76ffe2ef485 100644
--- a/pkg/services/ngalert/store/instance_database.go
+++ b/pkg/services/ngalert/store/instance_database.go
@@ -14,6 +14,7 @@ type InstanceStore interface {
 	ListAlertInstances(cmd *models.ListAlertInstancesQuery) error
 	SaveAlertInstance(cmd *models.SaveAlertInstanceCommand) error
 	FetchOrgIds() ([]int64, error)
+	DeleteAlertInstance(orgID int64, ruleUID, labelsHash string) error
 }
 
 // GetAlertInstance is a handler for retrieving an alert instance based on OrgId, AlertDefintionID, and
@@ -142,3 +143,13 @@ func (st DBstore) FetchOrgIds() ([]int64, error) {
 	})
 	return orgIds, err
 }
+
+func (st DBstore) DeleteAlertInstance(orgID int64, ruleUID, labelsHash string) error {
+	return st.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
+		_, err := sess.Exec("DELETE FROM alert_instance WHERE rule_org_id = ? AND rule_uid = ? AND labels_hash = ?", orgID, ruleUID, labelsHash)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+}
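For illustration only, the same delete expressed with plain database/sql, assuming a *sql.DB handle and reusing the table and column names from the query above; the patch itself routes through Grafana's sqlstore transactional session instead:

package storesketch

import "database/sql"

// deleteInstance issues the same parameterized DELETE as
// DBstore.DeleteAlertInstance. The triple (rule_org_id, rule_uid,
// labels_hash) is what the store keys an alert instance on, so the
// statement targets the row(s) for one specific instance.
func deleteInstance(db *sql.DB, orgID int64, ruleUID, labelsHash string) error {
	_, err := db.Exec(
		"DELETE FROM alert_instance WHERE rule_org_id = ? AND rule_uid = ? AND labels_hash = ?",
		orgID, ruleUID, labelsHash,
	)
	return err
}

Note that the store keys on the labels hash rather than the raw label set, which is why staleResultsHandler recomputes the hash via InstanceLabels.StringAndHash before calling DeleteAlertInstance.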