Alerting: Update state manager to return StateTransitions when Delete or Reset (#62264)

* update Delete and Reset methods to return state transitions

this will be used by the notifier code to decide whether an alert needs to be sent or not (see the sketch below the changed-files summary).

* update the scheduler to pass a reason when deleting states and to use the returned transitions

* update FromAlertsStateToStoppedAlert to accept StateTransitions and filter by the previous state

* fixup

* fix tests
Yuri Tseretyan 2023-01-27 03:46:21 -05:00 committed by GitHub
parent 6706f08ecd
commit 05bf241952
6 changed files with 104 additions and 51 deletions
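
To make the first bullet of the commit message concrete, the following minimal sketch (not part of the diff; shouldNotify and the package name are illustrative) shows how a consumer of the returned transitions could decide whether a resolved notification is needed: only instances that were previously firing (Alerting, NoData, or Error) need one when they are cleared.

package example

import (
	"github.com/grafana/grafana/pkg/services/ngalert/eval"
	"github.com/grafana/grafana/pkg/services/ngalert/state"
)

// shouldNotify reports whether a transition returned by DeleteStateByRuleUID or
// ResetStateByRuleUID should produce a resolved notification, i.e. whether the
// instance was actually firing before it was cleared.
func shouldNotify(t state.StateTransition) bool {
	switch t.PreviousState {
	case eval.Alerting, eval.NoData, eval.Error:
		return true
	default:
		return false
	}
}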

View File

@@ -13,6 +13,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
alertingModels "github.com/grafana/alerting/alerting/models"
"github.com/grafana/grafana/pkg/services/quota"
"github.com/grafana/grafana/pkg/util/cmputil"
)
@@ -111,6 +112,7 @@ const (
StateReasonError = "Error"
StateReasonPaused = "Paused"
StateReasonUpdated = "Updated"
StateReasonRuleDeleted = "RuleDeleted"
)
var (

View File

@@ -15,6 +15,7 @@ import (
"github.com/prometheus/common/model"
alertingModels "github.com/grafana/alerting/alerting/models"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
@@ -152,16 +153,16 @@ func FromStateTransitionToPostableAlerts(firingStates []state.StateTransition, s
return alerts
}
// FromAlertsStateToStoppedAlert converts firingStates that have evaluation state either eval.Alerting or eval.NoData or eval.Error to models.PostableAlert that are accepted by notifiers.
// Returns a list of alert instances that have expiration time.Now
func FromAlertsStateToStoppedAlert(firingStates []*state.State, appURL *url.URL, clock clock.Clock) apimodels.PostableAlerts {
// FromAlertsStateToStoppedAlert selects only transitions from firing states (states eval.Alerting, eval.NoData, eval.Error)
// and converts them to models.PostableAlert with EndsAt set to time.Now
func FromAlertsStateToStoppedAlert(firingStates []state.StateTransition, appURL *url.URL, clock clock.Clock) apimodels.PostableAlerts {
alerts := apimodels.PostableAlerts{PostableAlerts: make([]models.PostableAlert, 0, len(firingStates))}
ts := clock.Now()
for _, alertState := range firingStates {
if alertState.State == eval.Normal || alertState.State == eval.Pending {
for _, transition := range firingStates {
if transition.PreviousState == eval.Normal || transition.PreviousState == eval.Pending {
continue
}
postableAlert := stateToPostableAlert(alertState, appURL)
postableAlert := stateToPostableAlert(transition.State, appURL)
postableAlert.EndsAt = strfmt.DateTime(ts)
alerts.PostableAlerts = append(alerts.PostableAlerts, *postableAlert)
}
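
For illustration only, and not part of the commit: written as if it were an extra case in the test file shown next (it relies on that file's randomState helper and imports), this sketch shows that of two transitions to a cleared state, only the one that was previously firing yields a stopped alert.

// Hypothetical test sketch; randomState and the imports come from the surrounding test file.
func TestStoppedAlertsKeepOnlyPreviouslyFiring(t *testing.T) {
	appURL := &url.URL{Scheme: "http", Host: "localhost:3000"}
	cleared := randomState(eval.Normal)
	transitions := []state.StateTransition{
		{State: cleared, PreviousState: eval.Alerting}, // was firing: kept, EndsAt set to now
		{State: cleared, PreviousState: eval.Pending},  // never fired: skipped
	}
	stopped := FromAlertsStateToStoppedAlert(transitions, appURL, clock.NewMock())
	require.Len(t, stopped.PostableAlerts, 1)
}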

View File

@@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"
alertingModels "github.com/grafana/alerting/alerting/models"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
@@ -222,9 +223,14 @@ func Test_FromAlertsStateToStoppedAlert(t *testing.T) {
}
evalStates := [...]eval.State{eval.Normal, eval.Alerting, eval.Pending, eval.Error, eval.NoData}
states := make([]*state.State, 0, len(evalStates))
for _, s := range evalStates {
states = append(states, randomState(s))
states := make([]state.StateTransition, 0, len(evalStates)*len(evalStates))
for _, to := range evalStates {
for _, from := range evalStates {
states = append(states, state.StateTransition{
State: randomState(to),
PreviousState: from,
})
}
}
clk := clock.NewMock()
@@ -232,10 +238,10 @@ func Test_FromAlertsStateToStoppedAlert(t *testing.T) {
expected := make([]models.PostableAlert, 0, len(states))
for _, s := range states {
if !(s.State == eval.Alerting || s.State == eval.Error || s.State == eval.NoData) {
if !(s.PreviousState == eval.Alerting || s.PreviousState == eval.Error || s.PreviousState == eval.NoData) {
continue
}
alert := stateToPostableAlert(s, appURL)
alert := stateToPostableAlert(s.State, appURL)
alert.EndsAt = strfmt.DateTime(clk.Now())
expected = append(expected, *alert)
}

View File

@@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/otel/attribute"
alertingModels "github.com/grafana/alerting/alerting/models"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/datasources"
@@ -324,7 +325,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
evalDuration := sch.metrics.EvalDuration.WithLabelValues(orgID)
evalTotalFailures := sch.metrics.EvalFailures.WithLabelValues(orgID)
notify := func(states []*state.State) {
notify := func(states []state.StateTransition) {
expiredAlerts := FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock)
if len(expiredAlerts.PostableAlerts) > 0 {
sch.alertsSender.Send(key, expiredAlerts)
@@ -508,7 +509,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
// cases.
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
defer cancelFunc()
states := sch.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, key), key)
states := sch.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, key), key, ngmodels.StateReasonRuleDeleted)
notify(states)
}
logger.Debug("Stopping alert rule routine")

View File

@@ -7,6 +7,7 @@ import (
"github.com/benbjohnson/clock"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
@@ -167,14 +168,38 @@ func (st *Manager) Get(orgID int64, alertRuleUID, stateId string) *State {
// DeleteStateByRuleUID removes the rule instances from cache and instanceStore. A closed channel is returned to be able
// to gracefully handle the clear state step in scheduler in case we do not need to use the historian to save state
// history.
func (st *Manager) DeleteStateByRuleUID(ctx context.Context, ruleKey ngModels.AlertRuleKey) []*State {
logger := st.log.New(ruleKey.LogContext()...)
func (st *Manager) DeleteStateByRuleUID(ctx context.Context, ruleKey ngModels.AlertRuleKey, reason string) []StateTransition {
logger := st.log.FromContext(ctx)
logger.Debug("Resetting state of the rule")
states := st.cache.removeByRuleUID(ruleKey.OrgID, ruleKey.UID)
if len(states) == 0 {
return states
return nil
}
now := st.clock.Now()
transitions := make([]StateTransition, 0, len(states))
for _, s := range states {
oldState := s.State
oldReason := s.StateReason
startsAt := s.StartsAt
if s.State != eval.Normal {
startsAt = now
}
s.SetNormal(reason, startsAt, now)
// Set Resolved property so the scheduler knows to send a postable alert
// to Alertmanager.
s.Resolved = oldState == eval.Alerting
s.LastEvaluationTime = now
s.Values = map[string]float64{}
transitions = append(transitions, StateTransition{
State: s,
PreviousState: oldState,
PreviousStateReason: oldReason,
})
}
if st.instanceStore != nil {
err := st.instanceStore.DeleteAlertInstancesByRule(ctx, ruleKey)
if err != nil {
@@ -183,32 +208,17 @@ func (st *Manager) DeleteStateByRuleUID(ctx context.Context, ruleKey ngModels.Al
}
logger.Info("Rules state was reset", "states", len(states))
return states
return transitions
}
// ResetStateByRuleUID removes the rule instances from cache and instanceStore and saves state history. If the state
// history has to be saved, rule must not be nil.
func (st *Manager) ResetStateByRuleUID(ctx context.Context, rule *ngModels.AlertRule, reason string) []*State {
func (st *Manager) ResetStateByRuleUID(ctx context.Context, rule *ngModels.AlertRule, reason string) []StateTransition {
ruleKey := rule.GetKey()
states := st.DeleteStateByRuleUID(ctx, ruleKey)
transitions := st.DeleteStateByRuleUID(ctx, ruleKey, reason)
if rule == nil || st.historian == nil {
return states
}
transitions := make([]StateTransition, 0, len(states))
for _, s := range states {
oldState := s.State
oldReason := s.StateReason
state := *s
now := time.Now()
state.SetNormal(reason, s.StartsAt, now)
state.LastEvaluationTime = now
state.Values = map[string]float64{}
transitions = append(transitions, StateTransition{
State: &state,
PreviousState: oldState,
PreviousStateReason: oldReason,
})
if rule == nil || st.historian == nil || len(transitions) == 0 {
return transitions
}
ruleMeta := history_model.NewRuleMeta(rule, st.log)
@@ -219,7 +229,7 @@ func (st *Manager) ResetStateByRuleUID(ctx context.Context, rule *ngModels.Alert
st.log.FromContext(ctx).Error("Error updating historian state reset transitions", append(ruleKey.LogContext(), "reason", reason, "error", err)...)
}
}()
return states
return transitions
}
// ProcessEvalResults updates the current states that belong to a rule with the evaluation results.
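
As a compact, illustrative summary of the loop above (validTransition is not part of the commit), every transition returned by DeleteStateByRuleUID, and therefore by ResetStateByRuleUID, satisfies the following predicate, which is what the updated tests below assert:

package example

import (
	"github.com/grafana/grafana/pkg/services/ngalert/eval"
	"github.com/grafana/grafana/pkg/services/ngalert/state"
)

// validTransition captures the post-conditions of the new code path: the current
// state is Normal with the supplied reason, and Resolved is set exactly when the
// instance was previously Alerting.
func validTransition(t state.StateTransition, reason string) bool {
	return t.State.State == eval.Normal &&
		t.StateReason == reason &&
		t.Resolved == (t.PreviousState == eval.Alerting)
}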

View File

@@ -27,6 +27,7 @@ import (
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/state/historian"
"github.com/grafana/grafana/pkg/services/ngalert/tests"
"github.com/grafana/grafana/pkg/util"
)
var testMetrics = metrics.NewNGAlert(prometheus.NewPedanticRegistry())
@@ -2623,12 +2624,14 @@ func TestDeleteStateByRuleUID(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
ctx := context.Background()
clk := clock.NewMock()
clk.Set(time.Now())
cfg := state.ManagerCfg{
Metrics: testMetrics.GetStateMetrics(),
ExternalURL: nil,
InstanceStore: dbstore,
Images: &state.NoopImageService{},
Clock: clock.New(),
Clock: clk,
Historian: &state.FakeHistorian{},
}
st := state.NewManager(cfg)
@@ -2641,12 +2644,28 @@ func TestDeleteStateByRuleUID(t *testing.T) {
assert.Equal(t, tc.startingStateCacheCount, len(existingStatesForRule))
assert.Equal(t, tc.startingInstanceDBCount, len(q.Result))
states := st.DeleteStateByRuleUID(ctx, rule.GetKey())
expectedReason := util.GenerateShortUID()
transitions := st.DeleteStateByRuleUID(ctx, rule.GetKey(), expectedReason)
// Check that the deleted states are the same as the ones that were in cache
assert.Equal(t, tc.startingStateCacheCount, len(states))
for _, s := range states {
assert.Equal(t, tc.expectedStates[s.CacheID], s)
assert.Equal(t, tc.startingStateCacheCount, len(transitions))
for _, s := range transitions {
assert.Contains(t, tc.expectedStates, s.CacheID)
oldState := tc.expectedStates[s.CacheID]
assert.Equal(t, oldState.State, s.PreviousState)
assert.Equal(t, oldState.StateReason, s.PreviousStateReason)
assert.Equal(t, eval.Normal, s.State.State)
assert.Equal(t, expectedReason, s.StateReason)
if oldState.State == eval.Normal {
assert.Equal(t, oldState.StartsAt, s.StartsAt)
assert.False(t, s.Resolved)
} else {
assert.Equal(t, clk.Now(), s.StartsAt)
if oldState.State == eval.Alerting {
assert.True(t, s.Resolved)
}
}
assert.Equal(t, clk.Now(), s.EndsAt)
}
q = &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
@@ -2742,12 +2761,14 @@ func TestResetStateByRuleUID(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
ctx := context.Background()
fakeHistorian := &state.FakeHistorian{StateTransitions: make([]state.StateTransition, 0)}
clk := clock.NewMock()
clk.Set(time.Now())
cfg := state.ManagerCfg{
Metrics: testMetrics.GetStateMetrics(),
ExternalURL: nil,
InstanceStore: dbstore,
Images: &state.NoopImageService{},
Clock: clock.New(),
Clock: clk,
Historian: fakeHistorian,
}
st := state.NewManager(cfg)
@@ -2760,20 +2781,32 @@ func TestResetStateByRuleUID(t *testing.T) {
assert.Equal(t, tc.startingStateCacheCount, len(existingStatesForRule))
assert.Equal(t, tc.startingInstanceDBCount, len(q.Result))
states := st.ResetStateByRuleUID(ctx, rule, models.StateReasonPaused)
transitions := st.ResetStateByRuleUID(ctx, rule, models.StateReasonPaused)
// Check that the deleted states are the same as the ones that were in cache
assert.Equal(t, tc.startingStateCacheCount, len(states))
for _, s := range states {
assert.Equal(t, tc.expectedStates[s.CacheID], s)
assert.Equal(t, tc.startingStateCacheCount, len(transitions))
for _, s := range transitions {
assert.Contains(t, tc.expectedStates, s.CacheID)
oldState := tc.expectedStates[s.CacheID]
assert.Equal(t, oldState.State, s.PreviousState)
assert.Equal(t, oldState.StateReason, s.PreviousStateReason)
assert.Equal(t, eval.Normal, s.State.State)
assert.Equal(t, models.StateReasonPaused, s.StateReason)
if oldState.State == eval.Normal {
assert.Equal(t, oldState.StartsAt, s.StartsAt)
assert.False(t, s.Resolved)
} else {
assert.Equal(t, clk.Now(), s.StartsAt)
if oldState.State == eval.Alerting {
assert.True(t, s.Resolved)
}
}
assert.Equal(t, clk.Now(), s.EndsAt)
}
// Check if both entries have been added to the historian
assert.Equal(t, tc.newHistorianEntriesCount, len(fakeHistorian.StateTransitions))
for _, str := range fakeHistorian.StateTransitions {
assert.Equal(t, tc.expectedStates[str.State.CacheID].State, str.PreviousState)
assert.Equal(t, tc.expectedStates[str.State.CacheID].StateReason, str.PreviousStateReason)
}
assert.Equal(t, transitions, fakeHistorian.StateTransitions)
q = &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
_ = dbstore.ListAlertInstances(ctx, q)