diff --git a/pkg/services/alerting/engine.go b/pkg/services/alerting/engine.go index decab1df9b3..72d1f3ad7bb 100644 --- a/pkg/services/alerting/engine.go +++ b/pkg/services/alerting/engine.go @@ -97,6 +97,7 @@ func ProvideAlertEngine(renderer rendering.Service, requestValidator models.Plug func (e *AlertEngine) Run(ctx context.Context) error { reg := prometheus.WrapRegistererWithPrefix("legacy_", prometheus.DefaultRegisterer) e.ticker = NewTicker(clock.New(), 1*time.Second, metrics.NewTickerMetrics(reg)) + defer e.ticker.Stop() alertGroup, ctx := errgroup.WithContext(ctx) alertGroup.Go(func() error { return e.alertingTicker(ctx) }) alertGroup.Go(func() error { return e.runJobDispatcher(ctx) }) diff --git a/pkg/services/alerting/ticker.go b/pkg/services/alerting/ticker.go index 16ea19ea5a3..a992a579ef7 100644 --- a/pkg/services/alerting/ticker.go +++ b/pkg/services/alerting/ticker.go @@ -6,6 +6,7 @@ import ( "github.com/benbjohnson/clock" + "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/alerting/metrics" ) @@ -20,6 +21,7 @@ type Ticker struct { last time.Time interval time.Duration metrics *metrics.Ticker + stopCh chan struct{} } // NewTicker returns a Ticker that ticks on interval marks (or very shortly after) starting at c.Now(), and never drops ticks. interval should not be negative or zero. @@ -33,6 +35,7 @@ func NewTicker(c clock.Clock, interval time.Duration, metric *metrics.Ticker) *T last: c.Now(), interval: interval, metrics: metric, + stopCh: make(chan struct{}), } metric.IntervalSeconds.Set(t.interval.Seconds()) // Seconds report fractional part as well, so it matches the format of the timestamp we report below go t.run() @@ -40,18 +43,39 @@ func NewTicker(c clock.Clock, interval time.Duration, metric *metrics.Ticker) *T } func (t *Ticker) run() { + logger := log.New("ticker") + logger.Info("starting") +LOOP: for { next := t.last.Add(t.interval) // calculate the time of the next tick t.metrics.NextTickTime.Set(float64(next.UnixNano()) / 1e9) diff := t.clock.Now().Sub(next) // calculate the difference between the current time and the next tick // if difference is not negative, then it should tick if diff >= 0 { - t.C <- next + select { + case t.C <- next: + case <-t.stopCh: + break LOOP + } t.last = next t.metrics.LastTickTime.Set(float64(next.UnixNano()) / 1e9) continue } // tick is too young. try again when ... - <-t.clock.After(-diff) // ...it'll definitely be old enough + select { + case <-t.clock.After(-diff): // ...it'll definitely be old enough + case <-t.stopCh: + break LOOP + } + } + logger.Info("stopped", "last_tick", t.last) +} + +// Stop stops the ticker. It does not close the C channel +func (t *Ticker) Stop() { + select { + case t.stopCh <- struct{}{}: + default: + // already stopped } } diff --git a/pkg/services/alerting/ticker_test.go b/pkg/services/alerting/ticker_test.go index b642067ea63..1994939b5ec 100644 --- a/pkg/services/alerting/ticker_test.go +++ b/pkg/services/alerting/ticker_test.go @@ -18,7 +18,12 @@ import ( ) func TestTicker(t *testing.T) { - readChanOrFail := func(t *testing.T, ctx context.Context, c chan time.Time) time.Time { + readChanOrFail := func(t *testing.T, c chan time.Time) time.Time { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(func() { + cancel() + }) + t.Helper() select { case tick := <-c: @@ -76,11 +81,8 @@ func TestTicker(t *testing.T) { break } } - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - t.Cleanup(func() { - cancel() - }) - actual := readChanOrFail(t, ctx, ticker.C) + + actual := readChanOrFail(t, ticker.C) require.Equal(t, expectedTick, actual) }) @@ -97,18 +99,13 @@ func TestTicker(t *testing.T) { clk.Add(interval) // advance the clock by the interval to make the ticker tick the first time. clk.Add(interval) // advance the clock by the interval to make the ticker tick the second time. - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - t.Cleanup(func() { - cancel() - }) - // Irregardless of wall time, the first tick should be initial clock + interval. - actual1 := readChanOrFail(t, ctx, ticker.C) + actual1 := readChanOrFail(t, ticker.C) require.Equal(t, expectedTick, actual1) var actual2 time.Time require.Eventually(t, func() bool { - actual2 = readChanOrFail(t, ctx, ticker.C) + actual2 = readChanOrFail(t, ticker.C) return true }, time.Second, 10*time.Millisecond) @@ -147,11 +144,7 @@ func TestTicker(t *testing.T) { }, 1*time.Second, 100*time.Millisecond, "failed to wait for metrics to match expected values:\n%v", errs) clk.Add(interval) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - t.Cleanup(func() { - cancel() - }) - actual := readChanOrFail(t, ctx, ticker.C) + actual := readChanOrFail(t, ticker.C) expectedMetric = fmt.Sprintf(expectedMetricFmt, interval.Seconds(), float64(actual.UnixNano())/1e9, float64(expectedTick.Add(interval).UnixNano())/1e9) @@ -163,4 +156,35 @@ func TestTicker(t *testing.T) { return err == nil }, 1*time.Second, 100*time.Millisecond, "failed to wait for metrics to match expected values:\n%v", errs) }) + + t.Run("should stop", func(t *testing.T) { + t.Run("when it waits for the next tick", func(t *testing.T) { + clk := clock.NewMock() + interval := time.Duration(rand.Int63n(9)+1) * time.Second + ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(prometheus.NewRegistry())) + clk.Add(interval) + readChanOrFail(t, ticker.C) + ticker.Stop() + clk.Add(interval) + require.Empty(t, ticker.C) + }) + + t.Run("when it waits for the tick to be consumed", func(t *testing.T) { + clk := clock.NewMock() + interval := time.Duration(rand.Int63n(9)+1) * time.Second + ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(prometheus.NewRegistry())) + clk.Add(interval) + ticker.Stop() + require.Empty(t, ticker.C) + }) + + t.Run("multiple times", func(t *testing.T) { + clk := clock.NewMock() + interval := time.Duration(rand.Int63n(9)+1) * time.Second + ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(prometheus.NewRegistry())) + ticker.Stop() + ticker.Stop() + ticker.Stop() + }) + }) } diff --git a/pkg/services/ngalert/CHANGELOG.md b/pkg/services/ngalert/CHANGELOG.md index c4d874552ce..fd868e58f38 100644 --- a/pkg/services/ngalert/CHANGELOG.md +++ b/pkg/services/ngalert/CHANGELOG.md @@ -59,6 +59,7 @@ Scopes must have an order to ensure consistency and ease of search, this helps u - [BUGFIX] RBAC: replace create\update\delete actions for notification policies by alert.notifications:write #49185 - [BUGFIX] Fix access to alerts for Viewer role with editor permissions in folder #49270 - [FEATURE] Alert rules with associated panels will take screenshots. #49293 #49338 #49374 #49377 #49378 #49379 #49381 #49385 #49439 #49445 +- [ENHANCEMENT] Scheduler: ticker to support stopping #48142 ## 8.5.3 @@ -81,4 +82,3 @@ Scopes must have an order to ensure consistency and ease of search, this helps u - [BUGFIX] (Legacy) Templates: Parse notification templates using all the matches of the alert rule when going from `Alerting` to `OK` in legacy alerting #47355 - [BUGFIX] Scheduler: Fix state manager to support OK option of `AlertRule.ExecErrState` #47670 - [ENHANCEMENT] Templates: Enable the use of classic condition values in templates #46971 - diff --git a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go index 5e5df9bfb59..d31ac723eff 100644 --- a/pkg/services/ngalert/schedule/schedule.go +++ b/pkg/services/ngalert/schedule/schedule.go @@ -156,6 +156,8 @@ func (sch *schedule) Run(ctx context.Context) error { var wg sync.WaitGroup wg.Add(2) + defer sch.ticker.Stop() + go func() { defer wg.Done() if err := sch.schedulePeriodic(ctx); err != nil { @@ -652,6 +654,7 @@ func (sch *schedule) saveAlertStates(ctx context.Context, states []*state.State) func (sch *schedule) overrideCfg(cfg SchedulerCfg) { sch.clock = cfg.C sch.baseInterval = cfg.BaseInterval + sch.ticker.Stop() sch.ticker = alerting.NewTicker(cfg.C, cfg.BaseInterval, cfg.Metrics.Ticker) sch.evalAppliedFunc = cfg.EvalAppliedFunc sch.stopAppliedFunc = cfg.StopAppliedFunc