diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go index 37c169130fb..fae7c64512e 100644 --- a/pkg/services/ngalert/ngalert.go +++ b/pkg/services/ngalert/ngalert.go @@ -552,7 +552,7 @@ func (ng *AlertNG) Run(ctx context.Context) error { // Also note that this runs synchronously to ensure state is loaded // before rule evaluation begins, hence we use ctx and not subCtx. // - ng.stateManager.Warm(ctx, ng.store) + ng.stateManager.Warm(ctx, ng.store, ng.store) children.Go(func() error { return ng.schedule.Run(subCtx) diff --git a/pkg/services/ngalert/state/manager.go b/pkg/services/ngalert/state/manager.go index b351f6c274f..666fd65de50 100644 --- a/pkg/services/ngalert/state/manager.go +++ b/pkg/services/ngalert/state/manager.go @@ -123,7 +123,7 @@ func (st *Manager) Run(ctx context.Context) error { return nil } -func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader) { +func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceReader InstanceReader) { if st.instanceStore == nil { st.log.Info("Skip warming the state because instance store is not configured") return @@ -131,7 +131,7 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader) { startTime := time.Now() st.log.Info("Warming state cache for startup") - orgIds, err := st.instanceStore.FetchOrgIds(ctx) + orgIds, err := instanceReader.FetchOrgIds(ctx) if err != nil { st.log.Error("Unable to fetch orgIds", "error", err) } @@ -175,7 +175,7 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader) { cmd := ngModels.ListAlertInstancesQuery{ RuleOrgID: orgId, } - alertInstances, err := st.instanceStore.ListAlertInstances(ctx, &cmd) + alertInstances, err := instanceReader.ListAlertInstances(ctx, &cmd) if err != nil { st.log.Error("Unable to fetch previous state", "error", err) } diff --git a/pkg/services/ngalert/state/manager_test.go b/pkg/services/ngalert/state/manager_test.go index a46e5357fe5..fc5999e3762 100644 --- a/pkg/services/ngalert/state/manager_test.go +++ b/pkg/services/ngalert/state/manager_test.go @@ -230,7 +230,7 @@ func TestWarmStateCache(t *testing.T) { Log: log.New("ngalert.state.manager"), } st := state.NewManager(cfg, state.NewNoopPersister()) - st.Warm(ctx, dbstore) + st.Warm(ctx, dbstore, dbstore) t.Run("instance cache has expected entries", func(t *testing.T) { for _, entry := range expectedEntries { @@ -277,7 +277,7 @@ func TestDashboardAnnotations(t *testing.T) { "test2": "{{ $labels.instance_label }}", }) - st.Warm(ctx, dbstore) + st.Warm(ctx, dbstore, dbstore) bValue := float64(42) cValue := float64(1) _ = st.ProcessEvalResults(ctx, evaluationTime, rule, eval.Results{{ @@ -1813,7 +1813,7 @@ func TestStaleResultsHandler(t *testing.T) { Log: log.New("ngalert.state.manager"), } st := state.NewManager(cfg, state.NewNoopPersister()) - st.Warm(ctx, dbstore) + st.Warm(ctx, dbstore, dbstore) existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID) // We have loaded the expected number of entries from the db @@ -2073,7 +2073,7 @@ func TestDeleteStateByRuleUID(t *testing.T) { Log: log.New("ngalert.state.manager"), } st := state.NewManager(cfg, state.NewNoopPersister()) - st.Warm(ctx, dbstore) + st.Warm(ctx, dbstore, dbstore) q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID} alerts, _ := dbstore.ListAlertInstances(ctx, q) existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID) @@ -2214,7 +2214,7 @@ func TestResetStateByRuleUID(t *testing.T) { Log: log.New("ngalert.state.manager"), } st := state.NewManager(cfg, state.NewNoopPersister()) - st.Warm(ctx, dbstore) + st.Warm(ctx, dbstore, dbstore) q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID} alerts, _ := dbstore.ListAlertInstances(ctx, q) existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID) diff --git a/pkg/services/ngalert/state/persist.go b/pkg/services/ngalert/state/persist.go index a6d9a74d500..5249a2f1ee5 100644 --- a/pkg/services/ngalert/state/persist.go +++ b/pkg/services/ngalert/state/persist.go @@ -9,8 +9,8 @@ import ( // InstanceStore represents the ability to fetch and write alert instances. type InstanceStore interface { - FetchOrgIds(ctx context.Context) ([]int64, error) - ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error) + InstanceReader + SaveAlertInstance(ctx context.Context, instance models.AlertInstance) error DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error // SaveAlertInstancesForRule overwrites the state for the given rule. @@ -19,6 +19,12 @@ type InstanceStore interface { FullSync(ctx context.Context, instances []models.AlertInstance) error } +// InstanceReader provides methods to fetch alert instances. +type InstanceReader interface { + FetchOrgIds(ctx context.Context) ([]int64, error) + ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error) +} + // RuleReader represents the ability to fetch alert rules. type RuleReader interface { ListAlertRules(ctx context.Context, query *models.ListAlertRulesQuery) (models.RulesGroup, error)