2021-03-08 22:19:21 +02:00
package schedule
2020-12-17 16:00:09 +02:00
import (
"context"
"fmt"
"sync"
"time"
2021-03-24 15:34:18 -07:00
"github.com/grafana/grafana/pkg/services/ngalert/state"
2021-03-08 22:19:21 +02:00
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/services/ngalert/models"
2020-12-17 16:00:09 +02:00
"github.com/benbjohnson/clock"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/alerting"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
2021-03-08 07:02:49 +01:00
"github.com/grafana/grafana/pkg/tsdb"
2020-12-17 16:00:09 +02:00
"golang.org/x/sync/errgroup"
)
2021-03-08 22:19:21 +02:00
// timeNow is an indirection over time.Now; it is a package-level variable so
// that code depending on the current time can be exercised in tests.
var timeNow = time.Now
// ScheduleService handles scheduling
type ScheduleService interface {
2021-03-24 15:34:18 -07:00
Ticker ( context . Context , * state . StateTracker ) error
2021-03-03 17:52:19 +02:00
Pause ( ) error
Unpause ( ) error
// the following are used by tests only used for tests
2021-03-08 22:19:21 +02:00
evalApplied ( models . AlertDefinitionKey , time . Time )
stopApplied ( models . AlertDefinitionKey )
overrideCfg ( cfg SchedulerCfg )
2021-03-03 17:52:19 +02:00
}
2021-03-24 15:34:18 -07:00
func ( sch * schedule ) definitionRoutine ( grafanaCtx context . Context , key models . AlertDefinitionKey , evalCh <- chan * evalContext , stopCh <- chan struct { } , stateTracker * state . StateTracker ) error {
2021-03-03 17:52:19 +02:00
sch . log . Debug ( "alert definition routine started" , "key" , key )
2020-12-17 16:00:09 +02:00
evalRunning := false
var start , end time . Time
var attempt int64
2021-03-08 22:19:21 +02:00
var alertDefinition * models . AlertDefinition
2020-12-17 16:00:09 +02:00
for {
select {
case ctx := <- evalCh :
if evalRunning {
continue
}
evaluate := func ( attempt int64 ) error {
start = timeNow ( )
// fetch latest alert definition version
if alertDefinition == nil || alertDefinition . Version < ctx . version {
2021-03-08 22:19:21 +02:00
q := models . GetAlertDefinitionByUIDQuery { OrgID : key . OrgID , UID : key . DefinitionUID }
err := sch . store . GetAlertDefinitionByUID ( & q )
2020-12-17 16:00:09 +02:00
if err != nil {
2021-03-03 17:52:19 +02:00
sch . log . Error ( "failed to fetch alert definition" , "key" , key )
2020-12-17 16:00:09 +02:00
return err
}
alertDefinition = q . Result
2021-03-03 17:52:19 +02:00
sch . log . Debug ( "new alert definition version fetched" , "title" , alertDefinition . Title , "key" , key , "version" , alertDefinition . Version )
2020-12-17 16:00:09 +02:00
}
2021-03-11 18:56:58 +02:00
condition := models . Condition {
2021-03-24 08:07:29 -04:00
Condition : alertDefinition . Condition ,
OrgID : alertDefinition . OrgID ,
Data : alertDefinition . Data ,
2020-12-17 16:00:09 +02:00
}
2021-03-08 07:02:49 +01:00
results , err := sch . evaluator . ConditionEval ( & condition , ctx . now , sch . dataService )
2020-12-17 16:00:09 +02:00
end = timeNow ( )
if err != nil {
2021-01-18 20:57:17 +02:00
// consider saving alert instance on error
2021-03-08 07:02:49 +01:00
sch . log . Error ( "failed to evaluate alert definition" , "title" , alertDefinition . Title ,
"key" , key , "attempt" , attempt , "now" , ctx . now , "duration" , end . Sub ( start ) , "error" , err )
2020-12-17 16:00:09 +02:00
return err
}
for _ , r := range results {
2021-03-24 15:34:18 -07:00
sch . log . Info ( "alert definition result" , "title" , alertDefinition . Title , "key" , key , "attempt" , attempt , "now" , ctx . now , "duration" , end . Sub ( start ) , "instance" , r . Instance , "state" , r . State . String ( ) )
2021-03-08 22:19:21 +02:00
cmd := models . SaveAlertInstanceCommand { DefinitionOrgID : key . OrgID , DefinitionUID : key . DefinitionUID , State : models . InstanceStateType ( r . State . String ( ) ) , Labels : models . InstanceLabels ( r . Instance ) , LastEvalTime : ctx . now }
err := sch . store . SaveAlertInstance ( & cmd )
2021-01-18 20:57:17 +02:00
if err != nil {
2021-03-03 17:52:19 +02:00
sch . log . Error ( "failed saving alert instance" , "title" , alertDefinition . Title , "key" , key , "attempt" , attempt , "now" , ctx . now , "instance" , r . Instance , "state" , r . State . String ( ) , "error" , err )
2021-01-18 20:57:17 +02:00
}
2020-12-17 16:00:09 +02:00
}
2021-03-24 15:34:18 -07:00
_ = stateTracker . ProcessEvalResults ( key . DefinitionUID , results , condition )
2020-12-17 16:00:09 +02:00
return nil
}
func ( ) {
evalRunning = true
defer func ( ) {
evalRunning = false
2021-03-03 17:52:19 +02:00
sch . evalApplied ( key , ctx . now )
2020-12-17 16:00:09 +02:00
} ( )
2021-03-03 17:52:19 +02:00
for attempt = 0 ; attempt < sch . maxAttempts ; attempt ++ {
2020-12-17 16:00:09 +02:00
err := evaluate ( attempt )
if err == nil {
break
}
}
} ( )
2021-01-11 16:14:03 +02:00
case <- stopCh :
2021-03-03 17:52:19 +02:00
sch . stopApplied ( key )
sch . log . Debug ( "stopping alert definition routine" , "key" , key )
2021-01-11 16:14:03 +02:00
// interrupt evaluation if it's running
return nil
2020-12-17 16:00:09 +02:00
case <- grafanaCtx . Done ( ) :
return grafanaCtx . Err ( )
}
}
}
type schedule struct {
// base tick rate (fastest possible configured check)
baseInterval time . Duration
// each alert definition gets its own channel and routine
registry alertDefinitionRegistry
maxAttempts int64
clock clock . Clock
heartbeat * alerting . Ticker
// evalApplied is only used for tests: test code can set it to non-nil
// function, and then it'll be called from the event loop whenever the
// message from evalApplied is handled.
2021-03-08 22:19:21 +02:00
evalAppliedFunc func ( models . AlertDefinitionKey , time . Time )
2020-12-17 16:00:09 +02:00
2021-01-11 16:14:03 +02:00
// stopApplied is only used for tests: test code can set it to non-nil
// function, and then it'll be called from the event loop whenever the
// message from stopApplied is handled.
2021-03-08 22:19:21 +02:00
stopAppliedFunc func ( models . AlertDefinitionKey )
2021-01-11 16:14:03 +02:00
2020-12-17 16:00:09 +02:00
log log . Logger
2021-01-22 19:27:33 +02:00
evaluator eval . Evaluator
2021-03-03 17:52:19 +02:00
2021-03-08 22:19:21 +02:00
store store . Store
2021-03-08 07:02:49 +01:00
dataService * tsdb . Service
2021-01-22 19:27:33 +02:00
}
2021-03-08 22:19:21 +02:00
// SchedulerCfg is the scheduler configuration.
type SchedulerCfg struct {
C clock . Clock
BaseInterval time . Duration
Logger log . Logger
EvalAppliedFunc func ( models . AlertDefinitionKey , time . Time )
MaxAttempts int64
StopAppliedFunc func ( models . AlertDefinitionKey )
Evaluator eval . Evaluator
Store store . Store
2020-12-17 16:00:09 +02:00
}
2021-03-08 22:19:21 +02:00
// NewScheduler returns a new schedule.
func NewScheduler ( cfg SchedulerCfg , dataService * tsdb . Service ) * schedule {
ticker := alerting . NewTicker ( cfg . C . Now ( ) , time . Second * 0 , cfg . C , int64 ( cfg . BaseInterval . Seconds ( ) ) )
2020-12-17 16:00:09 +02:00
sch := schedule {
2021-03-08 22:19:21 +02:00
registry : alertDefinitionRegistry { alertDefinitionInfo : make ( map [ models . AlertDefinitionKey ] alertDefinitionInfo ) } ,
maxAttempts : cfg . MaxAttempts ,
clock : cfg . C ,
baseInterval : cfg . BaseInterval ,
log : cfg . Logger ,
2021-03-03 17:52:19 +02:00
heartbeat : ticker ,
2021-03-08 22:19:21 +02:00
evalAppliedFunc : cfg . EvalAppliedFunc ,
stopAppliedFunc : cfg . StopAppliedFunc ,
evaluator : cfg . Evaluator ,
store : cfg . Store ,
2021-03-08 07:02:49 +01:00
dataService : dataService ,
2020-12-17 16:00:09 +02:00
}
return & sch
}
2021-03-08 22:19:21 +02:00
func ( sch * schedule ) overrideCfg ( cfg SchedulerCfg ) {
sch . clock = cfg . C
sch . baseInterval = cfg . BaseInterval
sch . heartbeat = alerting . NewTicker ( cfg . C . Now ( ) , time . Second * 0 , cfg . C , int64 ( cfg . BaseInterval . Seconds ( ) ) )
sch . evalAppliedFunc = cfg . EvalAppliedFunc
sch . stopAppliedFunc = cfg . StopAppliedFunc
2021-03-03 17:52:19 +02:00
}
2021-03-08 22:19:21 +02:00
func ( sch * schedule ) evalApplied ( alertDefKey models . AlertDefinitionKey , now time . Time ) {
2021-03-03 17:52:19 +02:00
if sch . evalAppliedFunc == nil {
return
}
sch . evalAppliedFunc ( alertDefKey , now )
}
2021-03-08 22:19:21 +02:00
func ( sch * schedule ) stopApplied ( alertDefKey models . AlertDefinitionKey ) {
2021-03-03 17:52:19 +02:00
if sch . stopAppliedFunc == nil {
return
}
sch . stopAppliedFunc ( alertDefKey )
}
func ( sch * schedule ) Pause ( ) error {
2020-12-17 16:00:09 +02:00
if sch == nil {
return fmt . Errorf ( "scheduler is not initialised" )
}
sch . heartbeat . Pause ( )
sch . log . Info ( "alert definition scheduler paused" , "now" , sch . clock . Now ( ) )
return nil
}
2021-03-03 17:52:19 +02:00
func ( sch * schedule ) Unpause ( ) error {
2020-12-17 16:00:09 +02:00
if sch == nil {
return fmt . Errorf ( "scheduler is not initialised" )
}
sch . heartbeat . Unpause ( )
sch . log . Info ( "alert definition scheduler unpaused" , "now" , sch . clock . Now ( ) )
return nil
}
2021-03-24 15:34:18 -07:00
func ( sch * schedule ) Ticker ( grafanaCtx context . Context , stateTracker * state . StateTracker ) error {
2020-12-17 16:00:09 +02:00
dispatcherGroup , ctx := errgroup . WithContext ( grafanaCtx )
for {
select {
2021-03-03 17:52:19 +02:00
case tick := <- sch . heartbeat . C :
tickNum := tick . Unix ( ) / int64 ( sch . baseInterval . Seconds ( ) )
alertDefinitions := sch . fetchAllDetails ( tick )
sch . log . Debug ( "alert definitions fetched" , "count" , len ( alertDefinitions ) )
2020-12-17 16:00:09 +02:00
// registeredDefinitions is a map used for finding deleted alert definitions
// initially it is assigned to all known alert definitions from the previous cycle
// each alert definition found also in this cycle is removed
// so, at the end, the remaining registered alert definitions are the deleted ones
2021-03-03 17:52:19 +02:00
registeredDefinitions := sch . registry . keyMap ( )
2020-12-17 16:00:09 +02:00
type readyToRunItem struct {
2021-03-08 22:19:21 +02:00
key models . AlertDefinitionKey
2020-12-17 16:00:09 +02:00
definitionInfo alertDefinitionInfo
}
readyToRun := make ( [ ] readyToRunItem , 0 )
for _ , item := range alertDefinitions {
2021-01-27 15:51:00 +02:00
if item . Paused {
continue
}
2021-03-08 22:19:21 +02:00
key := item . GetKey ( )
2020-12-17 16:00:09 +02:00
itemVersion := item . Version
2021-03-03 17:52:19 +02:00
newRoutine := ! sch . registry . exists ( key )
definitionInfo := sch . registry . getOrCreateInfo ( key , itemVersion )
invalidInterval := item . IntervalSeconds % int64 ( sch . baseInterval . Seconds ( ) ) != 0
2020-12-17 16:00:09 +02:00
if newRoutine && ! invalidInterval {
dispatcherGroup . Go ( func ( ) error {
2021-03-24 15:34:18 -07:00
return sch . definitionRoutine ( ctx , key , definitionInfo . evalCh , definitionInfo . stopCh , stateTracker )
2020-12-17 16:00:09 +02:00
} )
}
if invalidInterval {
// this is expected to be always false
// give that we validate interval during alert definition updates
2021-03-03 17:52:19 +02:00
sch . log . Debug ( "alert definition with invalid interval will be ignored: interval should be divided exactly by scheduler interval" , "key" , key , "interval" , time . Duration ( item . IntervalSeconds ) * time . Second , "scheduler interval" , sch . baseInterval )
2020-12-17 16:00:09 +02:00
continue
}
2021-03-03 17:52:19 +02:00
itemFrequency := item . IntervalSeconds / int64 ( sch . baseInterval . Seconds ( ) )
2020-12-17 16:00:09 +02:00
if item . IntervalSeconds != 0 && tickNum % itemFrequency == 0 {
2021-01-07 17:45:42 +02:00
readyToRun = append ( readyToRun , readyToRunItem { key : key , definitionInfo : definitionInfo } )
2020-12-17 16:00:09 +02:00
}
// remove the alert definition from the registered alert definitions
2021-01-07 17:45:42 +02:00
delete ( registeredDefinitions , key )
2020-12-17 16:00:09 +02:00
}
var step int64 = 0
if len ( readyToRun ) > 0 {
2021-03-03 17:52:19 +02:00
step = sch . baseInterval . Nanoseconds ( ) / int64 ( len ( readyToRun ) )
2020-12-17 16:00:09 +02:00
}
for i := range readyToRun {
item := readyToRun [ i ]
time . AfterFunc ( time . Duration ( int64 ( i ) * step ) , func ( ) {
2021-01-11 16:14:03 +02:00
item . definitionInfo . evalCh <- & evalContext { now : tick , version : item . definitionInfo . version }
2020-12-17 16:00:09 +02:00
} )
}
// unregister and stop routines of the deleted alert definitions
2021-01-07 17:45:42 +02:00
for key := range registeredDefinitions {
2021-03-03 17:52:19 +02:00
definitionInfo , err := sch . registry . get ( key )
2021-01-11 16:14:03 +02:00
if err != nil {
2021-03-03 17:52:19 +02:00
sch . log . Error ( "failed to get alert definition routine information" , "err" , err )
2021-01-11 16:14:03 +02:00
continue
}
definitionInfo . stopCh <- struct { } { }
2021-03-03 17:52:19 +02:00
sch . registry . del ( key )
2020-12-17 16:00:09 +02:00
}
case <- grafanaCtx . Done ( ) :
err := dispatcherGroup . Wait ( )
return err
}
}
}
type alertDefinitionRegistry struct {
mu sync . Mutex
2021-03-08 22:19:21 +02:00
alertDefinitionInfo map [ models . AlertDefinitionKey ] alertDefinitionInfo
2020-12-17 16:00:09 +02:00
}
// getOrCreateInfo returns the channel for the specific alert definition
// if it does not exists creates one and returns it
2021-03-08 22:19:21 +02:00
func ( r * alertDefinitionRegistry ) getOrCreateInfo ( key models . AlertDefinitionKey , definitionVersion int64 ) alertDefinitionInfo {
2020-12-17 16:00:09 +02:00
r . mu . Lock ( )
defer r . mu . Unlock ( )
2021-01-07 17:45:42 +02:00
info , ok := r . alertDefinitionInfo [ key ]
2020-12-17 16:00:09 +02:00
if ! ok {
2021-01-11 16:14:03 +02:00
r . alertDefinitionInfo [ key ] = alertDefinitionInfo { evalCh : make ( chan * evalContext ) , stopCh : make ( chan struct { } ) , version : definitionVersion }
2021-01-07 17:45:42 +02:00
return r . alertDefinitionInfo [ key ]
2020-12-17 16:00:09 +02:00
}
info . version = definitionVersion
2021-01-07 17:45:42 +02:00
r . alertDefinitionInfo [ key ] = info
2020-12-17 16:00:09 +02:00
return info
}
2021-01-11 16:14:03 +02:00
// get returns the channel for the specific alert definition
// if the key does not exist returns an error
2021-03-08 22:19:21 +02:00
func ( r * alertDefinitionRegistry ) get ( key models . AlertDefinitionKey ) ( * alertDefinitionInfo , error ) {
2021-01-11 16:14:03 +02:00
r . mu . Lock ( )
defer r . mu . Unlock ( )
info , ok := r . alertDefinitionInfo [ key ]
if ! ok {
return nil , fmt . Errorf ( "%v key not found" , key )
}
return & info , nil
}
2021-03-08 22:19:21 +02:00
func ( r * alertDefinitionRegistry ) exists ( key models . AlertDefinitionKey ) bool {
2020-12-17 16:00:09 +02:00
r . mu . Lock ( )
defer r . mu . Unlock ( )
2021-01-07 17:45:42 +02:00
_ , ok := r . alertDefinitionInfo [ key ]
2020-12-17 16:00:09 +02:00
return ok
}
2021-03-08 22:19:21 +02:00
func ( r * alertDefinitionRegistry ) del ( key models . AlertDefinitionKey ) {
2020-12-17 16:00:09 +02:00
r . mu . Lock ( )
defer r . mu . Unlock ( )
2021-01-07 17:45:42 +02:00
delete ( r . alertDefinitionInfo , key )
2020-12-17 16:00:09 +02:00
}
2021-03-08 22:19:21 +02:00
func ( r * alertDefinitionRegistry ) iter ( ) <- chan models . AlertDefinitionKey {
c := make ( chan models . AlertDefinitionKey )
2020-12-17 16:00:09 +02:00
f := func ( ) {
r . mu . Lock ( )
defer r . mu . Unlock ( )
for k := range r . alertDefinitionInfo {
c <- k
}
close ( c )
}
go f ( )
return c
}
2021-03-08 22:19:21 +02:00
func ( r * alertDefinitionRegistry ) keyMap ( ) map [ models . AlertDefinitionKey ] struct { } {
definitionsIDs := make ( map [ models . AlertDefinitionKey ] struct { } )
2021-01-07 17:45:42 +02:00
for k := range r . iter ( ) {
definitionsIDs [ k ] = struct { } { }
2020-12-17 16:00:09 +02:00
}
return definitionsIDs
}
type alertDefinitionInfo struct {
2021-01-11 16:14:03 +02:00
evalCh chan * evalContext
stopCh chan struct { }
2020-12-17 16:00:09 +02:00
version int64
}
// evalContext is the message sent to a definition routine to request one
// evaluation.
type evalContext struct {
	// now is the tick time the evaluation is requested for.
	now time.Time
	// version is the alert definition version known to the scheduler when
	// the request was created.
	version int64
}