feat(alerting): add state persister interface (#80384)
commit 82638d059f
parent 2d49fb6a7a
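In short: state.NewManager now requires a StatePersister, and the write path that used to live inside the Manager moves behind that interface. A condensed view of the two wiring patterns introduced by the hunks below (a sketch assembled from this diff, not additional API):

    // Tests, backtesting, and the rule-testing API opt out of persistence:
    manager := state.NewManager(cfg, state.NewNoopPersister())

    // The production path in AlertNG.init() keeps synchronous writes:
    statePersister := state.NewSyncStatePersisiter(log.New("ngalert.state.manager.persist"), cfg)
    stateManager := state.NewManager(cfg, statePersister)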
@@ -93,11 +93,10 @@ func (srv TestingApiSrv) RouteTestGrafanaRuleConfig(c *contextmodel.ReqContext,
         Images: &backtesting.NoopImageService{},
         Clock: clock.New(),
         Historian: nil,
-        MaxStateSaveConcurrency: 1,
         Tracer: srv.tracer,
         Log: log.New("ngalert.state.manager"),
     }
-    manager := state.NewManager(cfg)
+    manager := state.NewManager(cfg, state.NewNoopPersister())
     includeFolder := !srv.cfg.ReservedLabels.IsReservedLabelDisabled(models.FolderTitleLabel)
     transitions := manager.ProcessEvalResults(
         c.Req.Context(),
@@ -55,11 +55,10 @@ func NewEngine(appUrl *url.URL, evalFactory eval.EvaluatorFactory, tracer tracin
         Images: &NoopImageService{},
         Clock: clock.New(),
         Historian: nil,
-        MaxStateSaveConcurrency: 1,
         Tracer: tracer,
         Log: log.New("ngalert.state.manager"),
     }
-    return state.NewManager(cfg)
+    return state.NewManager(cfg, state.NewNoopPersister())
        },
    }
 }
@@ -290,12 +290,13 @@ func (ng *AlertNG) init() error {
         Clock: clk,
         Historian: history,
         DoNotSaveNormalState: ng.FeatureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingNoNormalState),
-        MaxStateSaveConcurrency: ng.Cfg.UnifiedAlerting.MaxStateSaveConcurrency,
         ApplyNoDataAndErrorToAllStates: ng.FeatureToggles.IsEnabledGlobally(featuremgmt.FlagAlertingNoDataErrorExecution),
+        MaxStateSaveConcurrency: ng.Cfg.UnifiedAlerting.MaxStateSaveConcurrency,
         Tracer: ng.tracer,
         Log: log.New("ngalert.state.manager"),
     }
-    stateManager := state.NewManager(cfg)
+    statePersister := state.NewSyncStatePersisiter(log.New("ngalert.state.manager.persist"), cfg)
+    stateManager := state.NewManager(cfg, statePersister)
     scheduler := schedule.NewScheduler(schedCfg, stateManager)
 
     // if it is required to include folder title to the alerts, we need to subscribe to changes of alert title
@@ -88,11 +88,10 @@ func TestProcessTicks(t *testing.T) {
         Images: &state.NoopImageService{},
         Clock: mockedClock,
         Historian: &state.FakeHistorian{},
-        MaxStateSaveConcurrency: 1,
         Tracer: testTracer,
         Log: log.New("ngalert.state.manager"),
     }
-    st := state.NewManager(managerCfg)
+    st := state.NewManager(managerCfg, state.NewNoopPersister())
 
     sched := NewScheduler(schedCfg, st)
 
@@ -906,11 +905,12 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor
         Images: &state.NoopImageService{},
         Clock: mockedClock,
         Historian: &state.FakeHistorian{},
-        MaxStateSaveConcurrency: 1,
         Tracer: testTracer,
         Log: log.New("ngalert.state.manager"),
+        MaxStateSaveConcurrency: 1,
     }
-    st := state.NewManager(managerCfg)
+    syncStatePersister := state.NewSyncStatePersisiter(log.New("ngalert.state.manager.perist"), managerCfg)
+    st := state.NewManager(managerCfg, syncStatePersister)
 
     return NewScheduler(schedCfg, st)
 }
@@ -7,7 +7,6 @@ import (
     "time"
 
     "github.com/benbjohnson/clock"
-    "github.com/grafana/dskit/concurrency"
     "github.com/grafana/grafana-plugin-sdk-go/data"
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/trace"
@@ -30,6 +29,11 @@ type AlertInstanceManager interface {
     GetStatesForRuleUID(orgID int64, alertRuleUID string) []*State
 }
 
+type StatePersister interface {
+    Async(ctx context.Context, ticker *clock.Ticker, cache *cache)
+    Sync(ctx context.Context, span trace.Span, states, staleStates []StateTransition)
+}
+
 type Manager struct {
     log log.Logger
     metrics *metrics.State
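Since cache and StateTransition are unexported, alternative persisters must live in this state package alongside the two added by the commit. A minimal hypothetical implementation, shown only to illustrate the contract (countingPersister and its behavior are invented for this sketch, not part of the commit):

    // countingPersister tallies transitions instead of writing them anywhere.
    type countingPersister struct {
        log     log.Logger
        saved   int
        deleted int
    }

    // Async mirrors SyncStatePersister: nothing to run in the background.
    func (p *countingPersister) Async(_ context.Context, _ *clock.Ticker, _ *cache) {}

    // Sync receives this evaluation's transitions plus the stale states evicted from the cache.
    func (p *countingPersister) Sync(_ context.Context, _ trace.Span, states, staleStates []StateTransition) {
        p.saved += len(states)
        p.deleted += len(staleStates)
        p.log.Debug("counted state transitions", "states", len(states), "stale", len(staleStates))
    }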
@@ -45,8 +49,9 @@ type Manager struct {
     externalURL *url.URL
 
     doNotSaveNormalState bool
-    maxStateSaveConcurrency int
     applyNoDataAndErrorToAllStates bool
 
+    persister StatePersister
 }
 
 type ManagerCfg struct {
@@ -60,7 +65,6 @@ type ManagerCfg struct {
     DoNotSaveNormalState bool
     // MaxStateSaveConcurrency controls the number of goroutines (per rule) that can save alert state in parallel.
     MaxStateSaveConcurrency int
 
     // ApplyNoDataAndErrorToAllStates makes state manager to apply exceptional results (NoData and Error)
     // to all states when corresponding execution in the rule definition is set to either `Alerting` or `OK`
     ApplyNoDataAndErrorToAllStates bool
@@ -69,7 +73,7 @@ type ManagerCfg struct {
     Log log.Logger
 }
 
-func NewManager(cfg ManagerCfg) *Manager {
+func NewManager(cfg ManagerCfg, statePersister StatePersister) *Manager {
     // Metrics for the cache use a collector, so they need access to the register directly.
     c := newCache()
     if cfg.Metrics != nil {
@@ -87,8 +91,8 @@ func NewManager(cfg ManagerCfg) *Manager {
         clock: cfg.Clock,
         externalURL: cfg.ExternalURL,
         doNotSaveNormalState: cfg.DoNotSaveNormalState,
-        maxStateSaveConcurrency: cfg.MaxStateSaveConcurrency,
         applyNoDataAndErrorToAllStates: cfg.ApplyNoDataAndErrorToAllStates,
+        persister: statePersister,
         tracer: cfg.Tracer,
     }
 
@@ -279,16 +283,7 @@ func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time
     ))
 
     staleStates := st.deleteStaleStatesFromCache(ctx, logger, evaluatedAt, alertRule)
-    st.deleteAlertStates(tracingCtx, logger, staleStates)
-
-    if len(staleStates) > 0 {
-        span.AddEvent("deleted stale states", trace.WithAttributes(
-            attribute.Int64("state_transitions", int64(len(staleStates))),
-        ))
-    }
-
-    st.saveAlertStates(tracingCtx, logger, states...)
-    span.AddEvent("updated database")
+    st.persister.Sync(tracingCtx, span, states, staleStates)
 
     allChanges := append(states, staleStates...)
     if st.historian != nil {
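The block deleted above is not lost: the stale-state deletion, the "deleted stale states" and "updated database" span events, and the doNotSaveNormalState and maxStateSaveConcurrency handling all reappear inside SyncStatePersister (new file below), so ProcessEvalResults shrinks to a single persister.Sync call covering both the fresh transitions and the stale states just evicted from the cache.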
@@ -442,72 +437,6 @@ func (st *Manager) Put(states []*State) {
     }
 }
 
-// TODO: Is the `State` type necessary? Should it embed the instance?
-func (st *Manager) saveAlertStates(ctx context.Context, logger log.Logger, states ...StateTransition) {
-    if st.instanceStore == nil || len(states) == 0 {
-        return
-    }
-
-    saveState := func(ctx context.Context, idx int) error {
-        s := states[idx]
-        // Do not save normal state to database and remove transition to Normal state but keep mapped states
-        if st.doNotSaveNormalState && IsNormalStateWithNoReason(s.State) && !s.Changed() {
-            return nil
-        }
-
-        key, err := s.GetAlertInstanceKey()
-        if err != nil {
-            logger.Error("Failed to create a key for alert state to save it to database. The state will be ignored ", "cacheID", s.CacheID, "error", err, "labels", s.Labels.String())
-            return nil
-        }
-        instance := ngModels.AlertInstance{
-            AlertInstanceKey: key,
-            Labels: ngModels.InstanceLabels(s.Labels),
-            CurrentState: ngModels.InstanceStateType(s.State.State.String()),
-            CurrentReason: s.StateReason,
-            LastEvalTime: s.LastEvaluationTime,
-            CurrentStateSince: s.StartsAt,
-            CurrentStateEnd: s.EndsAt,
-            ResultFingerprint: s.ResultFingerprint.String(),
-        }
-
-        err = st.instanceStore.SaveAlertInstance(ctx, instance)
-        if err != nil {
-            logger.Error("Failed to save alert state", "labels", s.Labels.String(), "state", s.State, "error", err)
-            return nil
-        }
-        return nil
-    }
-
-    start := time.Now()
-    logger.Debug("Saving alert states", "count", len(states), "max_state_save_concurrency", st.maxStateSaveConcurrency)
-    _ = concurrency.ForEachJob(ctx, len(states), st.maxStateSaveConcurrency, saveState)
-    logger.Debug("Saving alert states done", "count", len(states), "max_state_save_concurrency", st.maxStateSaveConcurrency, "duration", time.Since(start))
-}
-
-func (st *Manager) deleteAlertStates(ctx context.Context, logger log.Logger, states []StateTransition) {
-    if st.instanceStore == nil || len(states) == 0 {
-        return
-    }
-
-    logger.Debug("Deleting alert states", "count", len(states))
-    toDelete := make([]ngModels.AlertInstanceKey, 0, len(states))
-
-    for _, s := range states {
-        key, err := s.GetAlertInstanceKey()
-        if err != nil {
-            logger.Error("Failed to delete alert instance with invalid labels", "cacheID", s.CacheID, "error", err)
-            continue
-        }
-        toDelete = append(toDelete, key)
-    }
-
-    err := st.instanceStore.DeleteAlertInstances(ctx, toDelete...)
-    if err != nil {
-        logger.Error("Failed to delete stale states", "error", err)
-    }
-}
-
 func translateInstanceState(state ngModels.InstanceStateType) eval.State {
     switch state {
     case ngModels.InstanceStateFiring:
@@ -27,11 +27,10 @@ func BenchmarkProcessEvalResults(b *testing.B) {
     hist := historian.NewAnnotationBackend(store, nil, metrics)
     cfg := state.ManagerCfg{
         Historian: hist,
-        MaxStateSaveConcurrency: 1,
         Tracer: tracing.InitializeTracerForTest(),
         Log: log.New("ngalert.state.manager"),
     }
-    sut := state.NewManager(cfg)
+    sut := state.NewManager(cfg, state.NewNoopPersister())
     now := time.Now().UTC()
     rule := makeBenchRule()
     results := makeBenchResults(100)
@@ -19,13 +19,10 @@ import (
 
     "github.com/grafana/grafana/pkg/expr"
     "github.com/grafana/grafana/pkg/infra/log"
-    "github.com/grafana/grafana/pkg/infra/log/logtest"
     "github.com/grafana/grafana/pkg/infra/tracing"
     "github.com/grafana/grafana/pkg/services/ngalert/eval"
     "github.com/grafana/grafana/pkg/services/ngalert/metrics"
     ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
-
-    "github.com/grafana/grafana/pkg/util"
 )
 
 // Not for parallel tests.
@@ -82,84 +79,6 @@ func TestStateIsStale(t *testing.T) {
     }
 }
 
-func TestManager_saveAlertStates(t *testing.T) {
-    type stateWithReason struct {
-        State eval.State
-        Reason string
-    }
-    create := func(s eval.State, r string) stateWithReason {
-        return stateWithReason{
-            State: s,
-            Reason: r,
-        }
-    }
-    allStates := [...]stateWithReason{
-        create(eval.Normal, ""),
-        create(eval.Normal, eval.NoData.String()),
-        create(eval.Normal, eval.Error.String()),
-        create(eval.Normal, util.GenerateShortUID()),
-        create(eval.Alerting, ""),
-        create(eval.Pending, ""),
-        create(eval.NoData, ""),
-        create(eval.Error, ""),
-    }
-
-    transitionToKey := map[ngmodels.AlertInstanceKey]StateTransition{}
-    transitions := make([]StateTransition, 0)
-    for _, fromState := range allStates {
-        for i, toState := range allStates {
-            tr := StateTransition{
-                State: &State{
-                    State: toState.State,
-                    StateReason: toState.Reason,
-                    Labels: ngmodels.GenerateAlertLabels(5, fmt.Sprintf("%d--", i)),
-                },
-                PreviousState: fromState.State,
-                PreviousStateReason: fromState.Reason,
-            }
-            key, err := tr.GetAlertInstanceKey()
-            require.NoError(t, err)
-            transitionToKey[key] = tr
-            transitions = append(transitions, tr)
-        }
-    }
-
-    t.Run("should save all transitions if doNotSaveNormalState is false", func(t *testing.T) {
-        st := &FakeInstanceStore{}
-        m := Manager{instanceStore: st, doNotSaveNormalState: false, maxStateSaveConcurrency: 1}
-        m.saveAlertStates(context.Background(), &logtest.Fake{}, transitions...)
-
-        savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{}
-        for _, op := range st.RecordedOps {
-            saved := op.(ngmodels.AlertInstance)
-            savedKeys[saved.AlertInstanceKey] = saved
-        }
-        assert.Len(t, transitionToKey, len(savedKeys))
-
-        for key, tr := range transitionToKey {
-            assert.Containsf(t, savedKeys, key, "state %s (%s) was not saved but should be", tr.State.State, tr.StateReason)
-        }
-    })
-
-    t.Run("should not save Normal->Normal if doNotSaveNormalState is true", func(t *testing.T) {
-        st := &FakeInstanceStore{}
-        m := Manager{instanceStore: st, doNotSaveNormalState: true, maxStateSaveConcurrency: 1}
-        m.saveAlertStates(context.Background(), &logtest.Fake{}, transitions...)
-
-        savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{}
-        for _, op := range st.RecordedOps {
-            saved := op.(ngmodels.AlertInstance)
-            savedKeys[saved.AlertInstanceKey] = saved
-        }
-        for key, tr := range transitionToKey {
-            if tr.State.State == eval.Normal && tr.StateReason == "" && tr.PreviousState == eval.Normal && tr.PreviousStateReason == "" {
-                continue
-            }
-            assert.Containsf(t, savedKeys, key, "state %s (%s) was not saved but should be", tr.State.State, tr.StateReason)
-        }
-    })
-}
-
 // TestProcessEvalResults_StateTransitions tests how state.Manager's ProcessEvalResults processes results and creates or changes states.
 // In other words, it tests the state transition.
 //
@@ -336,11 +255,10 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
     Images: &NotAvailableImageService{},
     Clock: clk,
     Historian: &FakeHistorian{},
-    MaxStateSaveConcurrency: 1,
 
     ApplyNoDataAndErrorToAllStates: applyNoDataErrorToAllStates,
 }
-st := NewManager(cfg)
+st := NewManager(cfg, NewNoopPersister())
 
 tss := make([]time.Time, 0, len(resultsAtTime))
 for ts, results := range resultsAtTime {
@@ -210,11 +210,10 @@ func TestWarmStateCache(t *testing.T) {
     Images: &state.NoopImageService{},
     Clock: clock.NewMock(),
     Historian: &state.FakeHistorian{},
-    MaxStateSaveConcurrency: 1,
     Tracer: tracing.InitializeTracerForTest(),
     Log: log.New("ngalert.state.manager"),
 }
-st := state.NewManager(cfg)
+st := state.NewManager(cfg, state.NewNoopPersister())
 st.Warm(ctx, dbstore)
 
 t.Run("instance cache has expected entries", func(t *testing.T) {
@@ -248,11 +247,10 @@ func TestDashboardAnnotations(t *testing.T) {
     Images: &state.NoopImageService{},
     Clock: clock.New(),
     Historian: hist,
-    MaxStateSaveConcurrency: 1,
     Tracer: tracing.InitializeTracerForTest(),
     Log: log.New("ngalert.state.manager"),
 }
-st := state.NewManager(cfg)
+st := state.NewManager(cfg, state.NewNoopPersister())
 
 const mainOrgID int64 = 1
 
@@ -1263,11 +1261,10 @@ func TestProcessEvalResults(t *testing.T) {
     Images: &state.NotAvailableImageService{},
     Clock: clk,
     Historian: hist,
-    MaxStateSaveConcurrency: 1,
     Tracer: tracing.InitializeTracerForTest(),
     Log: log.New("ngalert.state.manager"),
 }
-st := state.NewManager(cfg)
+st := state.NewManager(cfg, state.NewNoopPersister())
 
 evals := make([]time.Time, 0, len(tc.evalResults))
 for evalTime := range tc.evalResults {
@@ -1364,11 +1361,12 @@ func TestProcessEvalResults(t *testing.T) {
     Images: &state.NotAvailableImageService{},
     Clock: clk,
     Historian: &state.FakeHistorian{},
-    MaxStateSaveConcurrency: 1,
     Tracer: tracing.InitializeTracerForTest(),
     Log: log.New("ngalert.state.manager"),
+    MaxStateSaveConcurrency: 1,
 }
-st := state.NewManager(cfg)
+statePersister := state.NewSyncStatePersisiter(log.New("ngalert.state.manager.persist"), cfg)
+st := state.NewManager(cfg, statePersister)
 rule := models.AlertRuleGen()()
 var results = eval.GenerateResults(rand.Intn(4)+1, eval.ResultGen(eval.WithEvaluatedAt(clk.Now())))
 
@@ -1517,11 +1515,10 @@ func TestStaleResultsHandler(t *testing.T) {
     Images: &state.NoopImageService{},
     Clock: clock.New(),
     Historian: &state.FakeHistorian{},
-    MaxStateSaveConcurrency: 1,
     Tracer: tracing.InitializeTracerForTest(),
     Log: log.New("ngalert.state.manager"),
 }
-st := state.NewManager(cfg)
+st := state.NewManager(cfg, state.NewNoopPersister())
 st.Warm(ctx, dbstore)
 existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)
 
@@ -1600,11 +1597,10 @@ func TestStaleResults(t *testing.T) {
     Images: &state.NoopImageService{},
     Clock: clk,
     Historian: &state.FakeHistorian{},
-    MaxStateSaveConcurrency: 1,
     Tracer: tracing.InitializeTracerForTest(),
     Log: log.New("ngalert.state.manager"),
 }
-st := state.NewManager(cfg)
+st := state.NewManager(cfg, state.NewNoopPersister())
 
 rule := models.AlertRuleGen(models.WithFor(0))()
 
@@ -1774,11 +1770,10 @@ func TestDeleteStateByRuleUID(t *testing.T) {
     Images: &state.NoopImageService{},
     Clock: clk,
     Historian: &state.FakeHistorian{},
-    MaxStateSaveConcurrency: 1,
     Tracer: tracing.InitializeTracerForTest(),
     Log: log.New("ngalert.state.manager"),
 }
-st := state.NewManager(cfg)
+st := state.NewManager(cfg, state.NewNoopPersister())
 st.Warm(ctx, dbstore)
 q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
 alerts, _ := dbstore.ListAlertInstances(ctx, q)
@@ -1916,11 +1911,10 @@ func TestResetStateByRuleUID(t *testing.T) {
     Images: &state.NoopImageService{},
     Clock: clk,
     Historian: fakeHistorian,
-    MaxStateSaveConcurrency: 1,
     Tracer: tracing.InitializeTracerForTest(),
     Log: log.New("ngalert.state.manager"),
 }
-st := state.NewManager(cfg)
+st := state.NewManager(cfg, state.NewNoopPersister())
 st.Warm(ctx, dbstore)
 q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
 alerts, _ := dbstore.ListAlertInstances(ctx, q)
pkg/services/ngalert/state/persister_noop.go (new file, 17 lines)
@@ -0,0 +1,17 @@
+package state
+
+import (
+    "context"
+
+    "github.com/benbjohnson/clock"
+    "go.opentelemetry.io/otel/trace"
+)
+
+type NoopPersister struct{}
+
+func (n *NoopPersister) Async(_ context.Context, _ *clock.Ticker, _ *cache) {}
+func (n *NoopPersister) Sync(_ context.Context, _ trace.Span, _, _ []StateTransition) {}
+
+func NewNoopPersister() StatePersister {
+    return &NoopPersister{}
+}
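NoopPersister is the implementation injected at every NewManager call site above that previously set MaxStateSaveConcurrency: 1 without needing real persistence: the rule-testing API, the backtesting engine, and most state manager tests. It satisfies StatePersister while guaranteeing no instance-store traffic.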
pkg/services/ngalert/state/persister_sync.go (new file, 111 lines)
@@ -0,0 +1,111 @@
+package state
+
+import (
+    "context"
+    "time"
+
+    "github.com/benbjohnson/clock"
+    "github.com/grafana/dskit/concurrency"
+    "go.opentelemetry.io/otel/attribute"
+    "go.opentelemetry.io/otel/trace"
+
+    "github.com/grafana/grafana/pkg/infra/log"
+    ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
+)
+
+type SyncStatePersister struct {
+    log   log.Logger
+    store InstanceStore
+    // doNotSaveNormalState controls whether eval.Normal state is persisted to the database and returned by get methods.
+    doNotSaveNormalState bool
+    // maxStateSaveConcurrency controls the number of goroutines (per rule) that can save alert state in parallel.
+    maxStateSaveConcurrency int
+}
+
+func NewSyncStatePersisiter(log log.Logger, cfg ManagerCfg) StatePersister {
+    return &SyncStatePersister{
+        log:                     log,
+        store:                   cfg.InstanceStore,
+        doNotSaveNormalState:    cfg.DoNotSaveNormalState,
+        maxStateSaveConcurrency: cfg.MaxStateSaveConcurrency,
+    }
+}
+
+func (a *SyncStatePersister) Async(_ context.Context, _ *clock.Ticker, _ *cache) {
+    a.log.Debug("Async: No-Op")
+}
+func (a *SyncStatePersister) Sync(ctx context.Context, span trace.Span, states, staleStates []StateTransition) {
+    a.deleteAlertStates(ctx, staleStates)
+    if len(staleStates) > 0 {
+        span.AddEvent("deleted stale states", trace.WithAttributes(
+            attribute.Int64("state_transitions", int64(len(staleStates))),
+        ))
+    }
+
+    a.saveAlertStates(ctx, states...)
+    span.AddEvent("updated database")
+}
+
+func (a *SyncStatePersister) deleteAlertStates(ctx context.Context, states []StateTransition) {
+    if a.store == nil || len(states) == 0 {
+        return
+    }
+
+    a.log.Debug("Deleting alert states", "count", len(states))
+    toDelete := make([]ngModels.AlertInstanceKey, 0, len(states))
+
+    for _, s := range states {
+        key, err := s.GetAlertInstanceKey()
+        if err != nil {
+            a.log.Error("Failed to delete alert instance with invalid labels", "cacheID", s.CacheID, "error", err)
+            continue
+        }
+        toDelete = append(toDelete, key)
+    }
+
+    err := a.store.DeleteAlertInstances(ctx, toDelete...)
+    if err != nil {
+        a.log.Error("Failed to delete stale states", "error", err)
+    }
+}
+
+func (a *SyncStatePersister) saveAlertStates(ctx context.Context, states ...StateTransition) {
+    if a.store == nil || len(states) == 0 {
+        return
+    }
+
+    saveState := func(ctx context.Context, idx int) error {
+        s := states[idx]
+        // Do not save normal state to database and remove transition to Normal state but keep mapped states
+        if a.doNotSaveNormalState && IsNormalStateWithNoReason(s.State) && !s.Changed() {
+            return nil
+        }
+
+        key, err := s.GetAlertInstanceKey()
+        if err != nil {
+            a.log.Error("Failed to create a key for alert state to save it to database. The state will be ignored ", "cacheID", s.CacheID, "error", err, "labels", s.Labels.String())
+            return nil
+        }
+        instance := ngModels.AlertInstance{
+            AlertInstanceKey:  key,
+            Labels:            ngModels.InstanceLabels(s.Labels),
+            CurrentState:      ngModels.InstanceStateType(s.State.State.String()),
+            CurrentReason:     s.StateReason,
+            LastEvalTime:      s.LastEvaluationTime,
+            CurrentStateSince: s.StartsAt,
+            CurrentStateEnd:   s.EndsAt,
+        }
+
+        err = a.store.SaveAlertInstance(ctx, instance)
+        if err != nil {
+            a.log.Error("Failed to save alert state", "labels", s.Labels.String(), "state", s.State, "error", err)
+            return nil
+        }
+        return nil
+    }
+
+    start := time.Now()
+    a.log.Debug("Saving alert states", "count", len(states), "max_state_save_concurrency", a.maxStateSaveConcurrency)
+    _ = concurrency.ForEachJob(ctx, len(states), a.maxStateSaveConcurrency, saveState)
+    a.log.Debug("Saving alert states done", "count", len(states), "max_state_save_concurrency", a.maxStateSaveConcurrency, "duration", time.Since(start))
+}
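SyncStatePersister carries saveAlertStates and deleteAlertStates over from Manager almost verbatim, swapping st.instanceStore for a store injected via ManagerCfg. Its Async method is a logged no-op, which suggests the ticker-and-cache signature leaves room for a future asynchronous implementation; that reading is an inference from the interface shape, not something the commit states. One visible difference from the removed Manager code: the AlertInstance built here no longer sets ResultFingerprint.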
pkg/services/ngalert/state/persister_sync_test.go (new file, 103 lines)
@@ -0,0 +1,103 @@
+package state
+
+import (
+    "context"
+    "fmt"
+    "testing"
+
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+    "k8s.io/component-base/tracing"
+
+    "github.com/grafana/grafana/pkg/infra/log/logtest"
+    "github.com/grafana/grafana/pkg/services/ngalert/eval"
+    ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
+    "github.com/grafana/grafana/pkg/util"
+)
+
+func TestSyncPersister_saveAlertStates(t *testing.T) {
+    type stateWithReason struct {
+        State  eval.State
+        Reason string
+    }
+    create := func(s eval.State, r string) stateWithReason {
+        return stateWithReason{
+            State:  s,
+            Reason: r,
+        }
+    }
+    allStates := [...]stateWithReason{
+        create(eval.Normal, ""),
+        create(eval.Normal, eval.NoData.String()),
+        create(eval.Normal, eval.Error.String()),
+        create(eval.Normal, util.GenerateShortUID()),
+        create(eval.Alerting, ""),
+        create(eval.Pending, ""),
+        create(eval.NoData, ""),
+        create(eval.Error, ""),
+    }
+
+    transitionToKey := map[ngmodels.AlertInstanceKey]StateTransition{}
+    transitions := make([]StateTransition, 0)
+    for _, fromState := range allStates {
+        for i, toState := range allStates {
+            tr := StateTransition{
+                State: &State{
+                    State:       toState.State,
+                    StateReason: toState.Reason,
+                    Labels:      ngmodels.GenerateAlertLabels(5, fmt.Sprintf("%d--", i)),
+                },
+                PreviousState:       fromState.State,
+                PreviousStateReason: fromState.Reason,
+            }
+            key, err := tr.GetAlertInstanceKey()
+            require.NoError(t, err)
+            transitionToKey[key] = tr
+            transitions = append(transitions, tr)
+        }
+    }
+
+    t.Run("should save all transitions if doNotSaveNormalState is false", func(t *testing.T) {
+        trace := tracing.NewNoopTracerProvider().Tracer("test")
+        _, span := trace.Start(context.Background(), "")
+        st := &FakeInstanceStore{}
+        syncStatePersister := NewSyncStatePersisiter(&logtest.Fake{}, ManagerCfg{
+            InstanceStore:           st,
+            MaxStateSaveConcurrency: 1,
+        })
+        syncStatePersister.Sync(context.Background(), span, transitions, nil)
+        savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{}
+        for _, op := range st.RecordedOps {
+            saved := op.(ngmodels.AlertInstance)
+            savedKeys[saved.AlertInstanceKey] = saved
+        }
+        assert.Len(t, transitionToKey, len(savedKeys))
+
+        for key, tr := range transitionToKey {
+            assert.Containsf(t, savedKeys, key, "state %s (%s) was not saved but should be", tr.State.State, tr.StateReason)
+        }
+    })
+
+    t.Run("should not save Normal->Normal if doNotSaveNormalState is true", func(t *testing.T) {
+        trace := tracing.NewNoopTracerProvider().Tracer("test")
+        _, span := trace.Start(context.Background(), "")
+        st := &FakeInstanceStore{}
+        syncStatePersister := NewSyncStatePersisiter(&logtest.Fake{}, ManagerCfg{
+            InstanceStore:           st,
+            MaxStateSaveConcurrency: 1,
+        })
+        syncStatePersister.Sync(context.Background(), span, transitions, nil)
+
+        savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{}
+        for _, op := range st.RecordedOps {
+            saved := op.(ngmodels.AlertInstance)
+            savedKeys[saved.AlertInstanceKey] = saved
+        }
+        for key, tr := range transitionToKey {
+            if tr.State.State == eval.Normal && tr.StateReason == "" && tr.PreviousState == eval.Normal && tr.PreviousStateReason == "" {
+                continue
+            }
+            assert.Containsf(t, savedKeys, key, "state %s (%s) was not saved but should be", tr.State.State, tr.StateReason)
+        }
+    })
+}