grafana/pkg/services/alerting/ticker_test.go

206 lines
7.2 KiB
Go

package alerting
import (
"bytes"
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/benbjohnson/clock"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/services/alerting/metrics"
)
func TestTicker(t *testing.T) {
readChanOrFail := func(t *testing.T, c chan time.Time) time.Time {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(func() {
cancel()
})
t.Helper()
select {
case tick := <-c:
return tick
case <-ctx.Done():
require.Failf(t, fmt.Sprintf("%v", ctx.Err()), "timeout reading the channel")
default:
require.Failf(t, "channel is empty but it should have a tick", "")
}
return time.Time{}
}
t.Run("should align with clock", func(t *testing.T) {
interval := 10 * time.Second
clk := clock.NewMock()
clk.Add(1 * time.Minute)
require.Equal(t, clk.Now(), getStartTick(clk, interval))
now := clk.Now()
for i := 0; i < 100; i++ {
delta := time.Duration(rand.Int63n(interval.Nanoseconds()))
clk.Set(now.Add(delta))
require.Equal(t, now, getStartTick(clk, interval))
}
})
t.Run("should not drop ticks", func(t *testing.T) {
interval := time.Duration(rand.Int63n(100)+10) * time.Second
clk := clock.NewMock()
clk.Add(interval) // align clock with the start tick
ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(prometheus.NewRegistry()))
ticks := rand.Intn(9) + 1
jitter := rand.Int63n(int64(interval) - 1)
clk.Add(time.Duration(ticks)*interval + time.Duration(jitter))
w := sync.WaitGroup{}
w.Add(1)
regTicks := make([]time.Time, 0, ticks)
go func() {
for timestamp := range ticker.C {
regTicks = append(regTicks, timestamp)
if len(regTicks) == ticks {
w.Done()
}
}
}()
w.Wait()
require.Len(t, regTicks, ticks)
t.Run("ticks should monotonically increase", func(t *testing.T) {
for i := 1; i < len(regTicks); i++ {
previous := regTicks[i-1]
current := regTicks[i]
require.Equal(t, interval, current.Sub(previous))
}
})
})
t.Run("should not put anything to channel until it's time", func(t *testing.T) {
clk := clock.NewMock()
interval := time.Duration(rand.Int63n(9)+1) * time.Second
ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(prometheus.NewRegistry()))
expectedTick := clk.Now().Add(interval)
for {
require.Empty(t, ticker.C)
clk.Add(time.Duration(rand.Int31n(500)+100) * time.Millisecond)
if clk.Now().After(expectedTick) {
break
}
}
actual := readChanOrFail(t, ticker.C)
require.Equal(t, expectedTick, actual)
})
t.Run("should put the tick in the channel immediately if it is behind", func(t *testing.T) {
clk := clock.NewMock()
interval := time.Duration(rand.Int63n(9)+1) * time.Second
ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(prometheus.NewRegistry()))
// We can expect the first tick to be at a consistent interval. Take a snapshot of the clock now, before we advance it.
expectedTick := clk.Now().Add(interval)
require.Empty(t, ticker.C)
clk.Add(interval) // advance the clock by the interval to make the ticker tick the first time.
clk.Add(interval) // advance the clock by the interval to make the ticker tick the second time.
// Irregardless of wall time, the first tick should be initial clock + interval.
actual1 := readChanOrFail(t, ticker.C)
require.Equal(t, expectedTick, actual1)
var actual2 time.Time
require.Eventually(t, func() bool {
actual2 = readChanOrFail(t, ticker.C)
return true
}, time.Second, 10*time.Millisecond)
// Similarly, the second tick should be last tick + interval irregardless of wall time.
require.Equal(t, expectedTick.Add(interval), actual2)
})
t.Run("should report metrics", func(t *testing.T) {
clk := clock.NewMock()
clk.Set(time.Now())
interval := time.Duration(rand.Int63n(9)+1) * time.Second
registry := prometheus.NewPedanticRegistry()
ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(registry))
expectedTick := getStartTick(clk, interval).Add(interval)
expectedMetricFmt := `# HELP grafana_alerting_ticker_interval_seconds Interval at which the ticker is meant to tick.
# TYPE grafana_alerting_ticker_interval_seconds gauge
grafana_alerting_ticker_interval_seconds %v
# HELP grafana_alerting_ticker_last_consumed_tick_timestamp_seconds Timestamp of the last consumed tick in seconds.
# TYPE grafana_alerting_ticker_last_consumed_tick_timestamp_seconds gauge
grafana_alerting_ticker_last_consumed_tick_timestamp_seconds %v
# HELP grafana_alerting_ticker_next_tick_timestamp_seconds Timestamp of the next tick in seconds before it is consumed.
# TYPE grafana_alerting_ticker_next_tick_timestamp_seconds gauge
grafana_alerting_ticker_next_tick_timestamp_seconds %v
`
expectedMetric := fmt.Sprintf(expectedMetricFmt, interval.Seconds(), 0, float64(expectedTick.UnixNano())/1e9)
errs := make(map[string]error, 1)
require.Eventuallyf(t, func() bool {
err := testutil.GatherAndCompare(registry, bytes.NewBufferString(expectedMetric), "grafana_alerting_ticker_last_consumed_tick_timestamp_seconds", "grafana_alerting_ticker_next_tick_timestamp_seconds", "grafana_alerting_ticker_interval_seconds")
if err != nil {
errs["error"] = err
}
return err == nil
}, 1*time.Second, 100*time.Millisecond, "failed to wait for metrics to match expected values:\n%v", errs)
clk.Add(interval)
actual := readChanOrFail(t, ticker.C)
expectedMetric = fmt.Sprintf(expectedMetricFmt, interval.Seconds(), float64(actual.UnixNano())/1e9, float64(expectedTick.Add(interval).UnixNano())/1e9)
require.Eventuallyf(t, func() bool {
err := testutil.GatherAndCompare(registry, bytes.NewBufferString(expectedMetric), "grafana_alerting_ticker_last_consumed_tick_timestamp_seconds", "grafana_alerting_ticker_next_tick_timestamp_seconds", "grafana_alerting_ticker_interval_seconds")
if err != nil {
errs["error"] = err
}
return err == nil
}, 1*time.Second, 100*time.Millisecond, "failed to wait for metrics to match expected values:\n%v", errs)
})
t.Run("should stop", func(t *testing.T) {
t.Run("when it waits for the next tick", func(t *testing.T) {
clk := clock.NewMock()
interval := time.Duration(rand.Int63n(9)+1) * time.Second
ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(prometheus.NewRegistry()))
clk.Add(interval)
readChanOrFail(t, ticker.C)
ticker.Stop()
clk.Add(interval)
require.Empty(t, ticker.C)
})
t.Run("when it waits for the tick to be consumed", func(t *testing.T) {
clk := clock.NewMock()
interval := time.Duration(rand.Int63n(9)+1) * time.Second
ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(prometheus.NewRegistry()))
clk.Add(interval)
ticker.Stop()
require.Empty(t, ticker.C)
})
t.Run("multiple times", func(t *testing.T) {
clk := clock.NewMock()
interval := time.Duration(rand.Int63n(9)+1) * time.Second
ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(prometheus.NewRegistry()))
ticker.Stop()
ticker.Stop()
ticker.Stop()
})
})
}