mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: Refactor state manager Warm method to accept instance store as an argument (#95098)
This commit is contained in:
parent
cfc40b23d7
commit
d0481bb568
@ -552,7 +552,7 @@ func (ng *AlertNG) Run(ctx context.Context) error {
|
|||||||
// Also note that this runs synchronously to ensure state is loaded
|
// Also note that this runs synchronously to ensure state is loaded
|
||||||
// before rule evaluation begins, hence we use ctx and not subCtx.
|
// before rule evaluation begins, hence we use ctx and not subCtx.
|
||||||
//
|
//
|
||||||
ng.stateManager.Warm(ctx, ng.store)
|
ng.stateManager.Warm(ctx, ng.store, ng.store)
|
||||||
|
|
||||||
children.Go(func() error {
|
children.Go(func() error {
|
||||||
return ng.schedule.Run(subCtx)
|
return ng.schedule.Run(subCtx)
|
||||||
|
@ -123,7 +123,7 @@ func (st *Manager) Run(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader) {
|
func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader, instanceReader InstanceReader) {
|
||||||
if st.instanceStore == nil {
|
if st.instanceStore == nil {
|
||||||
st.log.Info("Skip warming the state because instance store is not configured")
|
st.log.Info("Skip warming the state because instance store is not configured")
|
||||||
return
|
return
|
||||||
@ -131,7 +131,7 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader) {
|
|||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
st.log.Info("Warming state cache for startup")
|
st.log.Info("Warming state cache for startup")
|
||||||
|
|
||||||
orgIds, err := st.instanceStore.FetchOrgIds(ctx)
|
orgIds, err := instanceReader.FetchOrgIds(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
st.log.Error("Unable to fetch orgIds", "error", err)
|
st.log.Error("Unable to fetch orgIds", "error", err)
|
||||||
}
|
}
|
||||||
@ -175,7 +175,7 @@ func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader) {
|
|||||||
cmd := ngModels.ListAlertInstancesQuery{
|
cmd := ngModels.ListAlertInstancesQuery{
|
||||||
RuleOrgID: orgId,
|
RuleOrgID: orgId,
|
||||||
}
|
}
|
||||||
alertInstances, err := st.instanceStore.ListAlertInstances(ctx, &cmd)
|
alertInstances, err := instanceReader.ListAlertInstances(ctx, &cmd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
st.log.Error("Unable to fetch previous state", "error", err)
|
st.log.Error("Unable to fetch previous state", "error", err)
|
||||||
}
|
}
|
||||||
|
@ -230,7 +230,7 @@ func TestWarmStateCache(t *testing.T) {
|
|||||||
Log: log.New("ngalert.state.manager"),
|
Log: log.New("ngalert.state.manager"),
|
||||||
}
|
}
|
||||||
st := state.NewManager(cfg, state.NewNoopPersister())
|
st := state.NewManager(cfg, state.NewNoopPersister())
|
||||||
st.Warm(ctx, dbstore)
|
st.Warm(ctx, dbstore, dbstore)
|
||||||
|
|
||||||
t.Run("instance cache has expected entries", func(t *testing.T) {
|
t.Run("instance cache has expected entries", func(t *testing.T) {
|
||||||
for _, entry := range expectedEntries {
|
for _, entry := range expectedEntries {
|
||||||
@ -277,7 +277,7 @@ func TestDashboardAnnotations(t *testing.T) {
|
|||||||
"test2": "{{ $labels.instance_label }}",
|
"test2": "{{ $labels.instance_label }}",
|
||||||
})
|
})
|
||||||
|
|
||||||
st.Warm(ctx, dbstore)
|
st.Warm(ctx, dbstore, dbstore)
|
||||||
bValue := float64(42)
|
bValue := float64(42)
|
||||||
cValue := float64(1)
|
cValue := float64(1)
|
||||||
_ = st.ProcessEvalResults(ctx, evaluationTime, rule, eval.Results{{
|
_ = st.ProcessEvalResults(ctx, evaluationTime, rule, eval.Results{{
|
||||||
@ -1813,7 +1813,7 @@ func TestStaleResultsHandler(t *testing.T) {
|
|||||||
Log: log.New("ngalert.state.manager"),
|
Log: log.New("ngalert.state.manager"),
|
||||||
}
|
}
|
||||||
st := state.NewManager(cfg, state.NewNoopPersister())
|
st := state.NewManager(cfg, state.NewNoopPersister())
|
||||||
st.Warm(ctx, dbstore)
|
st.Warm(ctx, dbstore, dbstore)
|
||||||
existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)
|
existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)
|
||||||
|
|
||||||
// We have loaded the expected number of entries from the db
|
// We have loaded the expected number of entries from the db
|
||||||
@ -2073,7 +2073,7 @@ func TestDeleteStateByRuleUID(t *testing.T) {
|
|||||||
Log: log.New("ngalert.state.manager"),
|
Log: log.New("ngalert.state.manager"),
|
||||||
}
|
}
|
||||||
st := state.NewManager(cfg, state.NewNoopPersister())
|
st := state.NewManager(cfg, state.NewNoopPersister())
|
||||||
st.Warm(ctx, dbstore)
|
st.Warm(ctx, dbstore, dbstore)
|
||||||
q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
|
q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
|
||||||
alerts, _ := dbstore.ListAlertInstances(ctx, q)
|
alerts, _ := dbstore.ListAlertInstances(ctx, q)
|
||||||
existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)
|
existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)
|
||||||
@ -2214,7 +2214,7 @@ func TestResetStateByRuleUID(t *testing.T) {
|
|||||||
Log: log.New("ngalert.state.manager"),
|
Log: log.New("ngalert.state.manager"),
|
||||||
}
|
}
|
||||||
st := state.NewManager(cfg, state.NewNoopPersister())
|
st := state.NewManager(cfg, state.NewNoopPersister())
|
||||||
st.Warm(ctx, dbstore)
|
st.Warm(ctx, dbstore, dbstore)
|
||||||
q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
|
q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
|
||||||
alerts, _ := dbstore.ListAlertInstances(ctx, q)
|
alerts, _ := dbstore.ListAlertInstances(ctx, q)
|
||||||
existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)
|
existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)
|
||||||
|
@ -9,8 +9,8 @@ import (
|
|||||||
|
|
||||||
// InstanceStore represents the ability to fetch and write alert instances.
|
// InstanceStore represents the ability to fetch and write alert instances.
|
||||||
type InstanceStore interface {
|
type InstanceStore interface {
|
||||||
FetchOrgIds(ctx context.Context) ([]int64, error)
|
InstanceReader
|
||||||
ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error)
|
|
||||||
SaveAlertInstance(ctx context.Context, instance models.AlertInstance) error
|
SaveAlertInstance(ctx context.Context, instance models.AlertInstance) error
|
||||||
DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error
|
DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error
|
||||||
// SaveAlertInstancesForRule overwrites the state for the given rule.
|
// SaveAlertInstancesForRule overwrites the state for the given rule.
|
||||||
@ -19,6 +19,12 @@ type InstanceStore interface {
|
|||||||
FullSync(ctx context.Context, instances []models.AlertInstance) error
|
FullSync(ctx context.Context, instances []models.AlertInstance) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InstanceReader provides methods to fetch alert instances.
|
||||||
|
type InstanceReader interface {
|
||||||
|
FetchOrgIds(ctx context.Context) ([]int64, error)
|
||||||
|
ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error)
|
||||||
|
}
|
||||||
|
|
||||||
// RuleReader represents the ability to fetch alert rules.
|
// RuleReader represents the ability to fetch alert rules.
|
||||||
type RuleReader interface {
|
type RuleReader interface {
|
||||||
ListAlertRules(ctx context.Context, query *models.ListAlertRulesQuery) (models.RulesGroup, error)
|
ListAlertRules(ctx context.Context, query *models.ListAlertRulesQuery) (models.RulesGroup, error)
|
||||||
|
Loading…
Reference in New Issue
Block a user