Alerting: Update scheduler to get updates only from database (#64635)

* stop using the scheduler's Update and Delete methods all communication must be via the database
* update scheduler's registry to calculate diff before re-setting the cache
* update fetcher to return the diff generated by registry
* update processTick to update rule eval routine if the rule was updated and it is not going to be evaluated at this tick.
* remove references to the scheduler from api package
* remove unused methods in the scheduler
This commit is contained in:
Yuri Tseretyan
2023-03-14 18:02:51 -04:00
committed by GitHub
parent 10c809a00a
commit 85a954cd81
11 changed files with 202 additions and 298 deletions

View File

@@ -29,16 +29,10 @@ import (
// ScheduleService is an interface for a service that schedules the evaluation
// of alert rules.
//
//go:generate mockery --name ScheduleService --structname FakeScheduleService --inpackage --filename schedule_mock.go --unroll-variadic=False
type ScheduleService interface {
// Run the scheduler until the context is canceled or the scheduler returns
// an error. The scheduler is terminated when this function returns.
Run(context.Context) error
// UpdateAlertRule notifies scheduler that a rule has been changed
UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64, isPaused bool)
// DeleteAlertRule notifies scheduler that rules have been deleted
DeleteAlertRule(keys ...ngmodels.AlertRuleKey)
}
// AlertsSender is an interface for a service that is responsible for sending notifications to the end-user.
@@ -148,17 +142,8 @@ func (sch *schedule) Run(ctx context.Context) error {
return nil
}
// UpdateAlertRule looks for the active rule evaluation and commands it to update the rule
func (sch *schedule) UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64, isPaused bool) {
ruleInfo, err := sch.registry.get(key)
if err != nil {
return
}
ruleInfo.update(ruleVersionAndPauseStatus{ruleVersion(lastVersion), isPaused})
}
// DeleteAlertRule stops evaluation of the rule, deletes it from active rules, and cleans up state cache.
func (sch *schedule) DeleteAlertRule(keys ...ngmodels.AlertRuleKey) {
// deleteAlertRule stops evaluation of the rule, deletes it from active rules, and cleans up state cache.
func (sch *schedule) deleteAlertRule(keys ...ngmodels.AlertRuleKey) {
for _, key := range keys {
// It can happen that the scheduler has deleted the alert rule before the
// Ruler API has called DeleteAlertRule. This can happen as requests to
@@ -230,12 +215,22 @@ func (sch *schedule) updateRulesMetrics(alertRules []*ngmodels.AlertRule) {
sch.metrics.SchedulableAlertRulesHash.Set(float64(hashUIDs(alertRules)))
}
func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.Group, tick time.Time) ([]readyToRunItem, map[ngmodels.AlertRuleKey]struct{}) {
// TODO refactor to accept a callback for tests that will be called with things that are returned currently, and return nothing.
// Returns a slice of rules that were scheduled for evaluation, map of stopped rules, and a slice of updated rules
func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.Group, tick time.Time) ([]readyToRunItem, map[ngmodels.AlertRuleKey]struct{}, []ngmodels.AlertRuleKeyWithVersion) {
tickNum := tick.Unix() / int64(sch.baseInterval.Seconds())
if err := sch.updateSchedulableAlertRules(ctx); err != nil {
// update the local registry. If there was a difference between the previous state and the current new state, rulesDiff will contains keys of rules that were updated.
rulesDiff, err := sch.updateSchedulableAlertRules(ctx)
updated := rulesDiff.updated
if updated == nil { // make sure map is not nil
updated = map[ngmodels.AlertRuleKey]struct{}{}
}
if err != nil {
sch.log.Error("Failed to update alert rules", "error", err)
}
// this is the new current state. rulesDiff contains the previously existing rules that were different between this state and the previous state.
alertRules, folderTitles := sch.schedulableAlertRules.all()
// registeredDefinitions is a map used for finding deleted alert rules
@@ -247,6 +242,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
sch.updateRulesMetrics(alertRules)
readyToRun := make([]readyToRunItem, 0)
updatedRules := make([]ngmodels.AlertRuleKeyWithVersion, 0, len(updated)) // this is needed for tests only
missingFolder := make(map[string][]string)
for _, item := range alertRules {
key := item.GetKey()
@@ -274,7 +270,8 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
}
itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds())
if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 {
isReadyToRun := item.IntervalSeconds != 0 && tickNum%itemFrequency == 0
if isReadyToRun {
var folderTitle string
if !sch.disableGrafanaFolder {
title, ok := folderTitles[item.NamespaceUID]
@@ -290,6 +287,20 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
folderTitle: folderTitle,
}})
}
if _, isUpdated := updated[key]; isUpdated && !isReadyToRun {
// if we do not need to eval the rule, check the whether rule was just updated and if it was, notify evaluation routine about that
sch.log.Debug("Rule has been updated. Notifying evaluation routine", key.LogContext()...)
go func(ri *alertRuleInfo, rule *ngmodels.AlertRule) {
ri.update(ruleVersionAndPauseStatus{
Version: ruleVersion(rule.Version),
IsPaused: rule.IsPaused,
})
}(ruleInfo, item)
updatedRules = append(updatedRules, ngmodels.AlertRuleKeyWithVersion{
Version: item.Version,
AlertRuleKey: item.GetKey(),
})
}
// remove the alert rule from the registered alert rules
delete(registeredDefinitions, key)
@@ -327,8 +338,8 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
for key := range registeredDefinitions {
toDelete = append(toDelete, key)
}
sch.DeleteAlertRule(toDelete...)
return readyToRun, registeredDefinitions
sch.deleteAlertRule(toDelete...)
return readyToRun, registeredDefinitions, updatedRules
}
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan ruleVersionAndPauseStatus) error {