Alerting: State manager to use InstanceStore (#53852)

* Move saving the state to the state manager when the scheduler stops
* Move saving the state into ProcessEvalResults

* Add GetRuleKey to State
* Add LogContext to AlertRuleKey
Yuriy Tseretyan 2022-08-18 09:40:33 -04:00 committed by GitHub
parent 86de94cbfa
commit 9f90a7b54d
8 changed files with 218 additions and 45 deletions
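
The net effect: the scheduler no longer writes alert instances itself. The state manager persists states as part of ProcessEvalResults and flushes whatever is left in its cache when it is closed. A minimal sketch of the new call flow (caller and variable names are illustrative; the methods are the ones changed in this diff):

	// On every evaluation tick: ProcessEvalResults persists the resulting
	// states before returning them for notification handling.
	states := stateManager.ProcessEvalResults(ctx, evaluatedAt, rule, results, extraLabels)

	// On shutdown: Close now takes a context and flushes the entire state
	// cache to the instance store in a single pass.
	stateManager.Close(ctx)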


@@ -0,0 +1,74 @@
package eval

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/data"
	"github.com/grafana/grafana/pkg/services/ngalert/models"
)

// ResultMutator mutates a generated Result in place.
type ResultMutator func(r *Result)

// RandomState picks one of the four evaluation states at random.
func RandomState() State {
	return []State{
		Normal,
		Alerting,
		NoData,
		Error,
	}[rand.Intn(4)]
}

// GenerateResults calls the generator count times and collects the results.
func GenerateResults(count int, generator func() Result) Results {
	var result = make(Results, 0, count)
	for i := 0; i < count; i++ {
		result = append(result, generator())
	}
	return result
}

// ResultGen returns a generator of random Results. Every generated Result is
// passed through all the provided mutators.
func ResultGen(mutators ...ResultMutator) func() Result {
	return func() Result {
		state := RandomState()
		var err error
		if state == Error {
			err = fmt.Errorf("result_error")
		}
		result := Result{
			Instance:           models.GenerateAlertLabels(rand.Intn(5)+1, "result_"),
			State:              state,
			Error:              err,
			EvaluatedAt:        time.Time{},
			EvaluationDuration: time.Duration(rand.Int63n(6)) * time.Second,
			EvaluationString:   "",
			Values:             nil,
		}
		for _, mutator := range mutators {
			mutator(&result)
		}
		return result
	}
}

// WithEvaluatedAt sets the EvaluatedAt timestamp of the generated Result.
func WithEvaluatedAt(evaluatedAt time.Time) ResultMutator {
	return func(r *Result) {
		r.EvaluatedAt = evaluatedAt
	}
}

// WithState forces the State of the generated Result; the Error state also gets a non-nil error.
func WithState(state State) ResultMutator {
	return func(r *Result) {
		r.State = state
		if state == Error {
			r.Error = fmt.Errorf("with_state_error")
		}
	}
}

// WithLabels replaces the instance labels of the generated Result.
func WithLabels(labels data.Labels) ResultMutator {
	return func(r *Result) {
		r.Instance = labels
	}
}
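
The tests added later in this commit compose these helpers roughly as follows (a usage sketch, not part of the diff; clock is github.com/benbjohnson/clock and rand is math/rand):

	clk := clock.New()
	// Between one and four random results, each stamped with the current time.
	results := eval.GenerateResults(rand.Intn(4)+1, eval.ResultGen(eval.WithEvaluatedAt(clk.Now())))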


@@ -227,6 +227,10 @@ type AlertRuleKey struct {
	UID string `xorm:"uid"`
}

// LogContext returns the rule key as a list of key/value pairs for structured logging.
func (k AlertRuleKey) LogContext() []interface{} {
	return []interface{}{"rule_uid", k.UID, "org_id", k.OrgID}
}

type AlertRuleKeyWithVersion struct {
	Version      int64
	AlertRuleKey `xorm:"extends"`
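
Combined with a logger, this helper stamps rule identity onto every log line; the state manager uses it exactly this way later in this diff (note the variadic expansion of the returned slice):

	logger := st.log.New(alertRule.GetKey().LogContext()...) // adds rule_uid and org_id to each entry
	logger.Debug("state manager processing evaluation results", "resultCount", len(results))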


@@ -74,8 +74,7 @@ type schedule struct {
 	evaluator eval.Evaluator
 
-	ruleStore     store.RuleStore
-	instanceStore store.InstanceStore
+	ruleStore store.RuleStore
 
 	stateManager *state.Manager
@@ -123,7 +122,6 @@ func NewScheduler(cfg SchedulerCfg, appURL *url.URL, stateManager *state.Manager
 		stopAppliedFunc:      cfg.StopAppliedFunc,
 		evaluator:            cfg.Evaluator,
 		ruleStore:            cfg.RuleStore,
-		instanceStore:        cfg.InstanceStore,
 		metrics:              cfg.Metrics,
 		appURL:               appURL,
 		disableGrafanaFolder: cfg.Cfg.ReservedLabels.IsReservedLabelDisabled(ngmodels.FolderTitleLabel),
@@ -278,18 +276,10 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
 			sch.metrics.SchedulePeriodicDuration.Observe(time.Since(start).Seconds())
 		case <-ctx.Done():
 			// waiting for all rule evaluation routines to stop
 			waitErr := dispatcherGroup.Wait()
-			orgIds, err := sch.instanceStore.FetchOrgIds(ctx)
-			if err != nil {
-				sch.log.Error("unable to fetch orgIds", "msg", err.Error())
-			}
-			for _, v := range orgIds {
-				sch.saveAlertStates(ctx, sch.stateManager.GetAll(v))
-			}
-			sch.stateManager.Close()
+			// close the state manager and flush the state
+			sch.stateManager.Close(ctx)
 			return waitErr
 		}
 	}
@@ -329,7 +319,6 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
 				}
 				processedStates := sch.stateManager.ProcessEvalResults(ctx, e.scheduledAt, e.rule, results, extraLabels)
-				sch.saveAlertStates(ctx, processedStates)
 				alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL)
 				if len(alerts.PostableAlerts) > 0 {
 					sch.alertsSender.Send(key, alerts)
@@ -414,26 +403,6 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
 	}
 }
 
-func (sch *schedule) saveAlertStates(ctx context.Context, states []*state.State) {
-	sch.log.Debug("saving alert states", "count", len(states))
-	for _, s := range states {
-		cmd := ngmodels.SaveAlertInstanceCommand{
-			RuleOrgID:         s.OrgID,
-			RuleUID:           s.AlertRuleUID,
-			Labels:            ngmodels.InstanceLabels(s.Labels),
-			State:             ngmodels.InstanceStateType(s.State.String()),
-			StateReason:       s.StateReason,
-			LastEvalTime:      s.LastEvaluationTime,
-			CurrentStateSince: s.StartsAt,
-			CurrentStateEnd:   s.EndsAt,
-		}
-		err := sch.instanceStore.SaveAlertInstance(ctx, &cmd)
-		if err != nil {
-			sch.log.Error("failed to save alert state", "uid", s.AlertRuleUID, "orgId", s.OrgID, "labels", s.Labels.String(), "state", s.State.String(), "msg", err.Error())
-		}
-	}
-}
-
 // overrideCfg is only used in tests.
 func (sch *schedule) overrideCfg(cfg SchedulerCfg) {
 	sch.clock = cfg.C


@@ -516,14 +516,13 @@ func setupScheduler(t *testing.T, rs *store.FakeRuleStore, is *store.FakeInstanc
 	}
 
 	schedCfg := SchedulerCfg{
-		Cfg:           cfg,
-		C:             mockedClock,
-		Evaluator:     evaluator,
-		RuleStore:     rs,
-		InstanceStore: is,
-		Logger:        logger,
-		Metrics:       m.GetSchedulerMetrics(),
-		AlertSender:   senderMock,
+		Cfg:         cfg,
+		C:           mockedClock,
+		Evaluator:   evaluator,
+		RuleStore:   rs,
+		Logger:      logger,
+		Metrics:     m.GetSchedulerMetrics(),
+		AlertSender: senderMock,
 	}
 	st := state.NewManager(schedCfg.Logger, m.GetStateMetrics(), nil, rs, is, &dashboards.FakeDashboardService{}, &image.NoopImageService{}, mockedClock)
 	return NewScheduler(schedCfg, appUrl, st)


@@ -66,8 +66,9 @@ func NewManager(logger log.Logger, metrics *metrics.State, externalURL *url.URL,
 	return manager
 }
 
-func (st *Manager) Close() {
+func (st *Manager) Close(ctx context.Context) {
 	st.quit <- struct{}{}
+	st.flushState(ctx)
 }
 
 func (st *Manager) Warm(ctx context.Context) {
@@ -161,7 +162,8 @@ func (st *Manager) RemoveByRuleUID(orgID int64, ruleUID string) {
 // ProcessEvalResults updates the current states that belong to a rule with the evaluation results.
 // If extraLabels is not empty, those labels will be added to every state. The extraLabels take precedence over rule labels and result labels.
 func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time, alertRule *ngModels.AlertRule, results eval.Results, extraLabels data.Labels) []*State {
-	st.log.Debug("state manager processing evaluation results", "uid", alertRule.UID, "resultCount", len(results))
+	logger := st.log.New(alertRule.GetKey().LogContext()...)
+	logger.Debug("state manager processing evaluation results", "resultCount", len(results))
 	var states []*State
 	processedResults := make(map[string]*State, len(results))
 	for _, result := range results {
@@ -170,6 +172,14 @@ func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time
 		processedResults[s.CacheId] = s
 	}
 	st.staleResultsHandler(ctx, evaluatedAt, alertRule, processedResults)
+	if len(states) > 0 {
+		logger.Debug("saving new states to the database", "count", len(states))
+		for _, state := range states {
+			if err := st.saveState(ctx, state); err != nil {
+				logger.Error("failed to save alert state", "labels", state.Labels.String(), "state", state.State.String(), "err", err.Error())
+			}
+		}
+	}
 	return states
 }
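
Because saving now happens inside ProcessEvalResults, a caller only consumes the returned slice, e.g. for notification fan-out; every returned state has already been written to the instance store. A sketch (the extraLabels content is illustrative; rule and results as produced by the eval generators above):

	extraLabels := data.Labels{"source": "scheduler"} // takes precedence over rule and result labels
	states := st.ProcessEvalResults(ctx, clk.Now(), rule, results, extraLabels)
	alerts := FromAlertStateToPostableAlerts(states, st, appURL) // scheduler-side follow-up, per schedule.go above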
@@ -297,6 +307,42 @@ func (st *Manager) Put(states []*State) {
	}
}

// flushState dumps the entire state to the database.
func (st *Manager) flushState(ctx context.Context) {
	t := st.clock.Now()
	st.log.Info("flushing the state")
	st.cache.mtxStates.Lock()
	defer st.cache.mtxStates.Unlock()

	totalStates, errorsCnt := 0, 0
	for _, orgStates := range st.cache.states {
		for _, ruleStates := range orgStates {
			for _, state := range ruleStates {
				err := st.saveState(ctx, state)
				totalStates++
				if err != nil {
					st.log.Error("failed to save alert state", append(state.GetRuleKey().LogContext(), "labels", state.Labels.String(), "state", state.State.String(), "err", err.Error())...)
					errorsCnt++
				}
			}
		}
	}
	st.log.Info("the state has been flushed", "total_instances", totalStates, "errors", errorsCnt, "took", st.clock.Since(t))
}

// saveState persists a single state as an alert instance via the instance store.
func (st *Manager) saveState(ctx context.Context, s *State) error {
	cmd := ngModels.SaveAlertInstanceCommand{
		RuleOrgID:         s.OrgID,
		RuleUID:           s.AlertRuleUID,
		Labels:            ngModels.InstanceLabels(s.Labels),
		State:             ngModels.InstanceStateType(s.State.String()),
		StateReason:       s.StateReason,
		LastEvalTime:      s.LastEvaluationTime,
		CurrentStateSince: s.StartsAt,
		CurrentStateEnd:   s.EndsAt,
	}
	return st.instanceStore.SaveAlertInstance(ctx, &cmd)
}

// TODO: why wouldn't you allow other types like NoData or Error?
func translateInstanceState(state ngModels.InstanceStateType) eval.State {
	switch {


@@ -7,13 +7,16 @@ import (
	"testing"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/require"

	"github.com/benbjohnson/clock"

	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/services/annotations"
	"github.com/grafana/grafana/pkg/services/dashboards"
	"github.com/grafana/grafana/pkg/services/ngalert/eval"
	"github.com/grafana/grafana/pkg/services/ngalert/image"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/ngalert/store"
@@ -149,3 +152,45 @@ func TestIsItStale(t *testing.T) {
		})
	}
}

func TestClose(t *testing.T) {
	instanceStore := &store.FakeInstanceStore{}
	clk := clock.New()
	st := NewManager(log.New("test_state_manager"), metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), nil, nil, instanceStore, &dashboards.FakeDashboardService{}, &image.NotAvailableImageService{}, clk)
	fakeAnnoRepo := store.NewFakeAnnotationsRepo()
	annotations.SetRepository(fakeAnnoRepo)

	_, rules := ngmodels.GenerateUniqueAlertRules(10, ngmodels.AlertRuleGen())
	for _, rule := range rules {
		results := eval.GenerateResults(rand.Intn(4)+1, eval.ResultGen(eval.WithEvaluatedAt(clk.Now())))
		_ = st.ProcessEvalResults(context.Background(), clk.Now(), rule, results, ngmodels.GenerateAlertLabels(rand.Intn(4), "extra_"))
	}

	var states []*State
	for _, org := range st.cache.states {
		for _, rule := range org {
			for _, state := range rule {
				states = append(states, state)
			}
		}
	}

	// Reset the recorded operations so the test sees only what Close writes.
	instanceStore.RecordedOps = nil
	st.Close(context.Background())

	t.Run("should flush the state to store", func(t *testing.T) {
		savedStates := make(map[string]ngmodels.SaveAlertInstanceCommand)
		for _, op := range instanceStore.RecordedOps {
			switch q := op.(type) {
			case ngmodels.SaveAlertInstanceCommand:
				cacheId, err := q.Labels.StringKey()
				require.NoError(t, err)
				savedStates[cacheId] = q
			}
		}
		require.Len(t, savedStates, len(states))
		for _, s := range states {
			require.Contains(t, savedStates, s.CacheId)
		}
	})
}


@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math/rand"
 	"sort"
 	"testing"
 	"time"
@@ -2007,6 +2008,34 @@ func TestProcessEvalResults(t *testing.T) {
			}, time.Second, 100*time.Millisecond, "%d annotations are present, expected %d. We have %+v", fakeAnnoRepo.Len(), tc.expectedAnnotations, printAllAnnotations(fakeAnnoRepo.Items))
		})
	}

	t.Run("should save state to database", func(t *testing.T) {
		fakeAnnoRepo := store.NewFakeAnnotationsRepo()
		annotations.SetRepository(fakeAnnoRepo)
		instanceStore := &store.FakeInstanceStore{}
		clk := clock.New()
		st := state.NewManager(log.New("test_state_manager"), testMetrics.GetStateMetrics(), nil, nil, instanceStore, &dashboards.FakeDashboardService{}, &image.NotAvailableImageService{}, clk)
		rule := models.AlertRuleGen()()

		var results = eval.GenerateResults(rand.Intn(4)+1, eval.ResultGen(eval.WithEvaluatedAt(clk.Now())))
		states := st.ProcessEvalResults(context.Background(), clk.Now(), rule, results, make(data.Labels))
		require.NotEmpty(t, states)

		savedStates := make(map[string]models.SaveAlertInstanceCommand)
		for _, op := range instanceStore.RecordedOps {
			switch q := op.(type) {
			case models.SaveAlertInstanceCommand:
				cacheId, err := q.Labels.StringKey()
				require.NoError(t, err)
				savedStates[cacheId] = q
			}
		}
		require.Len(t, savedStates, len(states))
		for _, s := range states {
			require.Contains(t, savedStates, s.CacheId)
		}
	})
}

func printAllAnnotations(annos []*annotations.Item) string {

@@ -36,6 +36,13 @@ type State struct {
	Error error
}

// GetRuleKey returns the identifier of the alert rule the state belongs to.
func (a *State) GetRuleKey() models.AlertRuleKey {
	return models.AlertRuleKey{
		OrgID: a.OrgID,
		UID:   a.AlertRuleUID,
	}
}

type Evaluation struct {
	EvaluationTime  time.Time
	EvaluationState eval.State