mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: Update state manager to accept rule store as Warm method argument (#58244)
This commit is contained in:
parent
8f6cdd4cda
commit
dce8879145
@ -127,6 +127,7 @@ type AlertNG struct {
|
||||
accesscontrol accesscontrol.AccessControl
|
||||
accesscontrolService accesscontrol.Service
|
||||
annotationsRepo annotations.Repository
|
||||
store *store.DBstore
|
||||
|
||||
bus bus.Bus
|
||||
}
|
||||
@ -143,6 +144,7 @@ func (ng *AlertNG) init() error {
|
||||
AccessControl: ng.accesscontrol,
|
||||
DashboardService: ng.dashboardService,
|
||||
}
|
||||
ng.store = store
|
||||
|
||||
decryptFn := ng.SecretsService.GetDecryptedValue
|
||||
multiOrgMetrics := ng.Metrics.GetMultiOrgAlertmanagerMetrics()
|
||||
@ -191,7 +193,7 @@ func (ng *AlertNG) init() error {
|
||||
}
|
||||
|
||||
historian := historian.NewAnnotationHistorian(ng.annotationsRepo, ng.dashboardService)
|
||||
stateManager := state.NewManager(ng.Metrics.GetStateMetrics(), appUrl, store, store, ng.imageService, clk, historian)
|
||||
stateManager := state.NewManager(ng.Metrics.GetStateMetrics(), appUrl, store, ng.imageService, clk, historian)
|
||||
scheduler := schedule.NewScheduler(schedCfg, appUrl, stateManager)
|
||||
|
||||
// if it is required to include folder title to the alerts, we need to subscribe to changes of alert title
|
||||
@ -276,7 +278,7 @@ func subscribeToFolderChanges(logger log.Logger, bus bus.Bus, dbStore api.RuleSt
|
||||
// Run starts the scheduler and Alertmanager.
|
||||
func (ng *AlertNG) Run(ctx context.Context) error {
|
||||
ng.Log.Debug("Starting")
|
||||
ng.stateManager.Warm(ctx)
|
||||
ng.stateManager.Warm(ctx, ng.store)
|
||||
|
||||
children, subCtx := errgroup.WithContext(ctx)
|
||||
|
||||
|
@ -106,8 +106,8 @@ func TestWarmStateCache(t *testing.T) {
|
||||
Labels: labels,
|
||||
}
|
||||
_ = dbstore.SaveAlertInstances(ctx, instance2)
|
||||
st := state.NewManager(testMetrics.GetStateMetrics(), nil, dbstore, dbstore, &image.NoopImageService{}, clock.NewMock(), &state.FakeHistorian{})
|
||||
st.Warm(ctx)
|
||||
st := state.NewManager(testMetrics.GetStateMetrics(), nil, dbstore, &image.NoopImageService{}, clock.NewMock(), &state.FakeHistorian{})
|
||||
st.Warm(ctx, dbstore)
|
||||
|
||||
t.Run("instance cache has expected entries", func(t *testing.T) {
|
||||
for _, entry := range expectedEntries {
|
||||
@ -157,7 +157,7 @@ func TestAlertingTicker(t *testing.T) {
|
||||
Metrics: testMetrics.GetSchedulerMetrics(),
|
||||
AlertSender: notifier,
|
||||
}
|
||||
st := state.NewManager(testMetrics.GetStateMetrics(), nil, dbstore, dbstore, &image.NoopImageService{}, clock.NewMock(), &state.FakeHistorian{})
|
||||
st := state.NewManager(testMetrics.GetStateMetrics(), nil, dbstore, &image.NoopImageService{}, clock.NewMock(), &state.FakeHistorian{})
|
||||
appUrl := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "localhost",
|
||||
|
@ -524,8 +524,7 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor
|
||||
AlertSender: senderMock,
|
||||
}
|
||||
|
||||
stateRs := state.FakeRuleReader{}
|
||||
st := state.NewManager(m.GetStateMetrics(), nil, &stateRs, is, &image.NoopImageService{}, mockedClock, &state.FakeHistorian{})
|
||||
st := state.NewManager(m.GetStateMetrics(), nil, is, &image.NoopImageService{}, mockedClock, &state.FakeHistorian{})
|
||||
return NewScheduler(schedCfg, appUrl, st)
|
||||
}
|
||||
|
||||
|
@ -32,22 +32,19 @@ type Manager struct {
|
||||
quit chan struct{}
|
||||
ResendDelay time.Duration
|
||||
|
||||
ruleStore RuleReader
|
||||
instanceStore InstanceStore
|
||||
imageService image.ImageService
|
||||
historian Historian
|
||||
externalURL *url.URL
|
||||
}
|
||||
|
||||
func NewManager(metrics *metrics.State, externalURL *url.URL,
|
||||
ruleStore RuleReader, instanceStore InstanceStore, imageService image.ImageService, clock clock.Clock, historian Historian) *Manager {
|
||||
func NewManager(metrics *metrics.State, externalURL *url.URL, instanceStore InstanceStore, imageService image.ImageService, clock clock.Clock, historian Historian) *Manager {
|
||||
manager := &Manager{
|
||||
cache: newCache(),
|
||||
quit: make(chan struct{}),
|
||||
ResendDelay: ResendDelay, // TODO: make this configurable
|
||||
log: log.New("ngalert.state.manager"),
|
||||
metrics: metrics,
|
||||
ruleStore: ruleStore,
|
||||
instanceStore: instanceStore,
|
||||
imageService: imageService,
|
||||
historian: historian,
|
||||
@ -64,12 +61,10 @@ func (st *Manager) Close() {
|
||||
st.quit <- struct{}{}
|
||||
}
|
||||
|
||||
func (st *Manager) Warm(ctx context.Context) {
|
||||
func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader) {
|
||||
if st.instanceStore == nil {
|
||||
st.log.Info("Skip warming the state because instance store is not configured")
|
||||
}
|
||||
if st.ruleStore == nil {
|
||||
st.log.Info("Skip warming the state because rule store is not configured")
|
||||
return
|
||||
}
|
||||
startTime := time.Now()
|
||||
st.log.Info("Warming state cache for startup")
|
||||
@ -86,7 +81,7 @@ func (st *Manager) Warm(ctx context.Context) {
|
||||
ruleCmd := ngModels.ListAlertRulesQuery{
|
||||
OrgID: orgId,
|
||||
}
|
||||
if err := st.ruleStore.ListAlertRules(ctx, &ruleCmd); err != nil {
|
||||
if err := rulesReader.ListAlertRules(ctx, &ruleCmd); err != nil {
|
||||
st.log.Error("Unable to fetch previous state", "error", err)
|
||||
}
|
||||
|
||||
|
@ -39,7 +39,7 @@ func TestDashboardAnnotations(t *testing.T) {
|
||||
|
||||
fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo()
|
||||
hist := historian.NewAnnotationHistorian(fakeAnnoRepo, &dashboards.FakeDashboardService{})
|
||||
st := state.NewManager(testMetrics.GetStateMetrics(), nil, dbstore, dbstore, &image.NoopImageService{}, clock.New(), hist)
|
||||
st := state.NewManager(testMetrics.GetStateMetrics(), nil, dbstore, &image.NoopImageService{}, clock.New(), hist)
|
||||
|
||||
const mainOrgID int64 = 1
|
||||
|
||||
@ -48,7 +48,7 @@ func TestDashboardAnnotations(t *testing.T) {
|
||||
"test2": "{{ $labels.instance_label }}",
|
||||
})
|
||||
|
||||
st.Warm(ctx)
|
||||
st.Warm(ctx, dbstore)
|
||||
bValue := float64(42)
|
||||
cValue := float64(1)
|
||||
_ = st.ProcessEvalResults(ctx, evaluationTime, rule, eval.Results{{
|
||||
@ -2020,7 +2020,7 @@ func TestProcessEvalResults(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo()
|
||||
hist := historian.NewAnnotationHistorian(fakeAnnoRepo, &dashboards.FakeDashboardService{})
|
||||
st := state.NewManager(testMetrics.GetStateMetrics(), nil, nil, &state.FakeInstanceStore{}, &image.NotAvailableImageService{}, clock.New(), hist)
|
||||
st := state.NewManager(testMetrics.GetStateMetrics(), nil, &state.FakeInstanceStore{}, &image.NotAvailableImageService{}, clock.New(), hist)
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
for _, res := range tc.evalResults {
|
||||
_ = st.ProcessEvalResults(context.Background(), evaluationTime, tc.alertRule, res, data.Labels{
|
||||
@ -2047,7 +2047,7 @@ func TestProcessEvalResults(t *testing.T) {
|
||||
t.Run("should save state to database", func(t *testing.T) {
|
||||
instanceStore := &state.FakeInstanceStore{}
|
||||
clk := clock.New()
|
||||
st := state.NewManager(testMetrics.GetStateMetrics(), nil, nil, instanceStore, &image.NotAvailableImageService{}, clk, &state.FakeHistorian{})
|
||||
st := state.NewManager(testMetrics.GetStateMetrics(), nil, instanceStore, &image.NotAvailableImageService{}, clk, &state.FakeHistorian{})
|
||||
rule := models.AlertRuleGen()()
|
||||
var results = eval.GenerateResults(rand.Intn(4)+1, eval.ResultGen(eval.WithEvaluatedAt(clk.Now())))
|
||||
|
||||
@ -2176,8 +2176,8 @@ func TestStaleResultsHandler(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
ctx := context.Background()
|
||||
st := state.NewManager(testMetrics.GetStateMetrics(), nil, dbstore, dbstore, &image.NoopImageService{}, clock.New(), &state.FakeHistorian{})
|
||||
st.Warm(ctx)
|
||||
st := state.NewManager(testMetrics.GetStateMetrics(), nil, dbstore, &image.NoopImageService{}, clock.New(), &state.FakeHistorian{})
|
||||
st.Warm(ctx, dbstore)
|
||||
existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)
|
||||
|
||||
// We have loaded the expected number of entries from the db
|
||||
@ -2238,7 +2238,7 @@ func TestStaleResults(t *testing.T) {
|
||||
clk := clock.NewMock()
|
||||
clk.Set(time.Now())
|
||||
|
||||
st := state.NewManager(testMetrics.GetStateMetrics(), nil, dbstore, dbstore, &image.NoopImageService{}, clk, &state.FakeHistorian{})
|
||||
st := state.NewManager(testMetrics.GetStateMetrics(), nil, dbstore, &image.NoopImageService{}, clk, &state.FakeHistorian{})
|
||||
|
||||
orgID := rand.Int63()
|
||||
rule := tests.CreateTestAlertRule(t, ctx, dbstore, 10, orgID)
|
||||
|
Loading…
Reference in New Issue
Block a user