Alerting: Ticker to support stopping (#48142)

* add stop for ticker
* stop ticker when scheduler stops
* stop ticker when legacy engine stops
This commit is contained in:
Yuriy Tseretyan 2022-06-01 11:48:10 -04:00 committed by GitHub
parent 3049534c40
commit c8d891785d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 73 additions and 21 deletions

View File

@ -97,6 +97,7 @@ func ProvideAlertEngine(renderer rendering.Service, requestValidator models.Plug
func (e *AlertEngine) Run(ctx context.Context) error { func (e *AlertEngine) Run(ctx context.Context) error {
reg := prometheus.WrapRegistererWithPrefix("legacy_", prometheus.DefaultRegisterer) reg := prometheus.WrapRegistererWithPrefix("legacy_", prometheus.DefaultRegisterer)
e.ticker = NewTicker(clock.New(), 1*time.Second, metrics.NewTickerMetrics(reg)) e.ticker = NewTicker(clock.New(), 1*time.Second, metrics.NewTickerMetrics(reg))
defer e.ticker.Stop()
alertGroup, ctx := errgroup.WithContext(ctx) alertGroup, ctx := errgroup.WithContext(ctx)
alertGroup.Go(func() error { return e.alertingTicker(ctx) }) alertGroup.Go(func() error { return e.alertingTicker(ctx) })
alertGroup.Go(func() error { return e.runJobDispatcher(ctx) }) alertGroup.Go(func() error { return e.runJobDispatcher(ctx) })

View File

@ -6,6 +6,7 @@ import (
"github.com/benbjohnson/clock" "github.com/benbjohnson/clock"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/alerting/metrics" "github.com/grafana/grafana/pkg/services/alerting/metrics"
) )
@ -20,6 +21,7 @@ type Ticker struct {
last time.Time last time.Time
interval time.Duration interval time.Duration
metrics *metrics.Ticker metrics *metrics.Ticker
stopCh chan struct{}
} }
// NewTicker returns a Ticker that ticks on interval marks (or very shortly after) starting at c.Now(), and never drops ticks. interval should not be negative or zero. // NewTicker returns a Ticker that ticks on interval marks (or very shortly after) starting at c.Now(), and never drops ticks. interval should not be negative or zero.
@ -33,6 +35,7 @@ func NewTicker(c clock.Clock, interval time.Duration, metric *metrics.Ticker) *T
last: c.Now(), last: c.Now(),
interval: interval, interval: interval,
metrics: metric, metrics: metric,
stopCh: make(chan struct{}),
} }
metric.IntervalSeconds.Set(t.interval.Seconds()) // Seconds report fractional part as well, so it matches the format of the timestamp we report below metric.IntervalSeconds.Set(t.interval.Seconds()) // Seconds report fractional part as well, so it matches the format of the timestamp we report below
go t.run() go t.run()
@ -40,18 +43,39 @@ func NewTicker(c clock.Clock, interval time.Duration, metric *metrics.Ticker) *T
} }
// run is the ticker loop that NewTicker starts in its own goroutine.
// On every iteration it computes the next tick as last+interval and reports it
// through the NextTickTime metric. If the clock has already reached that time,
// the tick is sent on C and, once the send completes, last and the LastTickTime
// metric are advanced. Otherwise it waits on clock.After for the remaining time.
// Both the send and the wait also listen on stopCh; receiving from it leaves
// the loop, after which the last delivered tick is logged and run returns.
func (t *Ticker) run() { func (t *Ticker) run() {
logger := log.New("ticker")
logger.Info("starting")
LOOP:
for { for {
next := t.last.Add(t.interval) // calculate the time of the next tick next := t.last.Add(t.interval) // calculate the time of the next tick
t.metrics.NextTickTime.Set(float64(next.UnixNano()) / 1e9) t.metrics.NextTickTime.Set(float64(next.UnixNano()) / 1e9)
diff := t.clock.Now().Sub(next) // calculate the difference between the current time and the next tick diff := t.clock.Now().Sub(next) // calculate the difference between the current time and the next tick
// if difference is not negative, then it should tick // if difference is not negative, then it should tick
if diff >= 0 { if diff >= 0 {
t.C <- next select {
case t.C <- next:
case <-t.stopCh:
break LOOP
}
t.last = next t.last = next
t.metrics.LastTickTime.Set(float64(next.UnixNano()) / 1e9) t.metrics.LastTickTime.Set(float64(next.UnixNano()) / 1e9)
continue continue
} }
// tick is too young. try again when ... // tick is too young. try again when ...
<-t.clock.After(-diff) // ...it'll definitely be old enough select {
case <-t.clock.After(-diff): // ...it'll definitely be old enough
case <-t.stopCh:
break LOOP
}
}
logger.Info("stopped", "last_tick", t.last)
}
// Stop asks the run goroutine to exit. It does not close the C channel, so
// nothing is delivered on C as a result of calling Stop.
//
// The request is a non-blocking send on stopCh, which NewTicker creates
// unbuffered. It is delivered only if run is waiting in one of its selects at
// that moment; otherwise the default branch is taken and the call does nothing.
// Calling Stop repeatedly is safe: it never blocks and never closes a channel.
//
// NOTE(review): default is also taken when run is alive but between selects,
// so a stop request can be dropped and the goroutine keeps ticking. Confirm
// whether stopCh should be buffered (capacity 1) or closed under a sync.Once.
func (t *Ticker) Stop() {
select {
case t.stopCh <- struct{}{}:
default:
// no receiver is ready: run has already exited, or it is not waiting in a select right now
}
} }

