mirror of
https://github.com/grafana/grafana.git
synced 2024-11-28 11:44:26 -06:00
3cac10e598
* Always use cache: stop passing skipCache among ngalert functions * Add updated column * Scheduler initial draft * Add retry on failure * Allow settting/updating alert definition interval Set default interval if no interval is provided during alert definition creation. Keep existing alert definition interval if no interval is provided during alert definition update. * Parameterise alerting.Ticker to run on custom interval * Allow updating alert definition interval without having to provide the queries and expressions * Add schedule tests * Use xorm tags for having initialisms with consistent case in Go * Add ability to pause/unpause the scheduler * Add alert definition versioning * Optimise scheduler to fetch alert definition only when it's necessary * Change MySQL data column to mediumtext * Delete alert definition versions * Increase default scheduler interval to 10 seconds * Fix setting OrgID on updates * Add validation for alert definition name length * Recreate tables
77 lines
2.4 KiB
Go
77 lines
2.4 KiB
Go
package alerting
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/benbjohnson/clock"
|
|
)
|
|
|
|
// Ticker is a ticker to power the alerting scheduler. it's like a time.Ticker, except:
|
|
// * it doesn't drop ticks for slow receivers, rather, it queues up. so that callers are in control to instrument what's going on.
|
|
// * it automatically ticks every second, which is the right thing in our current design
|
|
// * it ticks on intervalSec marks or very shortly after. this provides a predictable load pattern
|
|
// (this shouldn't cause too much load contention issues because the next steps in the pipeline just process at their own pace)
|
|
// * the timestamps are used to mark "last datapoint to query for" and as such, are a configurable amount of seconds in the past
|
|
// * because we want to allow:
|
|
// - a clean "resume where we left off" and "don't yield ticks we already did"
|
|
// - adjusting offset over time to compensate for storage backing up or getting fast and providing lower latency
|
|
// you specify a lastProcessed timestamp as well as an offset at creation, or runtime
|
|
type Ticker struct {
|
|
C chan time.Time
|
|
clock clock.Clock
|
|
last time.Time
|
|
offset time.Duration
|
|
newOffset chan time.Duration
|
|
intervalSec int64
|
|
paused bool
|
|
}
|
|
|
|
// NewTicker returns a ticker that ticks on intervalSec marks or very shortly after, and never drops ticks
|
|
func NewTicker(last time.Time, initialOffset time.Duration, c clock.Clock, intervalSec int64) *Ticker {
|
|
t := &Ticker{
|
|
C: make(chan time.Time),
|
|
clock: c,
|
|
last: last,
|
|
offset: initialOffset,
|
|
newOffset: make(chan time.Duration),
|
|
intervalSec: intervalSec,
|
|
}
|
|
go t.run()
|
|
return t
|
|
}
|
|
|
|
func (t *Ticker) run() {
|
|
for {
|
|
next := t.last.Add(time.Duration(t.intervalSec) * time.Second)
|
|
diff := t.clock.Now().Add(-t.offset).Sub(next)
|
|
if diff >= 0 {
|
|
if !t.paused {
|
|
t.C <- next
|
|
}
|
|
t.last = next
|
|
continue
|
|
}
|
|
// tick is too young. try again when ...
|
|
select {
|
|
case <-t.clock.After(-diff): // ...it'll definitely be old enough
|
|
case offset := <-t.newOffset: // ...it might be old enough
|
|
t.offset = offset
|
|
}
|
|
}
|
|
}
|
|
|
|
// ResetOffset resets the offset.
|
|
func (t *Ticker) ResetOffset(duration time.Duration) {
|
|
t.newOffset <- duration
|
|
}
|
|
|
|
// Pause unpauses the ticker and no ticks will be sent.
|
|
func (t *Ticker) Pause() {
|
|
t.paused = true
|
|
}
|
|
|
|
// Unpause unpauses the ticker and ticks will be sent.
|
|
func (t *Ticker) Unpause() {
|
|
t.paused = false
|
|
}
|