Alerting: Refactor Run of the scheduler (#37157)

* Alerting: Refactor `Run` of the scheduler

A bit of a refactor to make the diff easier to read for supporting
external Alertmanagers.

We'll introduce another routine that checks the database for
configuration and spawns other routines accordingly.

* Block the wait.

* Fix test
This commit is contained in:
gotjosh 2021-07-27 11:52:59 +01:00 committed by GitHub
parent 0c804df763
commit 442a6677fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 140 additions and 122 deletions

View File

@ -110,14 +110,14 @@ func (ng *AlertNG) Init() error {
return nil
}
// Run starts the scheduler.
// Run starts the scheduler and Alertmanager.
func (ng *AlertNG) Run(ctx context.Context) error {
ng.Log.Debug("ngalert starting")
ng.stateManager.Warm()
children, subCtx := errgroup.WithContext(ctx)
children.Go(func() error {
return ng.schedule.Ticker(subCtx)
return ng.schedule.Run(subCtx)
})
children.Go(func() error {
return ng.Alertmanager.Run(subCtx)

View File

@ -25,7 +25,7 @@ var timeNow = time.Now
// ScheduleService handles scheduling
type ScheduleService interface {
Ticker(context.Context) error
Run(context.Context) error
Pause() error
Unpause() error
@ -41,6 +41,8 @@ type Notifier interface {
}
type schedule struct {
wg sync.WaitGroup
// base tick rate (fastest possible configured check)
baseInterval time.Duration
@ -68,9 +70,7 @@ type schedule struct {
evaluator eval.Evaluator
ruleStore store.RuleStore
instanceStore store.InstanceStore
dataService *tsdb.Service
stateManager *state.Manager
@ -120,30 +120,6 @@ func NewScheduler(cfg SchedulerCfg, dataService *tsdb.Service, appURL string, st
return &sch
}
func (sch *schedule) overrideCfg(cfg SchedulerCfg) {
sch.clock = cfg.C
sch.baseInterval = cfg.BaseInterval
sch.heartbeat = alerting.NewTicker(cfg.C.Now(), time.Second*0, cfg.C, int64(cfg.BaseInterval.Seconds()))
sch.evalAppliedFunc = cfg.EvalAppliedFunc
sch.stopAppliedFunc = cfg.StopAppliedFunc
}
func (sch *schedule) evalApplied(alertDefKey models.AlertRuleKey, now time.Time) {
if sch.evalAppliedFunc == nil {
return
}
sch.evalAppliedFunc(alertDefKey, now)
}
func (sch *schedule) stopApplied(alertDefKey models.AlertRuleKey) {
if sch.stopAppliedFunc == nil {
return
}
sch.stopAppliedFunc(alertDefKey)
}
func (sch *schedule) Pause() error {
if sch == nil {
return fmt.Errorf("scheduler is not initialised")
@ -162,6 +138,111 @@ func (sch *schedule) Unpause() error {
return nil
}
func (sch *schedule) Run(ctx context.Context) error {
sch.wg.Add(1)
go func() {
if err := sch.ruleEvaluationLoop(ctx); err != nil {
sch.log.Error("failure while running the rule evaluation loop", "err", err)
}
}()
sch.wg.Wait()
return nil
}
func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error {
defer sch.wg.Done()
dispatcherGroup, ctx := errgroup.WithContext(ctx)
for {
select {
case tick := <-sch.heartbeat.C:
tickNum := tick.Unix() / int64(sch.baseInterval.Seconds())
alertRules := sch.fetchAllDetails()
sch.log.Debug("alert rules fetched", "count", len(alertRules))
// registeredDefinitions is a map used for finding deleted alert rules
// initially it is assigned to all known alert rules from the previous cycle
// each alert rule found also in this cycle is removed
// so, at the end, the remaining registered alert rules are the deleted ones
registeredDefinitions := sch.registry.keyMap()
type readyToRunItem struct {
key models.AlertRuleKey
ruleInfo alertRuleInfo
}
readyToRun := make([]readyToRunItem, 0)
for _, item := range alertRules {
key := item.GetKey()
itemVersion := item.Version
newRoutine := !sch.registry.exists(key)
ruleInfo := sch.registry.getOrCreateInfo(key, itemVersion)
invalidInterval := item.IntervalSeconds%int64(sch.baseInterval.Seconds()) != 0
if newRoutine && !invalidInterval {
dispatcherGroup.Go(func() error {
return sch.ruleRoutine(ctx, key, ruleInfo.evalCh, ruleInfo.stopCh)
})
}
if invalidInterval {
// this is expected to be always false
// give that we validate interval during alert rule updates
sch.log.Debug("alert rule with invalid interval will be ignored: interval should be divided exactly by scheduler interval", "key", key, "interval", time.Duration(item.IntervalSeconds)*time.Second, "scheduler interval", sch.baseInterval)
continue
}
itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds())
if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 {
readyToRun = append(readyToRun, readyToRunItem{key: key, ruleInfo: ruleInfo})
}
// remove the alert rule from the registered alert rules
delete(registeredDefinitions, key)
}
var step int64 = 0
if len(readyToRun) > 0 {
step = sch.baseInterval.Nanoseconds() / int64(len(readyToRun))
}
for i := range readyToRun {
item := readyToRun[i]
time.AfterFunc(time.Duration(int64(i)*step), func() {
item.ruleInfo.evalCh <- &evalContext{now: tick, version: item.ruleInfo.version}
})
}
// unregister and stop routines of the deleted alert rules
for key := range registeredDefinitions {
ruleInfo, err := sch.registry.get(key)
if err != nil {
sch.log.Error("failed to get alert rule routine information", "err", err)
continue
}
ruleInfo.stopCh <- struct{}{}
sch.registry.del(key)
}
case <-ctx.Done():
waitErr := dispatcherGroup.Wait()
orgIds, err := sch.instanceStore.FetchOrgIds()
if err != nil {
sch.log.Error("unable to fetch orgIds", "msg", err.Error())
}
for _, v := range orgIds {
sch.saveAlertStates(sch.stateManager.GetAll(v))
}
sch.stateManager.Close()
return waitErr
}
}
}
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evalContext, stopCh <-chan struct{}) error {
sch.log.Debug("alert rule routine started", "key", key)
@ -248,96 +329,6 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
}
}
func (sch *schedule) Ticker(grafanaCtx context.Context) error {
dispatcherGroup, ctx := errgroup.WithContext(grafanaCtx)
for {
select {
case tick := <-sch.heartbeat.C:
tickNum := tick.Unix() / int64(sch.baseInterval.Seconds())
alertRules := sch.fetchAllDetails()
sch.log.Debug("alert rules fetched", "count", len(alertRules))
// registeredDefinitions is a map used for finding deleted alert rules
// initially it is assigned to all known alert rules from the previous cycle
// each alert rule found also in this cycle is removed
// so, at the end, the remaining registered alert rules are the deleted ones
registeredDefinitions := sch.registry.keyMap()
type readyToRunItem struct {
key models.AlertRuleKey
ruleInfo alertRuleInfo
}
readyToRun := make([]readyToRunItem, 0)
for _, item := range alertRules {
key := item.GetKey()
itemVersion := item.Version
newRoutine := !sch.registry.exists(key)
ruleInfo := sch.registry.getOrCreateInfo(key, itemVersion)
invalidInterval := item.IntervalSeconds%int64(sch.baseInterval.Seconds()) != 0
if newRoutine && !invalidInterval {
dispatcherGroup.Go(func() error {
return sch.ruleRoutine(ctx, key, ruleInfo.evalCh, ruleInfo.stopCh)
})
}
if invalidInterval {
// this is expected to be always false
// give that we validate interval during alert rule updates
sch.log.Debug("alert rule with invalid interval will be ignored: interval should be divided exactly by scheduler interval", "key", key, "interval", time.Duration(item.IntervalSeconds)*time.Second, "scheduler interval", sch.baseInterval)
continue
}
itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds())
if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 {
readyToRun = append(readyToRun, readyToRunItem{key: key, ruleInfo: ruleInfo})
}
// remove the alert rule from the registered alert rules
delete(registeredDefinitions, key)
}
var step int64 = 0
if len(readyToRun) > 0 {
step = sch.baseInterval.Nanoseconds() / int64(len(readyToRun))
}
for i := range readyToRun {
item := readyToRun[i]
time.AfterFunc(time.Duration(int64(i)*step), func() {
item.ruleInfo.evalCh <- &evalContext{now: tick, version: item.ruleInfo.version}
})
}
// unregister and stop routines of the deleted alert rules
for key := range registeredDefinitions {
ruleInfo, err := sch.registry.get(key)
if err != nil {
sch.log.Error("failed to get alert rule routine information", "err", err)
continue
}
ruleInfo.stopCh <- struct{}{}
sch.registry.del(key)
}
case <-grafanaCtx.Done():
waitErr := dispatcherGroup.Wait()
orgIds, err := sch.instanceStore.FetchOrgIds()
if err != nil {
sch.log.Error("unable to fetch orgIds", "msg", err.Error())
}
for _, v := range orgIds {
sch.saveAlertStates(sch.stateManager.GetAll(v))
}
sch.stateManager.Close()
return waitErr
}
}
}
func (sch *schedule) sendAlerts(alerts apimodels.PostableAlerts) error {
return sch.notifier.PutAlerts(alerts)
}
@ -445,3 +436,30 @@ type evalContext struct {
now time.Time
version int64
}
// overrideCfg is only used on tests.
func (sch *schedule) overrideCfg(cfg SchedulerCfg) {
sch.clock = cfg.C
sch.baseInterval = cfg.BaseInterval
sch.heartbeat = alerting.NewTicker(cfg.C.Now(), time.Second*0, cfg.C, int64(cfg.BaseInterval.Seconds()))
sch.evalAppliedFunc = cfg.EvalAppliedFunc
sch.stopAppliedFunc = cfg.StopAppliedFunc
}
// evalApplied is only used on tests.
func (sch *schedule) evalApplied(alertDefKey models.AlertRuleKey, now time.Time) {
if sch.evalAppliedFunc == nil {
return
}
sch.evalAppliedFunc(alertDefKey, now)
}
// stopApplied is only used on tests.
func (sch *schedule) stopApplied(alertDefKey models.AlertRuleKey) {
if sch.stopAppliedFunc == nil {
return
}
sch.stopAppliedFunc(alertDefKey)
}

View File

@ -158,7 +158,7 @@ func TestAlertingTicker(t *testing.T) {
ctx := context.Background()
go func() {
err := sched.Ticker(ctx)
err := sched.Run(ctx)
require.NoError(t, err)
}()
runtime.Gosched()