View File

@ -18,7 +18,12 @@ import (
) )
func TestTicker(t *testing.T) { func TestTicker(t *testing.T) {
readChanOrFail := func(t *testing.T, ctx context.Context, c chan time.Time) time.Time { readChanOrFail := func(t *testing.T, c chan time.Time) time.Time {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(func() {
cancel()
})
t.Helper() t.Helper()
select { select {
case tick := <-c: case tick := <-c:
@ -76,11 +81,8 @@ func TestTicker(t *testing.T) {
break break
} }
} }
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(func() { actual := readChanOrFail(t, ticker.C)
cancel()
})
actual := readChanOrFail(t, ctx, ticker.C)
require.Equal(t, expectedTick, actual) require.Equal(t, expectedTick, actual)
}) })
@ -97,18 +99,13 @@ func TestTicker(t *testing.T) {
clk.Add(interval) // advance the clock by the interval to make the ticker tick the first time. clk.Add(interval) // advance the clock by the interval to make the ticker tick the first time.
clk.Add(interval) // advance the clock by the interval to make the ticker tick the second time. clk.Add(interval) // advance the clock by the interval to make the ticker tick the second time.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(func() {
cancel()
})
// Regardless of wall time, the first tick should be initial clock + interval. // Regardless of wall time, the first tick should be initial clock + interval.
actual1 := readChanOrFail(t, ctx, ticker.C) actual1 := readChanOrFail(t, ticker.C)
require.Equal(t, expectedTick, actual1) require.Equal(t, expectedTick, actual1)
var actual2 time.Time var actual2 time.Time
require.Eventually(t, func() bool { require.Eventually(t, func() bool {
actual2 = readChanOrFail(t, ctx, ticker.C) actual2 = readChanOrFail(t, ticker.C)
return true return true
}, time.Second, 10*time.Millisecond) }, time.Second, 10*time.Millisecond)
@ -147,11 +144,7 @@ func TestTicker(t *testing.T) {
}, 1*time.Second, 100*time.Millisecond, "failed to wait for metrics to match expected values:\n%v", errs) }, 1*time.Second, 100*time.Millisecond, "failed to wait for metrics to match expected values:\n%v", errs)
clk.Add(interval) clk.Add(interval)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) actual := readChanOrFail(t, ticker.C)
t.Cleanup(func() {
cancel()
})
actual := readChanOrFail(t, ctx, ticker.C)
expectedMetric = fmt.Sprintf(expectedMetricFmt, interval.Seconds(), float64(actual.UnixNano())/1e9, float64(expectedTick.Add(interval).UnixNano())/1e9) expectedMetric = fmt.Sprintf(expectedMetricFmt, interval.Seconds(), float64(actual.UnixNano())/1e9, float64(expectedTick.Add(interval).UnixNano())/1e9)
@ -163,4 +156,35 @@ func TestTicker(t *testing.T) {
return err == nil return err == nil
}, 1*time.Second, 100*time.Millisecond, "failed to wait for metrics to match expected values:\n%v", errs) }, 1*time.Second, 100*time.Millisecond, "failed to wait for metrics to match expected values:\n%v", errs)
}) })
t.Run("should stop", func(t *testing.T) {
t.Run("when it waits for the next tick", func(t *testing.T) {
clk := clock.NewMock()
interval := time.Duration(rand.Int63n(9)+1) * time.Second
ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(prometheus.NewRegistry()))
clk.Add(interval)
readChanOrFail(t, ticker.C)
ticker.Stop()
clk.Add(interval)
require.Empty(t, ticker.C)
})
t.Run("when it waits for the tick to be consumed", func(t *testing.T) {
clk := clock.NewMock()
interval := time.Duration(rand.Int63n(9)+1) * time.Second
ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(prometheus.NewRegistry()))
clk.Add(interval)
ticker.Stop()
require.Empty(t, ticker.C)
})
t.Run("multiple times", func(t *testing.T) {
clk := clock.NewMock()
interval := time.Duration(rand.Int63n(9)+1) * time.Second
ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(prometheus.NewRegistry()))
ticker.Stop()
ticker.Stop()
ticker.Stop()
})
})
} }

