mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Fix alert definition routine stop (#30117)
This commit is contained in:
parent
638cf642fa
commit
551428af61
@ -13,7 +13,7 @@ import (
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func (ng *AlertNG) definitionRoutine(grafanaCtx context.Context, key alertDefinitionKey, evalCh <-chan *evalContext) error {
|
||||
func (ng *AlertNG) definitionRoutine(grafanaCtx context.Context, key alertDefinitionKey, evalCh <-chan *evalContext, stopCh <-chan struct{}) error {
|
||||
ng.log.Debug("alert definition routine started", "key", key)
|
||||
|
||||
evalRunning := false
|
||||
@ -75,12 +75,13 @@ func (ng *AlertNG) definitionRoutine(grafanaCtx context.Context, key alertDefini
|
||||
}
|
||||
}
|
||||
}()
|
||||
case k := <-ng.schedule.stop:
|
||||
if k == key {
|
||||
case <-stopCh:
|
||||
if ng.schedule.stopApplied != nil {
|
||||
ng.schedule.stopApplied(key)
|
||||
}
|
||||
ng.schedule.log.Debug("stopping alert definition routine", "key", key)
|
||||
// interrupt evaluation if it's running
|
||||
return nil
|
||||
}
|
||||
case <-grafanaCtx.Done():
|
||||
return grafanaCtx.Err()
|
||||
}
|
||||
@ -94,9 +95,6 @@ type schedule struct {
|
||||
// each alert definition gets its own channel and routine
|
||||
registry alertDefinitionRegistry
|
||||
|
||||
// broadcast channel for stopping definition routines
|
||||
stop chan alertDefinitionKey
|
||||
|
||||
maxAttempts int64
|
||||
|
||||
clock clock.Clock
|
||||
@ -108,6 +106,11 @@ type schedule struct {
|
||||
// message from evalApplied is handled.
|
||||
evalApplied func(alertDefinitionKey, time.Time)
|
||||
|
||||
// stopApplied is only used for tests: test code can set it to non-nil
|
||||
// function, and then it'll be called from the event loop whenever the
|
||||
// message from stopApplied is handled.
|
||||
stopApplied func(alertDefinitionKey)
|
||||
|
||||
log log.Logger
|
||||
}
|
||||
|
||||
@ -116,7 +119,6 @@ func newScheduler(c clock.Clock, baseInterval time.Duration, logger log.Logger,
|
||||
ticker := alerting.NewTicker(c.Now(), time.Second*0, c, int64(baseInterval.Seconds()))
|
||||
sch := schedule{
|
||||
registry: alertDefinitionRegistry{alertDefinitionInfo: make(map[alertDefinitionKey]alertDefinitionInfo)},
|
||||
stop: make(chan alertDefinitionKey),
|
||||
maxAttempts: maxAttempts,
|
||||
clock: c,
|
||||
baseInterval: baseInterval,
|
||||
@ -166,8 +168,6 @@ func (ng *AlertNG) alertingTicker(grafanaCtx context.Context) error {
|
||||
}
|
||||
readyToRun := make([]readyToRunItem, 0)
|
||||
for _, item := range alertDefinitions {
|
||||
itemUID := item.UID
|
||||
itemOrgID := item.OrgID
|
||||
key := item.getKey()
|
||||
itemVersion := item.Version
|
||||
newRoutine := !ng.schedule.registry.exists(key)
|
||||
@ -176,14 +176,14 @@ func (ng *AlertNG) alertingTicker(grafanaCtx context.Context) error {
|
||||
|
||||
if newRoutine && !invalidInterval {
|
||||
dispatcherGroup.Go(func() error {
|
||||
return ng.definitionRoutine(ctx, key, definitionInfo.ch)
|
||||
return ng.definitionRoutine(ctx, key, definitionInfo.evalCh, definitionInfo.stopCh)
|
||||
})
|
||||
}
|
||||
|
||||
if invalidInterval {
|
||||
// this is expected to be always false
|
||||
// give that we validate interval during alert definition updates
|
||||
ng.schedule.log.Debug("alert definition with invalid interval will be ignored: interval should be divided exactly by scheduler interval", "definitionUID", itemUID, "orgID", itemOrgID, "interval", time.Duration(item.IntervalSeconds)*time.Second, "scheduler interval", ng.schedule.baseInterval)
|
||||
ng.schedule.log.Debug("alert definition with invalid interval will be ignored: interval should be divided exactly by scheduler interval", "key", key, "interval", time.Duration(item.IntervalSeconds)*time.Second, "scheduler interval", ng.schedule.baseInterval)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -205,13 +205,18 @@ func (ng *AlertNG) alertingTicker(grafanaCtx context.Context) error {
|
||||
item := readyToRun[i]
|
||||
|
||||
time.AfterFunc(time.Duration(int64(i)*step), func() {
|
||||
item.definitionInfo.ch <- &evalContext{now: tick, version: item.definitionInfo.version}
|
||||
item.definitionInfo.evalCh <- &evalContext{now: tick, version: item.definitionInfo.version}
|
||||
})
|
||||
}
|
||||
|
||||
// unregister and stop routines of the deleted alert definitions
|
||||
for key := range registeredDefinitions {
|
||||
ng.schedule.stop <- key
|
||||
definitionInfo, err := ng.schedule.registry.get(key)
|
||||
if err != nil {
|
||||
ng.schedule.log.Error("failed to get alert definition routine information", "err", err)
|
||||
continue
|
||||
}
|
||||
definitionInfo.stopCh <- struct{}{}
|
||||
ng.schedule.registry.del(key)
|
||||
}
|
||||
case <-grafanaCtx.Done():
|
||||
@ -234,7 +239,7 @@ func (r *alertDefinitionRegistry) getOrCreateInfo(key alertDefinitionKey, defini
|
||||
|
||||
info, ok := r.alertDefinitionInfo[key]
|
||||
if !ok {
|
||||
r.alertDefinitionInfo[key] = alertDefinitionInfo{ch: make(chan *evalContext), version: definitionVersion}
|
||||
r.alertDefinitionInfo[key] = alertDefinitionInfo{evalCh: make(chan *evalContext), stopCh: make(chan struct{}), version: definitionVersion}
|
||||
return r.alertDefinitionInfo[key]
|
||||
}
|
||||
info.version = definitionVersion
|
||||
@ -242,6 +247,19 @@ func (r *alertDefinitionRegistry) getOrCreateInfo(key alertDefinitionKey, defini
|
||||
return info
|
||||
}
|
||||
|
||||
// get returns the channel for the specific alert definition
|
||||
// if the key does not exist returns an error
|
||||
func (r *alertDefinitionRegistry) get(key alertDefinitionKey) (*alertDefinitionInfo, error) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
info, ok := r.alertDefinitionInfo[key]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%v key not found", key)
|
||||
}
|
||||
return &info, nil
|
||||
}
|
||||
|
||||
func (r *alertDefinitionRegistry) exists(key alertDefinitionKey) bool {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
@ -283,7 +301,8 @@ func (r *alertDefinitionRegistry) keyMap() map[alertDefinitionKey]struct{} {
|
||||
}
|
||||
|
||||
type alertDefinitionInfo struct {
|
||||
ch chan *evalContext
|
||||
evalCh chan *evalContext
|
||||
stopCh chan struct{}
|
||||
version int64
|
||||
}
|
||||
|
||||
|
@ -37,11 +37,16 @@ func TestAlertingTicker(t *testing.T) {
|
||||
alerts = append(alerts, createTestAlertDefinition(t, ng, 1))
|
||||
|
||||
evalAppliedCh := make(chan evalAppliedInfo, len(alerts))
|
||||
stopAppliedCh := make(chan alertDefinitionKey, len(alerts))
|
||||
|
||||
ng.schedule.evalApplied = func(alertDefKey alertDefinitionKey, now time.Time) {
|
||||
evalAppliedCh <- evalAppliedInfo{alertDefKey: alertDefKey, now: now}
|
||||
}
|
||||
|
||||
ng.schedule.stopApplied = func(alertDefKey alertDefinitionKey) {
|
||||
stopAppliedCh <- alertDefKey
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
go func() {
|
||||
err := ng.alertingTicker(ctx)
|
||||
@ -92,6 +97,10 @@ func TestAlertingTicker(t *testing.T) {
|
||||
tick := advanceClock(t, mockedClock)
|
||||
assertEvalRun(t, evalAppliedCh, tick, expectedAlertDefinitionsEvaluated...)
|
||||
})
|
||||
expectedAlertDefinitionsStopped := []alertDefinitionKey{alerts[1].getKey()}
|
||||
t.Run(fmt.Sprintf("on 5th tick alert definitions: %s should be stopped", concatenate(expectedAlertDefinitionsStopped)), func(t *testing.T) {
|
||||
assertStopRun(t, stopAppliedCh, expectedAlertDefinitionsStopped...)
|
||||
})
|
||||
|
||||
expectedAlertDefinitionsEvaluated = []alertDefinitionKey{alerts[0].getKey()}
|
||||
t.Run(fmt.Sprintf("on 6th tick alert definitions: %s should be evaluated", concatenate(expectedAlertDefinitionsEvaluated)), func(t *testing.T) {
|
||||
@ -137,6 +146,33 @@ func assertEvalRun(t *testing.T, ch <-chan evalAppliedInfo, tick time.Time, keys
|
||||
}
|
||||
}
|
||||
|
||||
func assertStopRun(t *testing.T, ch <-chan alertDefinitionKey, keys ...alertDefinitionKey) {
|
||||
timeout := time.After(time.Second)
|
||||
|
||||
expected := make(map[alertDefinitionKey]struct{}, len(keys))
|
||||
for _, k := range keys {
|
||||
expected[k] = struct{}{}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case alertDefKey := <-ch:
|
||||
_, ok := expected[alertDefKey]
|
||||
t.Logf("alert definition: %v stopped", alertDefKey)
|
||||
assert.True(t, ok)
|
||||
delete(expected, alertDefKey)
|
||||
if len(expected) == 0 {
|
||||
return
|
||||
}
|
||||
case <-timeout:
|
||||
if len(expected) == 0 {
|
||||
return
|
||||
}
|
||||
t.Fatal("cycle has expired")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func advanceClock(t *testing.T, mockedClock *clock.Mock) time.Time {
|
||||
mockedClock.Add(time.Second)
|
||||
return mockedClock.Now()
|
||||
|
Loading…
Reference in New Issue
Block a user