mirror of
https://github.com/grafana/grafana.git
synced 2025-02-14 17:43:35 -06:00
Alerting: Scheduler to drop ticks if a rule's evaluation is too slow (#48885)
* drop ticks if evaluation of a rule is too slow. * add metric schedule_rule_evaluations_missed_total
This commit is contained in:
parent
ffb8ae4900
commit
a89d4a5be7
@ -45,6 +45,8 @@ Scopes must have an order to ensure consistency and ease of search, this helps u
|
||||
|
||||
## Grafana Alerting - main / unreleased
|
||||
|
||||
- [ENHANCEMENT] Scheduler: Drop ticks if rule evaluation is too slow and adds a metric grafana_alerting_schedule_rule_evaluations_missed_total to track missed evaluations per rule #48885
|
||||
|
||||
## 9.0.0
|
||||
|
||||
- [ENHANCEMENT] Scheduler: Ticker expose new metrics. In legacy, metrics are prefixed with `legacy_` #47828, #48190
|
||||
|
@ -56,6 +56,7 @@ type Scheduler struct {
|
||||
SchedulableAlertRulesHash prometheus.Gauge
|
||||
UpdateSchedulableAlertRulesDuration prometheus.Histogram
|
||||
Ticker *legacyMetrics.Ticker
|
||||
EvaluationMissed *prometheus.CounterVec
|
||||
}
|
||||
|
||||
type MultiOrgAlertmanager struct {
|
||||
@ -199,6 +200,15 @@ func newSchedulerMetrics(r prometheus.Registerer) *Scheduler {
|
||||
},
|
||||
),
|
||||
Ticker: legacyMetrics.NewTickerMetrics(r),
|
||||
EvaluationMissed: promauto.With(r).NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: Namespace,
|
||||
Subsystem: Subsystem,
|
||||
Name: "schedule_rule_evaluations_missed_total",
|
||||
Help: "The total number of rule evaluations missed due to a slow rule evaluation.",
|
||||
},
|
||||
[]string{"org", "name"},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -125,6 +125,7 @@ type AlertRule struct {
|
||||
}
|
||||
|
||||
type SchedulableAlertRule struct {
|
||||
Title string
|
||||
UID string `xorm:"uid"`
|
||||
OrgID int64 `xorm:"org_id"`
|
||||
IntervalSeconds int64
|
||||
|
@ -84,16 +84,28 @@ func newAlertRuleInfo(parent context.Context) *alertRuleInfo {
|
||||
return &alertRuleInfo{evalCh: make(chan *evaluation), updateCh: make(chan struct{}), ctx: ctx, stop: cancel}
|
||||
}
|
||||
|
||||
// eval signals the rule evaluation routine to perform the evaluation of the rule. Does nothing if the loop is stopped
|
||||
func (a *alertRuleInfo) eval(t time.Time, version int64) bool {
|
||||
// eval signals the rule evaluation routine to perform the evaluation of the rule. Does nothing if the loop is stopped.
|
||||
// Before sending a message into the channel, it does non-blocking read to make sure that there is no concurrent send operation.
|
||||
// Returns a tuple where first element is
|
||||
// - true when message was sent
|
||||
// - false when the send operation is stopped
|
||||
// the second element contains a dropped message that was sent by a concurrent sender.
|
||||
func (a *alertRuleInfo) eval(t time.Time, version int64) (bool, *evaluation) {
|
||||
// read the channel in unblocking manner to make sure that there is no concurrent send operation.
|
||||
var droppedMsg *evaluation
|
||||
select {
|
||||
case droppedMsg = <-a.evalCh:
|
||||
default:
|
||||
}
|
||||
|
||||
select {
|
||||
case a.evalCh <- &evaluation{
|
||||
scheduledAt: t,
|
||||
version: version,
|
||||
}:
|
||||
return true
|
||||
return true, droppedMsg
|
||||
case <-a.ctx.Done():
|
||||
return false
|
||||
return false, droppedMsg
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@ package schedule
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"sync"
|
||||
@ -15,6 +16,11 @@ import (
|
||||
)
|
||||
|
||||
func TestSchedule_alertRuleInfo(t *testing.T) {
|
||||
type evalResponse struct {
|
||||
success bool
|
||||
droppedEval *evaluation
|
||||
}
|
||||
|
||||
t.Run("when rule evaluation is not stopped", func(t *testing.T) {
|
||||
t.Run("Update should send to updateCh", func(t *testing.T) {
|
||||
r := newAlertRuleInfo(context.Background())
|
||||
@ -32,31 +38,73 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
|
||||
t.Run("eval should send to evalCh", func(t *testing.T) {
|
||||
r := newAlertRuleInfo(context.Background())
|
||||
expected := time.Now()
|
||||
resultCh := make(chan bool)
|
||||
resultCh := make(chan evalResponse)
|
||||
version := rand.Int63()
|
||||
go func() {
|
||||
resultCh <- r.eval(expected, version)
|
||||
result, dropped := r.eval(expected, version)
|
||||
resultCh <- evalResponse{result, dropped}
|
||||
}()
|
||||
select {
|
||||
case ctx := <-r.evalCh:
|
||||
require.Equal(t, version, ctx.version)
|
||||
require.Equal(t, expected, ctx.scheduledAt)
|
||||
require.True(t, <-resultCh)
|
||||
result := <-resultCh
|
||||
require.True(t, result.success)
|
||||
require.Nilf(t, result.droppedEval, "expected no dropped evaluations but got one")
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("No message was received on eval channel")
|
||||
}
|
||||
})
|
||||
t.Run("eval should drop any concurrent sending to evalCh", func(t *testing.T) {
|
||||
r := newAlertRuleInfo(context.Background())
|
||||
time1 := time.UnixMilli(rand.Int63n(math.MaxInt64))
|
||||
time2 := time.UnixMilli(rand.Int63n(math.MaxInt64))
|
||||
resultCh1 := make(chan evalResponse)
|
||||
resultCh2 := make(chan evalResponse)
|
||||
version := rand.Int63()
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
wg.Done()
|
||||
result, dropped := r.eval(time1, version)
|
||||
wg.Done()
|
||||
resultCh1 <- evalResponse{result, dropped}
|
||||
}()
|
||||
wg.Wait()
|
||||
wg.Add(2) // one when time1 is sent, another when go-routine for time2 has started
|
||||
go func() {
|
||||
wg.Done()
|
||||
result, dropped := r.eval(time2, version)
|
||||
resultCh2 <- evalResponse{result, dropped}
|
||||
}()
|
||||
wg.Wait() // at this point tick 1 has already been dropped
|
||||
select {
|
||||
case ctx := <-r.evalCh:
|
||||
require.Equal(t, time2, ctx.scheduledAt)
|
||||
result := <-resultCh1
|
||||
require.True(t, result.success)
|
||||
require.Nilf(t, result.droppedEval, "expected no dropped evaluations but got one")
|
||||
result = <-resultCh2
|
||||
require.True(t, result.success)
|
||||
require.NotNil(t, result.droppedEval, "expected no dropped evaluations but got one")
|
||||
require.Equal(t, time1, result.droppedEval.scheduledAt)
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("No message was received on eval channel")
|
||||
}
|
||||
})
|
||||
t.Run("eval should exit when context is cancelled", func(t *testing.T) {
|
||||
r := newAlertRuleInfo(context.Background())
|
||||
resultCh := make(chan bool)
|
||||
resultCh := make(chan evalResponse)
|
||||
go func() {
|
||||
resultCh <- r.eval(time.Now(), rand.Int63())
|
||||
result, dropped := r.eval(time.Now(), rand.Int63())
|
||||
resultCh <- evalResponse{result, dropped}
|
||||
}()
|
||||
runtime.Gosched()
|
||||
r.stop()
|
||||
select {
|
||||
case result := <-resultCh:
|
||||
require.False(t, result)
|
||||
require.False(t, result.success)
|
||||
require.Nilf(t, result.droppedEval, "expected no dropped evaluations but got one")
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("No message was received on eval channel")
|
||||
}
|
||||
@ -71,7 +119,9 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
|
||||
t.Run("eval should do nothing", func(t *testing.T) {
|
||||
r := newAlertRuleInfo(context.Background())
|
||||
r.stop()
|
||||
require.False(t, r.eval(time.Now(), rand.Int63()))
|
||||
success, dropped := r.eval(time.Now(), rand.Int63())
|
||||
require.False(t, success)
|
||||
require.Nilf(t, dropped, "expected no dropped evaluations but got one")
|
||||
})
|
||||
t.Run("stop should do nothing", func(t *testing.T) {
|
||||
r := newAlertRuleInfo(context.Background())
|
||||
|
@ -394,6 +394,7 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
|
||||
|
||||
type readyToRunItem struct {
|
||||
key models.AlertRuleKey
|
||||
ruleName string
|
||||
ruleInfo *alertRuleInfo
|
||||
version int64
|
||||
}
|
||||
@ -427,7 +428,7 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
|
||||
|
||||
itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds())
|
||||
if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 {
|
||||
readyToRun = append(readyToRun, readyToRunItem{key: key, ruleInfo: ruleInfo, version: itemVersion})
|
||||
readyToRun = append(readyToRun, readyToRunItem{key: key, ruleName: item.Title, ruleInfo: ruleInfo, version: itemVersion})
|
||||
}
|
||||
|
||||
// remove the alert rule from the registered alert rules
|
||||
@ -443,9 +444,15 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
|
||||
item := readyToRun[i]
|
||||
|
||||
time.AfterFunc(time.Duration(int64(i)*step), func() {
|
||||
success := item.ruleInfo.eval(tick, item.version)
|
||||
success, dropped := item.ruleInfo.eval(tick, item.version)
|
||||
if !success {
|
||||
sch.log.Debug("scheduled evaluation was canceled because evaluation routine was stopped", "uid", item.key.UID, "org", item.key.OrgID, "time", tick)
|
||||
return
|
||||
}
|
||||
if dropped != nil {
|
||||
sch.log.Warn("Alert rule evaluation is too slow - dropped tick", "uid", item.key.UID, "org", item.key.OrgID, "time", tick)
|
||||
orgID := fmt.Sprint(item.key.OrgID)
|
||||
sch.metrics.EvaluationMissed.WithLabelValues(orgID, item.ruleName).Inc()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -859,7 +859,9 @@ func TestSchedule_DeleteAlertRule(t *testing.T) {
|
||||
info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
|
||||
sch.DeleteAlertRule(key)
|
||||
require.False(t, info.update())
|
||||
require.False(t, info.eval(time.Now(), 1))
|
||||
success, dropped := info.eval(time.Now(), 1)
|
||||
require.False(t, success)
|
||||
require.Nilf(t, dropped, "expected no dropped evaluations but got one")
|
||||
require.False(t, sch.registry.exists(key))
|
||||
})
|
||||
t.Run("should remove controller from registry", func(t *testing.T) {
|
||||
@ -869,7 +871,9 @@ func TestSchedule_DeleteAlertRule(t *testing.T) {
|
||||
info.stop()
|
||||
sch.DeleteAlertRule(key)
|
||||
require.False(t, info.update())
|
||||
require.False(t, info.eval(time.Now(), 1))
|
||||
success, dropped := info.eval(time.Now(), 1)
|
||||
require.False(t, success)
|
||||
require.Nilf(t, dropped, "expected no dropped evaluations but got one")
|
||||
require.False(t, sch.registry.exists(key))
|
||||
})
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user