Alerting: automatically remove stale alerting states (#36767)

* initial attempt at automatic removal of stale states

* test case, need expected states

* finish unit test

* PR feedback

* still multiply by time.Second

* PR feedback
David Parrott 2021-07-26 09:12:04 -07:00 committed by GitHub
parent 81b98745c0
commit b5f464412d
4 changed files with 151 additions and 1 deletion


@@ -263,3 +263,9 @@ func mergeLabels(a, b data.Labels) data.Labels {
 	}
 	return newLbs
 }
+
+func (c *cache) deleteEntry(orgID int64, alertRuleUID, cacheID string) {
+	c.mtxStates.Lock()
+	defer c.mtxStates.Unlock()
+	delete(c.states[orgID][alertRuleUID], cacheID)
+}
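A note on the new helper: deleteEntry relies on Go map semantics, where indexing a missing key yields a nil inner map and delete on a nil map is a no-op, so no existence checks are needed before deleting. A minimal standalone sketch of the same pattern, with simplified value types standing in for the real *State entries:

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-in for the state cache: orgID -> ruleUID -> cacheID -> state.
type cache struct {
	mtx    sync.Mutex
	states map[int64]map[string]map[string]string
}

func (c *cache) deleteEntry(orgID int64, ruleUID, cacheID string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	// Indexing a missing org or rule yields a nil map, and delete on a
	// nil map is a no-op, so this is safe without existence checks.
	delete(c.states[orgID][ruleUID], cacheID)
}

func main() {
	c := &cache{states: map[int64]map[string]map[string]string{
		1: {"rule-a": {"cache-1": "Normal"}},
	}}
	c.deleteEntry(1, "rule-a", "cache-1")   // removes the entry
	c.deleteEntry(2, "rule-b", "cache-9")   // no-op: org 2 has no states
	fmt.Println(len(c.states[1]["rule-a"])) // 0
}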


@@ -137,11 +137,13 @@ func (st *Manager) RemoveByRuleUID(orgID int64, ruleUID string) {
 func (st *Manager) ProcessEvalResults(alertRule *ngModels.AlertRule, results eval.Results) []*State {
 	st.log.Debug("state manager processing evaluation results", "uid", alertRule.UID, "resultCount", len(results))
 	var states []*State
+	processedResults := make(map[string]*State, len(results))
 	for _, result := range results {
 		s := st.setNextState(alertRule, result)
 		states = append(states, s)
+		processedResults[s.CacheId] = s
 	}
-	st.log.Debug("returning changed states to scheduler", "count", len(states))
+	st.staleResultsHandler(alertRule, processedResults)
 	return states
 }
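The ProcessEvalResults change collects the cache IDs produced by the current evaluation into processedResults, so that staleResultsHandler (added in the next hunk) can compute a set difference against everything cached for the rule. A standalone sketch of that pattern, with plain strings standing in for states:

package main

import "fmt"

// staleCandidates returns every cached ID that the current evaluation did not
// produce; these are the entries staleResultsHandler considers for removal.
func staleCandidates(cached []string, processed map[string]bool) []string {
	var stale []string
	for _, id := range cached {
		if !processed[id] {
			stale = append(stale, id)
		}
	}
	return stale
}

func main() {
	cached := []string{"a", "b", "c"}
	processed := map[string]bool{"a": true, "c": true}
	fmt.Println(staleCandidates(cached, processed)) // [b]
}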
@@ -265,3 +267,27 @@ func (st *Manager) createAlertAnnotation(new eval.State, alertRule *ngModels.Ale
 		return
 	}
 }
+
+func (st *Manager) staleResultsHandler(alertRule *ngModels.AlertRule, states map[string]*State) {
+	allStates := st.GetStatesForRuleUID(alertRule.OrgID, alertRule.UID)
+	for _, s := range allStates {
+		_, ok := states[s.CacheId]
+		if !ok && isItStale(s.LastEvaluationTime, alertRule.IntervalSeconds) {
+			st.log.Debug("removing stale state entry", "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID, "cacheID", s.CacheId)
+			st.cache.deleteEntry(s.OrgID, s.AlertRuleUID, s.CacheId)
+			ilbs := ngModels.InstanceLabels(s.Labels)
+			_, labelsHash, err := ilbs.StringAndHash()
+			if err != nil {
+				st.log.Error("unable to get labelsHash", "error", err.Error(), "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID)
+			}
+			if err = st.instanceStore.DeleteAlertInstance(s.OrgID, s.AlertRuleUID, labelsHash); err != nil {
+				st.log.Error("unable to delete stale instance from database", "error", err.Error(), "orgID", s.OrgID, "alertRuleUID", s.AlertRuleUID, "cacheID", s.CacheId)
+			}
+		}
+	}
+}
+
+func isItStale(lastEval time.Time, intervalSeconds int64) bool {
+	return lastEval.Add(2 * time.Duration(intervalSeconds) * time.Second).Before(time.Now())
+}
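isItStale is the whole staleness policy: a state is stale once more than two evaluation intervals have elapsed since its last evaluation (the "still multiply by time.Second" fixup in the commit message is the conversion of IntervalSeconds into a time.Duration). A runnable illustration, assuming a 60-second rule interval:

package main

import (
	"fmt"
	"time"
)

// Same rule as the diff: stale once lastEval + 2*interval is in the past.
func isItStale(lastEval time.Time, intervalSeconds int64) bool {
	return lastEval.Add(2 * time.Duration(intervalSeconds) * time.Second).Before(time.Now())
}

func main() {
	const interval = int64(60) // a 60s interval gives a 120s staleness window
	fmt.Println(isItStale(time.Now().Add(-90*time.Second), interval))  // false: inside the window
	fmt.Println(isItStale(time.Now().Add(-121*time.Second), interval)) // true: past two intervals
}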


@@ -4,6 +4,9 @@ import (
 	"testing"
 	"time"
 
+	"github.com/grafana/grafana/pkg/registry"
+	"github.com/grafana/grafana/pkg/services/ngalert/tests"
+
 	"github.com/stretchr/testify/require"
 	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
@@ -864,3 +867,107 @@ func TestProcessEvalResults(t *testing.T) {
 		})
 	}
 }
+
+func TestStaleResultsHandler(t *testing.T) {
+	evaluationTime, err := time.Parse("2006-01-02", "2021-03-25")
+	if err != nil {
+		t.Fatalf("error parsing date format: %s", err.Error())
+	}
+	dbstore := tests.SetupTestEnv(t, 1)
+	rule := tests.CreateTestAlertRule(t, dbstore, 600)
+	saveCmd1 := &models.SaveAlertInstanceCommand{
+		RuleOrgID:         rule.OrgID,
+		RuleUID:           rule.UID,
+		Labels:            models.InstanceLabels{"test1": "testValue1"},
+		State:             models.InstanceStateNormal,
+		LastEvalTime:      evaluationTime,
+		CurrentStateSince: evaluationTime.Add(-1 * time.Minute),
+		CurrentStateEnd:   evaluationTime.Add(1 * time.Minute),
+	}
+	_ = dbstore.SaveAlertInstance(saveCmd1)
+	saveCmd2 := &models.SaveAlertInstanceCommand{
+		RuleOrgID:         rule.OrgID,
+		RuleUID:           rule.UID,
+		Labels:            models.InstanceLabels{"test2": "testValue2"},
+		State:             models.InstanceStateFiring,
+		LastEvalTime:      evaluationTime,
+		CurrentStateSince: evaluationTime.Add(-1 * time.Minute),
+		CurrentStateEnd:   evaluationTime.Add(1 * time.Minute),
+	}
+	_ = dbstore.SaveAlertInstance(saveCmd2)
+	t.Cleanup(registry.ClearOverrides)
+
+	testCases := []struct {
+		desc               string
+		evalResults        []eval.Results
+		expectedStates     map[string]*state.State
+		startingStateCount int
+		finalStateCount    int
+	}{
+		{
+			desc: "stale cache entries are removed",
+			evalResults: []eval.Results{
+				{
+					eval.Result{
+						Instance:    data.Labels{"test1": "testValue1"},
+						State:       eval.Normal,
+						EvaluatedAt: evaluationTime.Add(3 * time.Minute),
+					},
+				},
+			},
+			expectedStates: map[string]*state.State{
+				`[["__alert_rule_namespace_uid__","namespace"],["__alert_rule_uid__","` + rule.UID + `"],["alertname","` + rule.Title + `"],["test1","testValue1"]]`: {
+					AlertRuleUID: rule.UID,
+					OrgID:        1,
+					CacheId:      `[["__alert_rule_namespace_uid__","namespace"],["__alert_rule_uid__","` + rule.UID + `"],["alertname","` + rule.Title + `"],["test1","testValue1"]]`,
+					Labels: data.Labels{
+						"__alert_rule_namespace_uid__": "namespace",
+						"__alert_rule_uid__":           rule.UID,
+						"alertname":                    rule.Title,
+						"test1":                        "testValue1",
+					},
+					State: eval.Normal,
+					Results: []state.Evaluation{
+						{
+							EvaluationTime:  evaluationTime.Add(3 * time.Minute),
+							EvaluationState: eval.Normal,
+							Values:          make(map[string]state.EvaluationValue),
+						},
+					},
+					LastEvaluationTime: evaluationTime.Add(3 * time.Minute),
+					EvaluationDuration: 0,
+					Annotations:        map[string]string{"testAnnoKey": "testAnnoValue"},
+				},
+			},
+			startingStateCount: 2,
+			finalStateCount:    1,
+		},
+	}
+
+	for _, tc := range testCases {
+		st := state.NewManager(log.New("test_stale_results_handler"), nilMetrics, dbstore, dbstore)
+		st.Warm()
+		existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)
+
+		// We have loaded the expected number of entries from the db
+		assert.Equal(t, tc.startingStateCount, len(existingStatesForRule))
+		for _, res := range tc.evalResults {
+			st.ProcessEvalResults(rule, res)
+			for _, s := range tc.expectedStates {
+				cachedState, err := st.Get(s.OrgID, s.AlertRuleUID, s.CacheId)
+				require.NoError(t, err)
+				assert.Equal(t, s, cachedState)
+			}
+		}
+		existingStatesForRule = st.GetStatesForRuleUID(rule.OrgID, rule.UID)
+
+		// The expected number of state entries remains after results are processed
+		assert.Equal(t, tc.finalStateCount, len(existingStatesForRule))
+	}
+}
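Why the fixture trips the staleness check: assuming the 600 passed to CreateTestAlertRule is the rule's IntervalSeconds, the cutoff is LastEvalTime plus 20 minutes, and a 2021-03-25 evaluation time is long past that whenever the test actually runs. Only the "test1" series survives, because the new eval result refreshes it. The arithmetic, as a quick check:

package main

import (
	"fmt"
	"time"
)

// The "test2" instance was last evaluated on 2021-03-25; with a 600s interval
// its staleness cutoff is 20 minutes later, far in the past at test time.
func main() {
	lastEval, _ := time.Parse("2006-01-02", "2021-03-25")
	cutoff := lastEval.Add(2 * time.Duration(600) * time.Second)
	fmt.Println(cutoff.Before(time.Now())) // true: the state gets removed
}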


@@ -14,6 +14,7 @@ type InstanceStore interface {
 	ListAlertInstances(cmd *models.ListAlertInstancesQuery) error
 	SaveAlertInstance(cmd *models.SaveAlertInstanceCommand) error
 	FetchOrgIds() ([]int64, error)
+	DeleteAlertInstance(orgID int64, ruleUID, labelsHash string) error
 }
 
 // GetAlertInstance is a handler for retrieving an alert instance based on OrgId, AlertDefintionID, and
@@ -142,3 +143,13 @@ func (st DBstore) FetchOrgIds() ([]int64, error) {
 	return orgIds, err
 }
+
+func (st DBstore) DeleteAlertInstance(orgID int64, ruleUID, labelsHash string) error {
+	return st.SQLStore.WithTransactionalDbSession(context.Background(), func(sess *sqlstore.DBSession) error {
+		_, err := sess.Exec("DELETE FROM alert_instance WHERE rule_org_id = ? AND rule_uid = ? AND labels_hash = ?", orgID, ruleUID, labelsHash)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+}
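DeleteAlertInstance identifies the row by (rule_org_id, rule_uid, labels_hash), where labels_hash comes from InstanceLabels.StringAndHash as used in staleResultsHandler. For illustration only, a minimal database/sql sketch of the same statement; the real implementation runs it through sqlstore's transactional session as shown above:

package main

import (
	"context"
	"database/sql"
)

// Sketch of the DELETE that DeleteAlertInstance issues, using plain
// database/sql instead of Grafana's sqlstore session.
func deleteAlertInstance(ctx context.Context, db *sql.DB, orgID int64, ruleUID, labelsHash string) error {
	_, err := db.ExecContext(ctx,
		"DELETE FROM alert_instance WHERE rule_org_id = ? AND rule_uid = ? AND labels_hash = ?",
		orgID, ruleUID, labelsHash)
	return err
}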