Alerting: make alert rule routine evaluation control be thread-safe (#41220)

* change registry.delete to return deleted struct
* use pointer to alertRuleInfo instead copying.
* do not access evaluation channel when routine is stopped
* remove stopCh and use context cancellation
* do not return ctx.Err when channel is cancelled because it cancels all other routines
* make alertRuleInfo fields and functions package private
This commit is contained in:
Yuriy Tseretyan 2021-12-16 14:52:47 -05:00 committed by GitHub
parent 56921b205d
commit 1a762083d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 158 additions and 92 deletions

View File

@ -120,7 +120,7 @@ func NewScheduler(cfg SchedulerCfg, expressionService *expr.Service, appURL *url
ticker := alerting.NewTicker(cfg.C.Now(), time.Second*0, cfg.C, int64(cfg.BaseInterval.Seconds()))
sch := schedule{
registry: alertRuleRegistry{alertRuleInfo: make(map[models.AlertRuleKey]alertRuleInfo)},
registry: alertRuleRegistry{alertRuleInfo: make(map[models.AlertRuleKey]*alertRuleInfo)},
maxAttempts: cfg.MaxAttempts,
clock: cfg.C,
baseInterval: cfg.BaseInterval,
@ -349,15 +349,15 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error {
type readyToRunItem struct {
key models.AlertRuleKey
ruleInfo alertRuleInfo
ruleInfo *alertRuleInfo
version int64
}
readyToRun := make([]readyToRunItem, 0)
for _, item := range alertRules {
key := item.GetKey()
itemVersion := item.Version
newRoutine := !sch.registry.exists(key)
ruleInfo := sch.registry.getOrCreateInfo(key, itemVersion)
ruleInfo, newRoutine := sch.registry.getOrCreateInfo(ctx, key)
// enforce minimum evaluation interval
if item.IntervalSeconds < int64(sch.minRuleInterval.Seconds()) {
@ -369,7 +369,7 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error {
if newRoutine && !invalidInterval {
dispatcherGroup.Go(func() error {
return sch.ruleRoutine(ctx, key, ruleInfo.evalCh, ruleInfo.stopCh)
return sch.ruleRoutine(ruleInfo.ctx, key, ruleInfo.evalCh)
})
}
@ -382,7 +382,7 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error {
itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds())
if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 {
readyToRun = append(readyToRun, readyToRunItem{key: key, ruleInfo: ruleInfo})
readyToRun = append(readyToRun, readyToRunItem{key: key, ruleInfo: ruleInfo, version: itemVersion})
}
// remove the alert rule from the registered alert rules
@ -398,19 +398,21 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error {
item := readyToRun[i]
time.AfterFunc(time.Duration(int64(i)*step), func() {
item.ruleInfo.evalCh <- &evalContext{now: tick, version: item.ruleInfo.version}
success := item.ruleInfo.eval(tick, item.version)
if !success {
sch.log.Debug("Scheduled evaluation was canceled because evaluation routine was stopped", "uid", item.key.UID, "org", item.key.OrgID, "time", tick)
}
})
}
// unregister and stop routines of the deleted alert rules
for key := range registeredDefinitions {
ruleInfo, err := sch.registry.get(key)
if err != nil {
sch.log.Error("failed to get alert rule routine information", "err", err)
ruleInfo, ok := sch.registry.del(key)
if !ok {
sch.log.Error("unable to delete alert rule routine information because it did not exist", "uid", key.UID, "org_id", key.OrgID)
continue
}
ruleInfo.stopCh <- struct{}{}
sch.registry.del(key)
ruleInfo.stop()
}
case <-ctx.Done():
waitErr := dispatcherGroup.Wait()
@ -430,7 +432,7 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error {
}
}
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evalContext, stopCh <-chan struct{}) error {
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evalContext) error {
logger := sch.log.New("uid", key.UID, "org", key.OrgID)
logger.Debug("alert rule routine started")
@ -525,9 +527,14 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
evalRunning := false
var currentRule *models.AlertRule
defer sch.stopApplied(key)
for {
select {
case ctx := <-evalCh:
case ctx, ok := <-evalCh:
if !ok {
logger.Debug("Evaluation channel has been closed. Exiting")
return nil
}
if evalRunning {
continue
}
@ -555,13 +562,9 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
logger.Error("evaluation failed after all retries", "err", err)
}
}()
case <-stopCh:
sch.stopApplied(key)
logger.Debug("stopping alert rule routine")
// interrupt evaluation if it's running
return nil
case <-grafanaCtx.Done():
return grafanaCtx.Err()
logger.Debug("stopping alert rule routine")
return nil
}
}
}
@ -587,51 +590,34 @@ func (sch *schedule) saveAlertStates(states []*state.State) {
type alertRuleRegistry struct {
mu sync.Mutex
alertRuleInfo map[models.AlertRuleKey]alertRuleInfo
alertRuleInfo map[models.AlertRuleKey]*alertRuleInfo
}
// getOrCreateInfo returns the channel for the specific alert rule
// if it does not exists creates one and returns it
func (r *alertRuleRegistry) getOrCreateInfo(key models.AlertRuleKey, ruleVersion int64) alertRuleInfo {
// getOrCreateInfo gets rule routine information from registry by the key. If it does not exist, it creates a new one.
// Returns a pointer to the rule routine information and a flag that indicates whether it is a new struct or not.
func (r *alertRuleRegistry) getOrCreateInfo(context context.Context, key models.AlertRuleKey) (*alertRuleInfo, bool) {
r.mu.Lock()
defer r.mu.Unlock()
info, ok := r.alertRuleInfo[key]
if !ok {
r.alertRuleInfo[key] = alertRuleInfo{evalCh: make(chan *evalContext), stopCh: make(chan struct{}), version: ruleVersion}
return r.alertRuleInfo[key]
info = newAlertRuleInfo(context)
r.alertRuleInfo[key] = info
}
info.version = ruleVersion
r.alertRuleInfo[key] = info
return info
return info, !ok
}
// get returns the channel for the specific alert rule
// if the key does not exist returns an error
func (r *alertRuleRegistry) get(key models.AlertRuleKey) (*alertRuleInfo, error) {
// del removes pair that has specific key from alertRuleInfo.
// Returns 2-tuple where the first element is value of the removed pair
// and the second element indicates whether element with the specified key existed.
func (r *alertRuleRegistry) del(key models.AlertRuleKey) (*alertRuleInfo, bool) {
r.mu.Lock()
defer r.mu.Unlock()
info, ok := r.alertRuleInfo[key]
if !ok {
return nil, fmt.Errorf("%v key not found", key)
if ok {
delete(r.alertRuleInfo, key)
}
return &info, nil
}
func (r *alertRuleRegistry) exists(key models.AlertRuleKey) bool {
r.mu.Lock()
defer r.mu.Unlock()
_, ok := r.alertRuleInfo[key]
return ok
}
func (r *alertRuleRegistry) del(key models.AlertRuleKey) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.alertRuleInfo, key)
return info, ok
}
func (r *alertRuleRegistry) iter() <-chan models.AlertRuleKey {
@ -660,9 +646,27 @@ func (r *alertRuleRegistry) keyMap() map[models.AlertRuleKey]struct{} {
}
type alertRuleInfo struct {
evalCh chan *evalContext
stopCh chan struct{}
version int64
evalCh chan *evalContext
ctx context.Context
stop context.CancelFunc
}
func newAlertRuleInfo(parent context.Context) *alertRuleInfo {
ctx, cancel := context.WithCancel(parent)
return &alertRuleInfo{evalCh: make(chan *evalContext), ctx: ctx, stop: cancel}
}
// eval signals the rule evaluation routine to perform the evaluation of the rule. Does nothing if the loop is stopped
func (a *alertRuleInfo) eval(t time.Time, version int64) bool {
select {
case a.evalCh <- &evalContext{
now: t,
version: version,
}:
return true
case <-a.ctx.Done():
return false
}
}
type evalContext struct {

View File

@ -7,6 +7,8 @@ import (
"fmt"
"math/rand"
"net/url"
"runtime"
"sync"
"testing"
"time"
@ -272,11 +274,9 @@ func TestSchedule_ruleRoutine(t *testing.T) {
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), evalState)
go func() {
stop := make(chan struct{})
t.Cleanup(func() {
close(stop)
})
_ = sch.ruleRoutine(context.Background(), rule.GetKey(), evalChan, stop)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
}()
expectedTime := time.UnixMicro(rand.Int63())
@ -367,35 +367,19 @@ func TestSchedule_ruleRoutine(t *testing.T) {
}
t.Run("should exit", func(t *testing.T) {
t.Run("when we signal it to stop", func(t *testing.T) {
stopChan := make(chan struct{})
stoppedChan := make(chan error)
sch, _, _, _, _ := createSchedule(make(chan time.Time))
go func() {
err := sch.ruleRoutine(context.Background(), models.AlertRuleKey{}, make(chan *evalContext), stopChan)
stoppedChan <- err
}()
stopChan <- struct{}{}
err := waitForErrChannel(t, stoppedChan)
require.NoError(t, err)
})
t.Run("when context is cancelled", func(t *testing.T) {
stoppedChan := make(chan error)
sch, _, _, _, _ := createSchedule(make(chan time.Time))
ctx, cancel := context.WithCancel(context.Background())
go func() {
err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evalContext), make(chan struct{}))
err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evalContext))
stoppedChan <- err
}()
cancel()
err := waitForErrChannel(t, stoppedChan)
require.ErrorIs(t, err, context.Canceled)
require.NoError(t, err)
})
})
@ -408,11 +392,9 @@ func TestSchedule_ruleRoutine(t *testing.T) {
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())
go func() {
stop := make(chan struct{})
t.Cleanup(func() {
close(stop)
})
_ = sch.ruleRoutine(context.Background(), rule.GetKey(), evalChan, stop)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
}()
expectedTime := time.UnixMicro(rand.Int63())
@ -462,11 +444,9 @@ func TestSchedule_ruleRoutine(t *testing.T) {
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())
go func() {
stop := make(chan struct{})
t.Cleanup(func() {
close(stop)
})
_ = sch.ruleRoutine(context.Background(), rule.GetKey(), evalChan, stop)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
}()
expectedTime := time.UnixMicro(rand.Int63())
@ -547,11 +527,9 @@ func TestSchedule_ruleRoutine(t *testing.T) {
rule := CreateTestAlertRule(t, ruleStore, 10, orgID, eval.Alerting)
go func() {
stop := make(chan struct{})
t.Cleanup(func() {
close(stop)
})
_ = sch.ruleRoutine(context.Background(), rule.GetKey(), evalChan, stop)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan)
}()
evalChan <- &evalContext{
@ -574,6 +552,90 @@ func TestSchedule_ruleRoutine(t *testing.T) {
})
}
func TestSchedule_alertRuleInfo(t *testing.T) {
t.Run("when rule evaluation is not stopped", func(t *testing.T) {
t.Run("eval should send to evalCh", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
expected := time.Now()
resultCh := make(chan bool)
version := rand.Int63()
go func() {
resultCh <- r.eval(expected, version)
}()
select {
case ctx := <-r.evalCh:
require.Equal(t, version, ctx.version)
require.Equal(t, expected, ctx.now)
require.True(t, <-resultCh)
case <-time.After(5 * time.Second):
t.Fatal("No message was received on eval channel")
}
})
t.Run("eval should exit when context is cancelled", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
resultCh := make(chan bool)
go func() {
resultCh <- r.eval(time.Now(), rand.Int63())
}()
runtime.Gosched()
r.stop()
select {
case result := <-resultCh:
require.False(t, result)
case <-time.After(5 * time.Second):
t.Fatal("No message was received on eval channel")
}
})
})
t.Run("when rule evaluation is stopped", func(t *testing.T) {
t.Run("eval should do nothing", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
r.stop()
require.False(t, r.eval(time.Now(), rand.Int63()))
})
t.Run("stop should do nothing", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
r.stop()
r.stop()
})
})
t.Run("should be thread-safe", func(t *testing.T) {
r := newAlertRuleInfo(context.Background())
wg := sync.WaitGroup{}
go func() {
for {
select {
case <-r.evalCh:
time.Sleep(time.Millisecond)
case <-r.ctx.Done():
return
}
}
}()
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for i := 0; i < 20; i++ {
max := 2
if i <= 10 {
max = 1
}
switch rand.Intn(max) + 1 {
case 1:
r.eval(time.Now(), rand.Int63())
case 2:
r.stop()
}
}
wg.Done()
}()
}
wg.Wait()
})
}
func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, acs store.AdminConfigurationStore, registry *prometheus.Registry) (*schedule, *clock.Mock) {
t.Helper()