From 9f90a7b54d1421356c7ec9dbccc18fd5da17c0b0 Mon Sep 17 00:00:00 2001 From: Yuriy Tseretyan Date: Thu, 18 Aug 2022 09:40:33 -0400 Subject: [PATCH] Alerting: State manager to use InstanceStore (#53852) * move saving the state to state manager when scheduler stops * move saving state to ProcessEvalResults * add GetRuleKey to State * add LogContext to AlertRuleKey --- pkg/services/ngalert/eval/testing.go | 74 +++++++++++++++++++ pkg/services/ngalert/models/alert_rule.go | 4 + pkg/services/ngalert/schedule/schedule.go | 39 +--------- .../ngalert/schedule/schedule_unit_test.go | 15 ++-- pkg/services/ngalert/state/manager.go | 50 ++++++++++++- .../ngalert/state/manager_private_test.go | 45 +++++++++++ pkg/services/ngalert/state/manager_test.go | 29 ++++++++ pkg/services/ngalert/state/state.go | 7 ++ 8 files changed, 218 insertions(+), 45 deletions(-) create mode 100644 pkg/services/ngalert/eval/testing.go diff --git a/pkg/services/ngalert/eval/testing.go b/pkg/services/ngalert/eval/testing.go new file mode 100644 index 00000000000..ae36c097399 --- /dev/null +++ b/pkg/services/ngalert/eval/testing.go @@ -0,0 +1,74 @@ +package eval + +import ( + "fmt" + "math/rand" + "time" + + "github.com/grafana/grafana-plugin-sdk-go/data" + + "github.com/grafana/grafana/pkg/services/ngalert/models" +) + +type ResultMutator func(r *Result) + +func RandomState() State { + return []State{ + Normal, + Alerting, + NoData, + Error, + }[rand.Intn(4)] +} + +func GenerateResults(count int, generator func() Result) Results { + var result = make(Results, 0, count) + for i := 0; i < count; i++ { + result = append(result, generator()) + } + return result +} + +func ResultGen(mutators ...ResultMutator) func() Result { + return func() Result { + state := RandomState() + var err error + if state == Error { + err = fmt.Errorf("result_error") + } + result := Result{ + Instance: models.GenerateAlertLabels(rand.Intn(5)+1, "result_"), + State: state, + Error: err, + EvaluatedAt: time.Time{}, + EvaluationDuration: time.Duration(rand.Int63n(6)) * time.Second, + EvaluationString: "", + Values: nil, + } + for _, mutator := range mutators { + mutator(&result) + } + return result + } +} + +func WithEvaluatedAt(time time.Time) ResultMutator { + return func(r *Result) { + r.EvaluatedAt = time + } +} + +func WithState(state State) ResultMutator { + return func(r *Result) { + r.State = state + if state == Error { + r.Error = fmt.Errorf("with_state_error") + } + } +} + +func WithLabels(labels data.Labels) ResultMutator { + return func(r *Result) { + r.Instance = labels + } +} diff --git a/pkg/services/ngalert/models/alert_rule.go b/pkg/services/ngalert/models/alert_rule.go index 47e11b8a8e2..ec629847f2a 100644 --- a/pkg/services/ngalert/models/alert_rule.go +++ b/pkg/services/ngalert/models/alert_rule.go @@ -227,6 +227,10 @@ type AlertRuleKey struct { UID string `xorm:"uid"` } +func (k AlertRuleKey) LogContext() []interface{} { + return []interface{}{"rule_uid", k.UID, "org_id", k.OrgID} +} + type AlertRuleKeyWithVersion struct { Version int64 AlertRuleKey `xorm:"extends"` diff --git a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go index 12abf5bc80c..185ee91f66a 100644 --- a/pkg/services/ngalert/schedule/schedule.go +++ b/pkg/services/ngalert/schedule/schedule.go @@ -74,8 +74,7 @@ type schedule struct { evaluator eval.Evaluator - ruleStore store.RuleStore - instanceStore store.InstanceStore + ruleStore store.RuleStore stateManager *state.Manager @@ -123,7 +122,6 @@ func NewScheduler(cfg SchedulerCfg, appURL *url.URL, stateManager *state.Manager stopAppliedFunc: cfg.StopAppliedFunc, evaluator: cfg.Evaluator, ruleStore: cfg.RuleStore, - instanceStore: cfg.InstanceStore, metrics: cfg.Metrics, appURL: appURL, disableGrafanaFolder: cfg.Cfg.ReservedLabels.IsReservedLabelDisabled(ngmodels.FolderTitleLabel), @@ -278,18 +276,10 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error { sch.metrics.SchedulePeriodicDuration.Observe(time.Since(start).Seconds()) case <-ctx.Done(): + // waiting for all rule evaluation routines to stop waitErr := dispatcherGroup.Wait() - - orgIds, err := sch.instanceStore.FetchOrgIds(ctx) - if err != nil { - sch.log.Error("unable to fetch orgIds", "msg", err.Error()) - } - - for _, v := range orgIds { - sch.saveAlertStates(ctx, sch.stateManager.GetAll(v)) - } - - sch.stateManager.Close() + // close the state manager and flush the state + sch.stateManager.Close(ctx) return waitErr } } @@ -329,7 +319,6 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR } processedStates := sch.stateManager.ProcessEvalResults(ctx, e.scheduledAt, e.rule, results, extraLabels) - sch.saveAlertStates(ctx, processedStates) alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL) if len(alerts.PostableAlerts) > 0 { sch.alertsSender.Send(key, alerts) @@ -414,26 +403,6 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR } } -func (sch *schedule) saveAlertStates(ctx context.Context, states []*state.State) { - sch.log.Debug("saving alert states", "count", len(states)) - for _, s := range states { - cmd := ngmodels.SaveAlertInstanceCommand{ - RuleOrgID: s.OrgID, - RuleUID: s.AlertRuleUID, - Labels: ngmodels.InstanceLabels(s.Labels), - State: ngmodels.InstanceStateType(s.State.String()), - StateReason: s.StateReason, - LastEvalTime: s.LastEvaluationTime, - CurrentStateSince: s.StartsAt, - CurrentStateEnd: s.EndsAt, - } - err := sch.instanceStore.SaveAlertInstance(ctx, &cmd) - if err != nil { - sch.log.Error("failed to save alert state", "uid", s.AlertRuleUID, "orgId", s.OrgID, "labels", s.Labels.String(), "state", s.State.String(), "msg", err.Error()) - } - } -} - // overrideCfg is only used on tests. func (sch *schedule) overrideCfg(cfg SchedulerCfg) { sch.clock = cfg.C diff --git a/pkg/services/ngalert/schedule/schedule_unit_test.go b/pkg/services/ngalert/schedule/schedule_unit_test.go index f2b015e4e72..ec1c3850579 100644 --- a/pkg/services/ngalert/schedule/schedule_unit_test.go +++ b/pkg/services/ngalert/schedule/schedule_unit_test.go @@ -516,14 +516,13 @@ func setupScheduler(t *testing.T, rs *store.FakeRuleStore, is *store.FakeInstanc } schedCfg := SchedulerCfg{ - Cfg: cfg, - C: mockedClock, - Evaluator: evaluator, - RuleStore: rs, - InstanceStore: is, - Logger: logger, - Metrics: m.GetSchedulerMetrics(), - AlertSender: senderMock, + Cfg: cfg, + C: mockedClock, + Evaluator: evaluator, + RuleStore: rs, + Logger: logger, + Metrics: m.GetSchedulerMetrics(), + AlertSender: senderMock, } st := state.NewManager(schedCfg.Logger, m.GetStateMetrics(), nil, rs, is, &dashboards.FakeDashboardService{}, &image.NoopImageService{}, mockedClock) return NewScheduler(schedCfg, appUrl, st) diff --git a/pkg/services/ngalert/state/manager.go b/pkg/services/ngalert/state/manager.go index 80a5ce99bff..d46fcfb1410 100644 --- a/pkg/services/ngalert/state/manager.go +++ b/pkg/services/ngalert/state/manager.go @@ -66,8 +66,9 @@ func NewManager(logger log.Logger, metrics *metrics.State, externalURL *url.URL, return manager } -func (st *Manager) Close() { +func (st *Manager) Close(ctx context.Context) { st.quit <- struct{}{} + st.flushState(ctx) } func (st *Manager) Warm(ctx context.Context) { @@ -161,7 +162,8 @@ func (st *Manager) RemoveByRuleUID(orgID int64, ruleUID string) { // ProcessEvalResults updates the current states that belong to a rule with the evaluation results. // if extraLabels is not empty, those labels will be added to every state. The extraLabels take precedence over rule labels and result labels func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time, alertRule *ngModels.AlertRule, results eval.Results, extraLabels data.Labels) []*State { - st.log.Debug("state manager processing evaluation results", "uid", alertRule.UID, "resultCount", len(results)) + logger := st.log.New(alertRule.GetKey().LogContext()) + logger.Debug("state manager processing evaluation results", "resultCount", len(results)) var states []*State processedResults := make(map[string]*State, len(results)) for _, result := range results { @@ -170,6 +172,14 @@ func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time processedResults[s.CacheId] = s } st.staleResultsHandler(ctx, evaluatedAt, alertRule, processedResults) + if len(states) > 0 { + logger.Debug("saving new states to the database", "count", len(states)) + for _, state := range states { + if err := st.saveState(ctx, state); err != nil { + logger.Error("failed to save alert state", "labels", state.Labels.String(), "state", state.State.String(), "err", err.Error()) + } + } + } return states } @@ -297,6 +307,42 @@ func (st *Manager) Put(states []*State) { } } +// flushState dumps the entire state to the database +func (st *Manager) flushState(ctx context.Context) { + t := st.clock.Now() + st.log.Info("flushing the state") + st.cache.mtxStates.Lock() + defer st.cache.mtxStates.Unlock() + totalStates, errorsCnt := 0, 0 + for _, orgStates := range st.cache.states { + for _, ruleStates := range orgStates { + for _, state := range ruleStates { + err := st.saveState(ctx, state) + totalStates++ + if err != nil { + st.log.Error("failed to save alert state", append(state.GetRuleKey().LogContext(), "labels", state.Labels.String(), "state", state.State.String(), "err", err.Error())) + errorsCnt++ + } + } + } + } + st.log.Info("the state has been flushed", "total_instances", totalStates, "errors", errorsCnt, "took", st.clock.Since(t)) +} + +func (st *Manager) saveState(ctx context.Context, s *State) error { + cmd := ngModels.SaveAlertInstanceCommand{ + RuleOrgID: s.OrgID, + RuleUID: s.AlertRuleUID, + Labels: ngModels.InstanceLabels(s.Labels), + State: ngModels.InstanceStateType(s.State.String()), + StateReason: s.StateReason, + LastEvalTime: s.LastEvaluationTime, + CurrentStateSince: s.StartsAt, + CurrentStateEnd: s.EndsAt, + } + return st.instanceStore.SaveAlertInstance(ctx, &cmd) +} + // TODO: why wouldn't you allow other types like NoData or Error? func translateInstanceState(state ngModels.InstanceStateType) eval.State { switch { diff --git a/pkg/services/ngalert/state/manager_private_test.go b/pkg/services/ngalert/state/manager_private_test.go index fed8a0b58c6..edee75ed404 100644 --- a/pkg/services/ngalert/state/manager_private_test.go +++ b/pkg/services/ngalert/state/manager_private_test.go @@ -7,13 +7,16 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/benbjohnson/clock" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/services/annotations" "github.com/grafana/grafana/pkg/services/dashboards" "github.com/grafana/grafana/pkg/services/ngalert/eval" + "github.com/grafana/grafana/pkg/services/ngalert/image" "github.com/grafana/grafana/pkg/services/ngalert/metrics" ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models" "github.com/grafana/grafana/pkg/services/ngalert/store" @@ -149,3 +152,45 @@ func TestIsItStale(t *testing.T) { }) } } + +func TestClose(t *testing.T) { + instanceStore := &store.FakeInstanceStore{} + clk := clock.New() + st := NewManager(log.New("test_state_manager"), metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), nil, nil, instanceStore, &dashboards.FakeDashboardService{}, &image.NotAvailableImageService{}, clk) + fakeAnnoRepo := store.NewFakeAnnotationsRepo() + annotations.SetRepository(fakeAnnoRepo) + + _, rules := ngmodels.GenerateUniqueAlertRules(10, ngmodels.AlertRuleGen()) + for _, rule := range rules { + results := eval.GenerateResults(rand.Intn(4)+1, eval.ResultGen(eval.WithEvaluatedAt(clk.Now()))) + _ = st.ProcessEvalResults(context.Background(), clk.Now(), rule, results, ngmodels.GenerateAlertLabels(rand.Intn(4), "extra_")) + } + var states []*State + for _, org := range st.cache.states { + for _, rule := range org { + for _, state := range rule { + states = append(states, state) + } + } + } + + instanceStore.RecordedOps = nil + st.Close(context.Background()) + + t.Run("should flush the state to store", func(t *testing.T) { + savedStates := make(map[string]ngmodels.SaveAlertInstanceCommand) + for _, op := range instanceStore.RecordedOps { + switch q := op.(type) { + case ngmodels.SaveAlertInstanceCommand: + cacheId, err := q.Labels.StringKey() + require.NoError(t, err) + savedStates[cacheId] = q + } + } + + require.Len(t, savedStates, len(states)) + for _, s := range states { + require.Contains(t, savedStates, s.CacheId) + } + }) +} diff --git a/pkg/services/ngalert/state/manager_test.go b/pkg/services/ngalert/state/manager_test.go index e70729da09b..688947e6b94 100644 --- a/pkg/services/ngalert/state/manager_test.go +++ b/pkg/services/ngalert/state/manager_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math/rand" "sort" "testing" "time" @@ -2007,6 +2008,34 @@ func TestProcessEvalResults(t *testing.T) { }, time.Second, 100*time.Millisecond, "%d annotations are present, expected %d. We have %+v", fakeAnnoRepo.Len(), tc.expectedAnnotations, printAllAnnotations(fakeAnnoRepo.Items)) }) } + + t.Run("should save state to database", func(t *testing.T) { + fakeAnnoRepo := store.NewFakeAnnotationsRepo() + annotations.SetRepository(fakeAnnoRepo) + instanceStore := &store.FakeInstanceStore{} + clk := clock.New() + st := state.NewManager(log.New("test_state_manager"), testMetrics.GetStateMetrics(), nil, nil, instanceStore, &dashboards.FakeDashboardService{}, &image.NotAvailableImageService{}, clk) + rule := models.AlertRuleGen()() + var results = eval.GenerateResults(rand.Intn(4)+1, eval.ResultGen(eval.WithEvaluatedAt(clk.Now()))) + + states := st.ProcessEvalResults(context.Background(), clk.Now(), rule, results, make(data.Labels)) + + require.NotEmpty(t, states) + + savedStates := make(map[string]models.SaveAlertInstanceCommand) + for _, op := range instanceStore.RecordedOps { + switch q := op.(type) { + case models.SaveAlertInstanceCommand: + cacheId, err := q.Labels.StringKey() + require.NoError(t, err) + savedStates[cacheId] = q + } + } + require.Len(t, savedStates, len(states)) + for _, s := range states { + require.Contains(t, savedStates, s.CacheId) + } + }) } func printAllAnnotations(annos []*annotations.Item) string { diff --git a/pkg/services/ngalert/state/state.go b/pkg/services/ngalert/state/state.go index d8bf35c1fc7..2bdf787d844 100644 --- a/pkg/services/ngalert/state/state.go +++ b/pkg/services/ngalert/state/state.go @@ -36,6 +36,13 @@ type State struct { Error error } +func (a *State) GetRuleKey() models.AlertRuleKey { + return models.AlertRuleKey{ + OrgID: a.OrgID, + UID: a.AlertRuleUID, + } +} + type Evaluation struct { EvaluationTime time.Time EvaluationState eval.State