View File

@ -59,6 +59,7 @@ Scopes must have an order to ensure consistency and ease of search, this helps u
- [BUGFIX] RBAC: replace create\update\delete actions for notification policies by alert.notifications:write #49185 - [BUGFIX] RBAC: replace create\update\delete actions for notification policies by alert.notifications:write #49185
- [BUGFIX] Fix access to alerts for Viewer role with editor permissions in folder #49270 - [BUGFIX] Fix access to alerts for Viewer role with editor permissions in folder #49270
- [FEATURE] Alert rules with associated panels will take screenshots. #49293 #49338 #49374 #49377 #49378 #49379 #49381 #49385 #49439 #49445 - [FEATURE] Alert rules with associated panels will take screenshots. #49293 #49338 #49374 #49377 #49378 #49379 #49381 #49385 #49439 #49445
- [ENHANCEMENT] Scheduler: ticker to support stopping #48142
## 8.5.3 ## 8.5.3
@ -81,4 +82,3 @@ Scopes must have an order to ensure consistency and ease of search, this helps u
- [BUGFIX] (Legacy) Templates: Parse notification templates using all the matches of the alert rule when going from `Alerting` to `OK` in legacy alerting #47355 - [BUGFIX] (Legacy) Templates: Parse notification templates using all the matches of the alert rule when going from `Alerting` to `OK` in legacy alerting #47355
- [BUGFIX] Scheduler: Fix state manager to support OK option of `AlertRule.ExecErrState` #47670 - [BUGFIX] Scheduler: Fix state manager to support OK option of `AlertRule.ExecErrState` #47670
- [ENHANCEMENT] Templates: Enable the use of classic condition values in templates #46971 - [ENHANCEMENT] Templates: Enable the use of classic condition values in templates #46971

View File

@ -156,6 +156,8 @@ func (sch *schedule) Run(ctx context.Context) error {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(2) wg.Add(2)
defer sch.ticker.Stop()
go func() { go func() {
defer wg.Done() defer wg.Done()
if err := sch.schedulePeriodic(ctx); err != nil { if err := sch.schedulePeriodic(ctx); err != nil {
@ -652,6 +654,7 @@ func (sch *schedule) saveAlertStates(ctx context.Context, states []*state.State)
func (sch *schedule) overrideCfg(cfg SchedulerCfg) { func (sch *schedule) overrideCfg(cfg SchedulerCfg) {
sch.clock = cfg.C sch.clock = cfg.C
sch.baseInterval = cfg.BaseInterval sch.baseInterval = cfg.BaseInterval
sch.ticker.Stop()
sch.ticker = alerting.NewTicker(cfg.C, cfg.BaseInterval, cfg.Metrics.Ticker) sch.ticker = alerting.NewTicker(cfg.C, cfg.BaseInterval, cfg.Metrics.Ticker)
sch.evalAppliedFunc = cfg.EvalAppliedFunc sch.evalAppliedFunc = cfg.EvalAppliedFunc
sch.stopAppliedFunc = cfg.StopAppliedFunc sch.stopAppliedFunc = cfg.StopAppliedFunc