Alerting: update state manager to return StateTransition instead of State (#58867)
* improve test for stale states
* update state manager to return StateTransition
* update scheduler to accept state transitions
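For orientation before the diff: ProcessEvalResults now returns state transitions rather than bare states, so callers can see both the resulting state and the one it replaced (for example, to resolve a series that went from Alerting to stale/Normal). The sketch below is illustrative only; the type and field names are assumptions standing in for the real ngalert types, not the actual Grafana code.

```go
package main

import "fmt"

// EvalState stands in for eval.State.
type EvalState int

const (
	Normal EvalState = iota
	Alerting
)

// State is a simplified stand-in for the state manager's State.
type State struct {
	CacheID     string
	State       EvalState
	StateReason string
	Resolved    bool
}

// StateTransition (assumed shape) wraps the resulting State together with
// the state it transitioned from, which is what downstream code now receives.
type StateTransition struct {
	*State
	PreviousState       EvalState
	PreviousStateReason string
}

func main() {
	transitions := []StateTransition{
		{State: &State{CacheID: "a", State: Normal, StateReason: "MissingSeries", Resolved: true}, PreviousState: Alerting},
		{State: &State{CacheID: "b", State: Alerting}, PreviousState: Normal},
	}
	// With transitions, a consumer can reason about both sides of each change.
	for _, tr := range transitions {
		fmt.Printf("%s: %d -> %d (resolved=%v)\n", tr.CacheID, tr.PreviousState, tr.State.State, tr.Resolved)
	}
}
```

In the real code, eval.State, state.State and state.StateTransition play these roles; the hunks below show how the scheduler and the alert conversion layer consume them.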
@@ -158,6 +158,12 @@ func WithTitle(title string) AlertRuleMutator {
 	}
 }
 
+func WithFor(duration time.Duration) AlertRuleMutator {
+	return func(rule *AlertRule) {
+		rule.For = duration
+	}
+}
+
 func GenerateAlertLabels(count int, prefix string) data.Labels {
 	labels := make(data.Labels, count)
 	for i := 0; i < count; i++ {
@@ -130,7 +130,7 @@ func errorAlert(labels, annotations data.Labels, alertState *state.State, urlStr
 	}
 }
 
-func FromAlertStateToPostableAlerts(firingStates []*state.State, stateManager *state.Manager, appURL *url.URL) apimodels.PostableAlerts {
+func FromStateTransitionToPostableAlerts(firingStates []state.StateTransition, stateManager *state.Manager, appURL *url.URL) apimodels.PostableAlerts {
 	alerts := apimodels.PostableAlerts{PostableAlerts: make([]models.PostableAlert, 0, len(firingStates))}
 	var sentAlerts []*state.State
 	ts := time.Now()
@@ -139,13 +139,13 @@ func FromAlertStateToPostableAlerts(firingStates []*state.State, stateManager *s
 		if !alertState.NeedsSending(stateManager.ResendDelay) {
 			continue
 		}
-		alert := stateToPostableAlert(alertState, appURL)
+		alert := stateToPostableAlert(alertState.State, appURL)
 		alerts.PostableAlerts = append(alerts.PostableAlerts, *alert)
 		if alertState.StateReason == ngModels.StateReasonMissingSeries { // do not put stale state back to state manager
 			continue
 		}
 		alertState.LastSentAt = ts
-		sentAlerts = append(sentAlerts, alertState)
+		sentAlerts = append(sentAlerts, alertState.State)
 	}
 	stateManager.Put(sentAlerts)
 	return alerts
@@ -371,7 +371,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
 				return
 			}
 			processedStates := sch.stateManager.ProcessEvalResults(ctx, e.scheduledAt, e.rule, results, sch.getRuleExtraLabels(e))
-			alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL)
+			alerts := FromStateTransitionToPostableAlerts(processedStates, sch.stateManager, sch.appURL)
 			if len(alerts.PostableAlerts) > 0 {
 				sch.alertsSender.Send(key, alerts)
 			}
@@ -167,7 +167,7 @@ func (st *Manager) ResetStateByRuleUID(ctx context.Context, ruleKey ngModels.Ale
 
 // ProcessEvalResults updates the current states that belong to a rule with the evaluation results.
 // if extraLabels is not empty, those labels will be added to every state. The extraLabels take precedence over rule labels and result labels
-func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time, alertRule *ngModels.AlertRule, results eval.Results, extraLabels data.Labels) []*State {
+func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time, alertRule *ngModels.AlertRule, results eval.Results, extraLabels data.Labels) []StateTransition {
 	logger := st.log.FromContext(ctx)
 	logger.Debug("State manager processing evaluation results", "resultCount", len(results))
 	var states []StateTransition
@@ -185,18 +185,7 @@ func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time
 	if st.historian != nil {
 		st.historian.RecordStatesAsync(ctx, alertRule, allChanges)
 	}
-
-	nextStates := make([]*State, 0, len(states))
-	for _, s := range states {
-		nextStates = append(nextStates, s.State)
-	}
-	// TODO refactor further. Do not filter because it will be filtered downstream
-	for _, s := range staleStates {
-		if s.PreviousState == eval.Alerting {
-			nextStates = append(nextStates, s.State)
-		}
-	}
-	return nextStates
+	return allChanges
 }
 
 // Set the current state based on evaluation results
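The removed block shows that the manager itself used to filter stale states, returning only those that had previously been Alerting. After this change it returns every transition (allChanges) and leaves filtering to the caller. A hedged sketch of what such caller-side filtering could look like, reusing the illustrative types from the sketch above (the "MissingSeries" literal stands in for ngModels.StateReasonMissingSeries):

```go
// resolvedStaleStates is a sketch of downstream filtering: keep only stale
// series (Normal with a missing-series reason) that were firing before.
func resolvedStaleStates(transitions []StateTransition) []*State {
	var out []*State
	for _, tr := range transitions {
		if tr.StateReason == "MissingSeries" && tr.PreviousState == Alerting {
			out = append(out, tr.State)
		}
	}
	return out
}
```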
@@ -2309,7 +2309,18 @@ func TestStaleResults(t *testing.T) {
 		return key
 	}
 
-	checkExpectedStates := func(t *testing.T, actual []*state.State, expected map[string]struct{}) {
+	checkExpectedStates := func(t *testing.T, actual []*state.State, expected map[string]struct{}) map[string]*state.State {
 		t.Helper()
+		result := make(map[string]*state.State)
 		require.Len(t, actual, len(expected))
 		for _, currentState := range actual {
 			_, ok := expected[currentState.CacheID]
+			result[currentState.CacheID] = currentState
 			require.Truef(t, ok, "State %s is not expected. States: %v", currentState.CacheID, expected)
 		}
+		return result
 	}
+	checkExpectedStateTransitions := func(t *testing.T, actual []state.StateTransition, expected map[string]struct{}) {
+		t.Helper()
+		require.Len(t, actual, len(expected))
+		for _, currentState := range actual {
@@ -2318,75 +2329,83 @@ func TestStaleResults(t *testing.T) {
		}
	}

	ctx := context.Background()
	clk := clock.NewMock()

	store := &state.FakeInstanceStore{}

	st := state.NewManager(testMetrics.GetStateMetrics(), nil, store, &state.NoopImageService{}, clk, &state.FakeHistorian{})

	rule := models.AlertRuleGen(models.WithFor(0))()

	initResults := eval.Results{
		eval.ResultGen(eval.WithEvaluatedAt(clk.Now()))(),
		eval.ResultGen(eval.WithState(eval.Alerting), eval.WithEvaluatedAt(clk.Now()))(),
		eval.ResultGen(eval.WithState(eval.Normal), eval.WithEvaluatedAt(clk.Now()))(),
	}

	state1 := getCacheID(t, rule, initResults[0])
	state2 := getCacheID(t, rule, initResults[1])
	state3 := getCacheID(t, rule, initResults[2])

	initStates := map[string]struct{}{
		state1: {},
		state2: {},
		state3: {},
	}

	// Init
	processed := st.ProcessEvalResults(ctx, clk.Now(), rule, initResults, nil)
	checkExpectedStateTransitions(t, processed, initStates)

	currentStates := st.GetStatesForRuleUID(rule.OrgID, rule.UID)
	statesMap := checkExpectedStates(t, currentStates, initStates)
	require.Equal(t, eval.Alerting, statesMap[state2].State) // make sure the state is alerting because we need it to be resolved later

	staleDuration := 2 * time.Duration(rule.IntervalSeconds) * time.Second
	clk.Add(staleDuration)
	result := initResults[0]
	result.EvaluatedAt = clk.Now()
	results := eval.Results{
		result,
	}

	var expectedStaleKeys []models.AlertInstanceKey
	t.Run("should mark missing states as stale", func(t *testing.T) {
		// init
		ctx := context.Background()
		_, dbstore := tests.SetupTestEnv(t, 1)
		clk := clock.NewMock()
		clk.Set(time.Now())

		st := state.NewManager(testMetrics.GetStateMetrics(), nil, dbstore, &state.NoopImageService{}, clk, &state.FakeHistorian{})

		orgID := rand.Int63()
		rule := tests.CreateTestAlertRule(t, ctx, dbstore, 10, orgID)

		initResults := eval.Results{
			eval.Result{
				Instance:    data.Labels{"test1": "testValue1"},
				State:       eval.Alerting,
				EvaluatedAt: clk.Now(),
			},
			eval.Result{
				Instance:    data.Labels{"test1": "testValue2"},
				State:       eval.Alerting,
				EvaluatedAt: clk.Now(),
			},
			eval.Result{
				Instance:    data.Labels{"test1": "testValue3"},
				State:       eval.Normal,
				EvaluatedAt: clk.Now(),
			},
		}

		initStates := map[string]struct{}{
			getCacheID(t, rule, initResults[0]): {},
			getCacheID(t, rule, initResults[1]): {},
			getCacheID(t, rule, initResults[2]): {},
		}

		// Init
		processed := st.ProcessEvalResults(ctx, clk.Now(), rule, initResults, nil)
		checkExpectedStates(t, processed, initStates)
		currentStates := st.GetStatesForRuleUID(orgID, rule.UID)
		checkExpectedStates(t, currentStates, initStates)

		staleDuration := 2 * time.Duration(rule.IntervalSeconds) * time.Second
		clk.Add(staleDuration)
		results := eval.Results{
			eval.Result{
				Instance:    data.Labels{"test1": "testValue1"},
				State:       eval.Alerting,
				EvaluatedAt: clk.Now(),
			},
		}
		clk.Add(time.Nanosecond) // we use time now when calculate stale states. Evaluation tick and real time are not the same. usually, difference is way greater than nanosecond.
		expectedStaleReturned := getCacheID(t, rule, initResults[1])
		processed = st.ProcessEvalResults(ctx, clk.Now(), rule, results, nil)
		checkExpectedStates(t, processed, map[string]struct{}{
			getCacheID(t, rule, results[0]): {},
			expectedStaleReturned:           {},
		})
		checkExpectedStateTransitions(t, processed, initStates)
		for _, s := range processed {
			if s.CacheID == expectedStaleReturned {
				assert.Truef(t, s.Resolved, "Returned stale state should have Resolved set to true")
				assert.Equal(t, eval.Normal, s.State)
				assert.Equal(t, models.StateReasonMissingSeries, s.StateReason)
				break
			if s.CacheID == state1 {
				continue
			}
			assert.Equal(t, eval.Normal, s.State.State)
			assert.Equal(t, models.StateReasonMissingSeries, s.StateReason)
			assert.Equal(t, clk.Now(), s.EndsAt)
			if s.CacheID == state2 {
				assert.Truef(t, s.Resolved, "Returned stale state should have Resolved set to true")
			}
			key, err := s.GetAlertInstanceKey()
			require.NoError(t, err)
			expectedStaleKeys = append(expectedStaleKeys, key)
		}
		currentStates = st.GetStatesForRuleUID(orgID, rule.UID)
	})

	t.Run("should remove stale states from cache", func(t *testing.T) {
		currentStates = st.GetStatesForRuleUID(rule.OrgID, rule.UID)
		checkExpectedStates(t, currentStates, map[string]struct{}{
			getCacheID(t, rule, results[0]): {},
		})
	})

	t.Run("should delete stale states from the database", func(t *testing.T) {
		for _, op := range store.RecordedOps {
			switch q := op.(type) {
			case state.FakeInstanceStoreOp:
				keys, ok := q.Args[1].([]models.AlertInstanceKey)
				require.Truef(t, ok, "Failed to parse fake store operations")
				require.Len(t, keys, 2)
				require.EqualValues(t, expectedStaleKeys, keys)
			}
		}
	})
}
@@ -15,6 +15,11 @@ type FakeInstanceStore struct {
 	RecordedOps []interface{}
 }
 
+type FakeInstanceStoreOp struct {
+	Name string
+	Args []interface{}
+}
+
 func (f *FakeInstanceStore) ListAlertInstances(_ context.Context, q *models.ListAlertInstancesQuery) error {
 	f.mtx.Lock()
 	defer f.mtx.Unlock()
@@ -33,7 +38,15 @@ func (f *FakeInstanceStore) SaveAlertInstances(_ context.Context, q ...models.Al
 
 func (f *FakeInstanceStore) FetchOrgIds(_ context.Context) ([]int64, error) { return []int64{}, nil }
 
-func (f *FakeInstanceStore) DeleteAlertInstances(_ context.Context, _ ...models.AlertInstanceKey) error {
+func (f *FakeInstanceStore) DeleteAlertInstances(ctx context.Context, q ...models.AlertInstanceKey) error {
+	f.mtx.Lock()
+	defer f.mtx.Unlock()
+	f.RecordedOps = append(f.RecordedOps, FakeInstanceStoreOp{
+		Name: "DeleteAlertInstances", Args: []interface{}{
+			ctx,
+			q,
+		},
+	})
 	return nil
 }
 
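The fake store now records each delete call, so tests can assert on it after the fact, as the new subtest above does. Below is a minimal, self-contained sketch of that recording-fake pattern; only the RecordedOps and FakeInstanceStoreOp shape and the Args[1] position mirror the diff, the helper and the simplified AlertInstanceKey are illustrative assumptions.

```go
package main

import "fmt"

// Simplified stand-ins; only RecordedOps, FakeInstanceStoreOp and the
// Args[1] position mirror the diff, the rest is illustrative.
type AlertInstanceKey struct{ RuleUID string }

type FakeInstanceStoreOp struct {
	Name string
	Args []interface{}
}

type FakeInstanceStore struct{ RecordedOps []interface{} }

// deletedKeys collects every key passed to a recorded DeleteAlertInstances call.
func deletedKeys(store *FakeInstanceStore) []AlertInstanceKey {
	var keys []AlertInstanceKey
	for _, op := range store.RecordedOps {
		if rec, ok := op.(FakeInstanceStoreOp); ok && rec.Name == "DeleteAlertInstances" {
			// Args[0] holds the context, Args[1] the variadic key slice.
			if ks, ok := rec.Args[1].([]AlertInstanceKey); ok {
				keys = append(keys, ks...)
			}
		}
	}
	return keys
}

func main() {
	store := &FakeInstanceStore{RecordedOps: []interface{}{
		FakeInstanceStoreOp{Name: "DeleteAlertInstances", Args: []interface{}{nil, []AlertInstanceKey{{RuleUID: "r1"}}}},
	}}
	fmt.Println(len(deletedKeys(store))) // prints 1
}
```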