Alerting: Remove unused features from ticker + metric + tests (#47828)

* remove not used code:
  - remove offset in ticker because it is not used
  - remove unused ticker and scheduler methods

* use duration for interval
* add metrics grafana_alerting_ticker_last_consumed_tick_timestamp_seconds, grafana_alerting_ticker_next_tick_timestamp_seconds, grafana_alerting_ticker_interval_seconds
This commit is contained in:
Yuriy Tseretyan 2022-04-22 15:09:47 -04:00 committed by GitHub
parent 8310789ef1
commit 75ba4e98c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 128 additions and 91 deletions

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/benbjohnson/clock"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/sync/errgroup"
@ -14,6 +15,7 @@ import (
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/infra/usagestats"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/alerting/metrics"
"github.com/grafana/grafana/pkg/services/encryption"
"github.com/grafana/grafana/pkg/services/notifications"
"github.com/grafana/grafana/pkg/services/rendering"
@ -77,7 +79,6 @@ func ProvideAlertEngine(renderer rendering.Service, requestValidator models.Plug
sqlStore: sqlStore,
dashAlertExtractor: dashAlertExtractor,
}
e.ticker = NewTicker(time.Now(), time.Second*0, clock.New(), 1)
e.execQueue = make(chan *Job, 1000)
e.scheduler = newScheduler()
e.evalHandler = NewEvalHandler(e.DataService)
@ -92,6 +93,7 @@ func ProvideAlertEngine(renderer rendering.Service, requestValidator models.Plug
// Run starts the alerting service background process.
func (e *AlertEngine) Run(ctx context.Context) error {
e.ticker = NewTicker(clock.New(), 1*time.Second, metrics.NewTickerMetrics(prometheus.DefaultRegisterer))
alertGroup, ctx := errgroup.WithContext(ctx)
alertGroup.Go(func() error { return e.alertingTicker(ctx) })
alertGroup.Go(func() error { return e.runJobDispatcher(ctx) })

View File

@ -0,0 +1,35 @@
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
// Ticker groups the Prometheus gauges that describe the state of an alerting ticker.
type Ticker struct {
// LastTickTime is the timestamp, in seconds, of the last tick that was consumed.
LastTickTime prometheus.Gauge
// NextTickTime is the timestamp, in seconds, of the next tick before it is consumed.
NextTickTime prometheus.Gauge
// IntervalSeconds is the interval, in seconds, at which the ticker is meant to tick.
IntervalSeconds prometheus.Gauge
}
// NewTickerMetrics creates the three ticker gauges (last consumed tick, next
// tick and tick interval) under the grafana_alerting_ prefix and registers
// them with reg.
//
// NOTE(review): promauto is documented to panic when registration fails, so
// calling this twice with the same registerer is expected to panic on the
// duplicate metric names — confirm against callers that share a registerer.
func NewTickerMetrics(reg prometheus.Registerer) *Ticker {
	factory := promauto.With(reg)

	lastTick := factory.NewGauge(prometheus.GaugeOpts{
		Namespace: "grafana",
		Subsystem: "alerting",
		Name:      "ticker_last_consumed_tick_timestamp_seconds",
		Help:      "Timestamp of the last consumed tick in seconds.",
	})
	nextTick := factory.NewGauge(prometheus.GaugeOpts{
		Namespace: "grafana",
		Subsystem: "alerting",
		Name:      "ticker_next_tick_timestamp_seconds",
		Help:      "Timestamp of the next tick in seconds before it is consumed.",
	})
	tickInterval := factory.NewGauge(prometheus.GaugeOpts{
		Namespace: "grafana",
		Subsystem: "alerting",
		Name:      "ticker_interval_seconds",
		Help:      "Interval at which the ticker is meant to tick.",
	})

	return &Ticker{
		LastTickTime:    lastTick,
		NextTickTime:    nextTick,
		IntervalSeconds: tickInterval,
	}
}

View File

@ -1,76 +1,57 @@
package alerting
import (
"fmt"
"time"
"github.com/benbjohnson/clock"
"github.com/grafana/grafana/pkg/services/alerting/metrics"
)
// Ticker is a ticker to power the alerting scheduler. it's like a time.Ticker, except:
// * it doesn't drop ticks for slow receivers, rather, it queues up. so that callers are in control to instrument what's going on.
// * it automatically ticks every second, which is the right thing in our current design
// * it ticks on intervalSec marks or very shortly after. this provides a predictable load pattern
// * it ticks on interval marks or very shortly after. this provides a predictable load pattern
// (this shouldn't cause too much load contention issues because the next steps in the pipeline just process at their own pace)
// * the timestamps are used to mark "last datapoint to query for" and as such, are a configurable amount of seconds in the past
// * because we want to allow:
// - a clean "resume where we left off" and "don't yield ticks we already did"
// - adjusting offset over time to compensate for storage backing up or getting fast and providing lower latency
// you specify a lastProcessed timestamp as well as an offset at creation, or runtime
type Ticker struct {
C chan time.Time
clock clock.Clock
last time.Time
offset time.Duration
newOffset chan time.Duration
intervalSec int64
paused bool
C chan time.Time
clock clock.Clock
last time.Time
interval time.Duration
metrics *metrics.Ticker
}
// NewTicker returns a ticker that ticks on intervalSec marks or very shortly after, and never drops ticks
func NewTicker(last time.Time, initialOffset time.Duration, c clock.Clock, intervalSec int64) *Ticker {
t := &Ticker{
C: make(chan time.Time),
clock: c,
last: last,
offset: initialOffset,
newOffset: make(chan time.Duration),
intervalSec: intervalSec,
// NewTicker creates a Ticker driven by clock c that emits a tick on every
// interval mark (or very shortly after), counting from c.Now(), and never
// drops ticks.
//
// It panics if interval is zero or negative. metric is dereferenced right
// away to publish the configured interval, so it must not be nil. The ticking
// goroutine is started before the Ticker is returned.
func NewTicker(c clock.Clock, interval time.Duration, metric *metrics.Ticker) *Ticker {
	if interval <= 0 {
		panic(fmt.Errorf("non-positive interval [%v] is not allowed", interval))
	}

	ticker := new(Ticker)
	ticker.C = make(chan time.Time)
	ticker.clock = c
	ticker.last = c.Now()
	ticker.interval = interval
	ticker.metrics = metric

	// Seconds() keeps the fractional part, which matches the format of the
	// tick timestamps reported by the run loop.
	metric.IntervalSeconds.Set(ticker.interval.Seconds())

	go ticker.run()
	return ticker
}
func (t *Ticker) run() {
for {
next := t.last.Add(time.Duration(t.intervalSec) * time.Second)
diff := t.clock.Now().Add(-t.offset).Sub(next)
next := t.last.Add(t.interval) // calculate the time of the next tick
t.metrics.NextTickTime.Set(float64(next.UnixNano()) / 1e9)
diff := t.clock.Now().Sub(next) // calculate the difference between the current time and the next tick
// if difference is not negative, then it should tick
if diff >= 0 {
if !t.paused {
t.C <- next
}
t.C <- next
t.last = next
t.metrics.LastTickTime.Set(float64(next.UnixNano()) / 1e9)
continue
}
// tick is too young. try again when ...
select {
case <-t.clock.After(-diff): // ...it'll definitely be old enough
case offset := <-t.newOffset: // ...it might be old enough
t.offset = offset
}
<-t.clock.After(-diff) // ...it'll definitely be old enough
}
}
// ResetOffset replaces the ticker's offset with duration.
// The value is handed to the run loop over the unbuffered newOffset channel,
// so the call blocks until the loop receives it.
func (t *Ticker) ResetOffset(duration time.Duration) {
t.newOffset <- duration
}
// Pause pauses the ticker: while it is paused no ticks will be sent.
// Ticks that come due while paused are skipped by the run loop, not queued.
func (t *Ticker) Pause() {
t.paused = true
}
// Unpause resumes a paused ticker so that ticks are sent again.
func (t *Ticker) Unpause() {
t.paused = false
}

View File

@ -1,6 +1,7 @@
package alerting
import (
"bytes"
"context"
"fmt"
"math/rand"
@ -9,7 +10,11 @@ import (
"time"
"github.com/benbjohnson/clock"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/services/alerting/metrics"
)
func TestTicker(t *testing.T) {
@ -27,10 +32,8 @@ func TestTicker(t *testing.T) {
}
t.Run("should not drop ticks", func(t *testing.T) {
clk := clock.NewMock()
intervalSec := rand.Int63n(100) + 10
interval := time.Duration(intervalSec) * time.Second
last := clk.Now()
ticker := NewTicker(last, 0, clk, intervalSec)
interval := time.Duration(rand.Int63n(100)+10) * time.Second
ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(prometheus.NewRegistry()))
ticks := rand.Intn(9) + 1
jitter := rand.Int63n(int64(interval) - 1)
@ -63,10 +66,8 @@ func TestTicker(t *testing.T) {
t.Run("should not put anything to channel until it's time", func(t *testing.T) {
clk := clock.NewMock()
intervalSec := rand.Int63n(9) + 1
interval := time.Duration(intervalSec) * time.Second
last := clk.Now()
ticker := NewTicker(last, 0, clk, intervalSec)
interval := time.Duration(rand.Int63n(9)+1) * time.Second
ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(prometheus.NewRegistry()))
expectedTick := clk.Now().Add(interval)
for {
require.Empty(t, ticker.C)
@ -85,10 +86,8 @@ func TestTicker(t *testing.T) {
t.Run("should put the tick in the channel immediately if it is behind", func(t *testing.T) {
clk := clock.NewMock()
intervalSec := rand.Int63n(9) + 1
interval := time.Duration(intervalSec) * time.Second
last := clk.Now()
ticker := NewTicker(last, 0, clk, intervalSec)
interval := time.Duration(rand.Int63n(9)+1) * time.Second
ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(prometheus.NewRegistry()))
// We can expect the first tick to be at a consistent interval. Take a snapshot of the clock now, before we advance it.
expectedTick := clk.Now().Add(interval)
@ -116,4 +115,38 @@ func TestTicker(t *testing.T) {
// Similarly, the second tick should be last tick + interval regardless of wall time.
require.Equal(t, expectedTick.Add(interval), actual2)
})
t.Run("should report metrics", func(t *testing.T) {
clk := clock.NewMock()
clk.Set(time.Now())
interval := time.Duration(rand.Int63n(9)+1) * time.Second
registry := prometheus.NewPedanticRegistry()
ticker := NewTicker(clk, interval, metrics.NewTickerMetrics(registry))
expectedTick := clk.Now().Add(interval)
expectedMetricFmt := `# HELP grafana_alerting_ticker_interval_seconds Interval at which the ticker is meant to tick.
# TYPE grafana_alerting_ticker_interval_seconds gauge
grafana_alerting_ticker_interval_seconds %v
# HELP grafana_alerting_ticker_last_consumed_tick_timestamp_seconds Timestamp of the last consumed tick in seconds.
# TYPE grafana_alerting_ticker_last_consumed_tick_timestamp_seconds gauge
grafana_alerting_ticker_last_consumed_tick_timestamp_seconds %v
# HELP grafana_alerting_ticker_next_tick_timestamp_seconds Timestamp of the next tick in seconds before it is consumed.
# TYPE grafana_alerting_ticker_next_tick_timestamp_seconds gauge
grafana_alerting_ticker_next_tick_timestamp_seconds %v
`
expectedMetric := fmt.Sprintf(expectedMetricFmt, interval.Seconds(), 0, float64(expectedTick.UnixNano())/1e9)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(expectedMetric), "grafana_alerting_ticker_last_consumed_tick_timestamp_seconds", "grafana_alerting_ticker_next_tick_timestamp_seconds", "grafana_alerting_ticker_interval_seconds"))
clk.Add(interval)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(func() {
cancel()
})
actual := readChanOrFail(t, ctx, ticker.C)
expectedMetric = fmt.Sprintf(expectedMetricFmt, interval.Seconds(), float64(actual.UnixNano())/1e9, float64(expectedTick.Add(interval).UnixNano())/1e9)
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(expectedMetric), "grafana_alerting_ticker_last_consumed_tick_timestamp_seconds", "grafana_alerting_ticker_next_tick_timestamp_seconds", "grafana_alerting_ticker_interval_seconds"))
})
}

View File

@ -49,5 +49,6 @@ Scopes must have an order to ensure consistency and ease of search, this helps u
- [BUGFIX] (Legacy) Templates: Parse notification templates using all the matches of the alert rule when going from `Alerting` to `OK` in legacy alerting #47355
- [BUGFIX] Scheduler: Fix state manager to support OK option of `AlertRule.ExecErrState` #47670
- [ENHANCEMENT] Templates: Enable the use of classic condition values in templates #46971
- [ENHANCEMENT] Scheduler: ticker exposes new metrics `grafana_alerting_ticker_last_consumed_tick_timestamp_seconds`, `grafana_alerting_ticker_next_tick_timestamp_seconds`, `grafana_alerting_ticker_interval_seconds` #47828
- [CHANGE] Notification URL points to alert view page instead of alert edit page. #47752
- [FEATURE] Indicate whether routes are provisioned when GETting Alertmanager configuration #47857

View File

@ -7,15 +7,17 @@ import (
"sync"
"time"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/models"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/web"
"github.com/prometheus/alertmanager/api/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/models"
legacyMetrics "github.com/grafana/grafana/pkg/services/alerting/metrics"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/web"
)
const (
@ -52,6 +54,7 @@ type Scheduler struct {
EvalDuration *prometheus.SummaryVec
GetAlertRulesDuration prometheus.Histogram
SchedulePeriodicDuration prometheus.Histogram
Ticker *legacyMetrics.Ticker
}
type MultiOrgAlertmanager struct {
@ -179,6 +182,7 @@ func newSchedulerMetrics(r prometheus.Registerer) *Scheduler {
Buckets: []float64{0.1, 0.25, 0.5, 1, 2, 5, 10},
},
),
Ticker: legacyMetrics.NewTickerMetrics(r),
}
}

View File

@ -31,8 +31,6 @@ type ScheduleService interface {
// Run the scheduler until the context is canceled or the scheduler returns
// an error. The scheduler is terminated when this function returns.
Run(context.Context) error
Pause() error
Unpause() error
// AlertmanagersFor returns all the discovered Alertmanager URLs for the
// organization.
@ -123,7 +121,7 @@ type SchedulerCfg struct {
// NewScheduler returns a new schedule.
func NewScheduler(cfg SchedulerCfg, expressionService *expr.Service, appURL *url.URL, stateManager *state.Manager) *schedule {
ticker := alerting.NewTicker(cfg.C.Now(), time.Second*0, cfg.C, int64(cfg.BaseInterval.Seconds()))
ticker := alerting.NewTicker(cfg.C, cfg.BaseInterval, cfg.Metrics.Ticker)
sch := schedule{
registry: alertRuleRegistry{alertRuleInfo: make(map[models.AlertRuleKey]*alertRuleInfo)},
@ -154,24 +152,6 @@ func NewScheduler(cfg SchedulerCfg, expressionService *expr.Service, appURL *url
return &sch
}
// Pause pauses the scheduler's ticker and logs the time at which it happened.
// It returns an error when called on a nil scheduler.
func (sch *schedule) Pause() error {
	if sch == nil {
		return fmt.Errorf("scheduler is not initialised")
	}

	sch.ticker.Pause()

	pausedAt := sch.clock.Now()
	sch.log.Info("alert rule scheduler paused", "now", pausedAt)
	return nil
}
// Unpause resumes the scheduler's ticker and logs the time at which it happened.
// It returns an error when called on a nil scheduler.
func (sch *schedule) Unpause() error {
	if sch == nil {
		return fmt.Errorf("scheduler is not initialised")
	}

	sch.ticker.Unpause()

	resumedAt := sch.clock.Now()
	sch.log.Info("alert rule scheduler unpaused", "now", resumedAt)
	return nil
}
func (sch *schedule) Run(ctx context.Context) error {
var wg sync.WaitGroup
wg.Add(2)
@ -789,7 +769,7 @@ type evaluation struct {
func (sch *schedule) overrideCfg(cfg SchedulerCfg) {
sch.clock = cfg.C
sch.baseInterval = cfg.BaseInterval
sch.ticker = alerting.NewTicker(cfg.C.Now(), time.Second*0, cfg.C, int64(cfg.BaseInterval.Seconds()))
sch.ticker = alerting.NewTicker(cfg.C, cfg.BaseInterval, cfg.Metrics.Ticker)
sch.evalAppliedFunc = cfg.EvalAppliedFunc
sch.stopAppliedFunc = cfg.StopAppliedFunc
}

View File

@ -12,15 +12,16 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/ini.v1"
"github.com/grafana/grafana/pkg/api"
"github.com/grafana/grafana/pkg/infra/fs"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/server"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/setting"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/ini.v1"
)
// StartGrafana starts a Grafana server.