2020-12-17 08:00:09 -06:00
package ngalert
import (
"context"
"fmt"
"sync"
"time"
"github.com/benbjohnson/clock"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/alerting"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
2021-03-08 00:02:49 -06:00
"github.com/grafana/grafana/pkg/tsdb"
2020-12-17 08:00:09 -06:00
"golang.org/x/sync/errgroup"
)
2021-03-03 09:52:19 -06:00
// scheduleService is the contract of the alert definition scheduler.
type scheduleService interface {
	// Ticker runs the scheduling loop; it returns once the context is cancelled.
	Ticker(context.Context) error
	// Pause suspends the scheduler heartbeat.
	Pause() error
	// Unpause resumes a previously paused scheduler heartbeat.
	Unpause() error

	// The following methods are test hooks and are only used by tests.
	evalApplied(alertDefinitionKey, time.Time)
	stopApplied(alertDefinitionKey)
	overrideCfg(cfg schedulerCfg)
}
2021-03-08 00:02:49 -06:00
func ( sch * schedule ) definitionRoutine ( grafanaCtx context . Context , key alertDefinitionKey ,
evalCh <- chan * evalContext , stopCh <- chan struct { } ) error {
2021-03-03 09:52:19 -06:00
sch . log . Debug ( "alert definition routine started" , "key" , key )
2020-12-17 08:00:09 -06:00
evalRunning := false
var start , end time . Time
var attempt int64
var alertDefinition * AlertDefinition
for {
select {
case ctx := <- evalCh :
if evalRunning {
continue
}
evaluate := func ( attempt int64 ) error {
start = timeNow ( )
// fetch latest alert definition version
if alertDefinition == nil || alertDefinition . Version < ctx . version {
2021-01-07 09:45:42 -06:00
q := getAlertDefinitionByUIDQuery { OrgID : key . orgID , UID : key . definitionUID }
2021-03-03 09:52:19 -06:00
err := sch . store . getAlertDefinitionByUID ( & q )
2020-12-17 08:00:09 -06:00
if err != nil {
2021-03-03 09:52:19 -06:00
sch . log . Error ( "failed to fetch alert definition" , "key" , key )
2020-12-17 08:00:09 -06:00
return err
}
alertDefinition = q . Result
2021-03-03 09:52:19 -06:00
sch . log . Debug ( "new alert definition version fetched" , "title" , alertDefinition . Title , "key" , key , "version" , alertDefinition . Version )
2020-12-17 08:00:09 -06:00
}
condition := eval . Condition {
RefID : alertDefinition . Condition ,
OrgID : alertDefinition . OrgID ,
QueriesAndExpressions : alertDefinition . Data ,
}
2021-03-08 00:02:49 -06:00
results , err := sch . evaluator . ConditionEval ( & condition , ctx . now , sch . dataService )
2020-12-17 08:00:09 -06:00
end = timeNow ( )
if err != nil {
2021-01-18 12:57:17 -06:00
// consider saving alert instance on error
2021-03-08 00:02:49 -06:00
sch . log . Error ( "failed to evaluate alert definition" , "title" , alertDefinition . Title ,
"key" , key , "attempt" , attempt , "now" , ctx . now , "duration" , end . Sub ( start ) , "error" , err )
2020-12-17 08:00:09 -06:00
return err
}
for _ , r := range results {
2021-03-03 09:52:19 -06:00
sch . log . Debug ( "alert definition result" , "title" , alertDefinition . Title , "key" , key , "attempt" , attempt , "now" , ctx . now , "duration" , end . Sub ( start ) , "instance" , r . Instance , "state" , r . State . String ( ) )
2021-01-18 12:57:17 -06:00
cmd := saveAlertInstanceCommand { DefinitionOrgID : key . orgID , DefinitionUID : key . definitionUID , State : InstanceStateType ( r . State . String ( ) ) , Labels : InstanceLabels ( r . Instance ) , LastEvalTime : ctx . now }
2021-03-03 09:52:19 -06:00
err := sch . store . saveAlertInstance ( & cmd )
2021-01-18 12:57:17 -06:00
if err != nil {
2021-03-03 09:52:19 -06:00
sch . log . Error ( "failed saving alert instance" , "title" , alertDefinition . Title , "key" , key , "attempt" , attempt , "now" , ctx . now , "instance" , r . Instance , "state" , r . State . String ( ) , "error" , err )
2021-01-18 12:57:17 -06:00
}
2020-12-17 08:00:09 -06:00
}
return nil
}
func ( ) {
evalRunning = true
defer func ( ) {
evalRunning = false
2021-03-03 09:52:19 -06:00
sch . evalApplied ( key , ctx . now )
2020-12-17 08:00:09 -06:00
} ( )
2021-03-03 09:52:19 -06:00
for attempt = 0 ; attempt < sch . maxAttempts ; attempt ++ {
2020-12-17 08:00:09 -06:00
err := evaluate ( attempt )
if err == nil {
break
}
}
} ( )
2021-01-11 08:14:03 -06:00
case <- stopCh :
2021-03-03 09:52:19 -06:00
sch . stopApplied ( key )
sch . log . Debug ( "stopping alert definition routine" , "key" , key )
2021-01-11 08:14:03 -06:00
// interrupt evaluation if it's running
return nil
2020-12-17 08:00:09 -06:00
case <- grafanaCtx . Done ( ) :
return grafanaCtx . Err ( )
}
}
}
type schedule struct {
// base tick rate (fastest possible configured check)
baseInterval time . Duration
// each alert definition gets its own channel and routine
registry alertDefinitionRegistry
maxAttempts int64
clock clock . Clock
heartbeat * alerting . Ticker
// evalApplied is only used for tests: test code can set it to non-nil
// function, and then it'll be called from the event loop whenever the
// message from evalApplied is handled.
2021-03-03 09:52:19 -06:00
evalAppliedFunc func ( alertDefinitionKey , time . Time )
2020-12-17 08:00:09 -06:00
2021-01-11 08:14:03 -06:00
// stopApplied is only used for tests: test code can set it to non-nil
// function, and then it'll be called from the event loop whenever the
// message from stopApplied is handled.
2021-03-03 09:52:19 -06:00
stopAppliedFunc func ( alertDefinitionKey )
2021-01-11 08:14:03 -06:00
2020-12-17 08:00:09 -06:00
log log . Logger
2021-01-22 11:27:33 -06:00
evaluator eval . Evaluator
2021-03-03 09:52:19 -06:00
store store
2021-03-08 00:02:49 -06:00
dataService * tsdb . Service
2021-01-22 11:27:33 -06:00
}
type schedulerCfg struct {
2021-03-03 09:52:19 -06:00
c clock . Clock
baseInterval time . Duration
logger log . Logger
evalAppliedFunc func ( alertDefinitionKey , time . Time )
stopAppliedFunc func ( alertDefinitionKey )
evaluator eval . Evaluator
store store
2020-12-17 08:00:09 -06:00
}
// newScheduler returns a new schedule.
2021-03-08 00:02:49 -06:00
func newScheduler ( cfg schedulerCfg , dataService * tsdb . Service ) * schedule {
2021-01-22 11:27:33 -06:00
ticker := alerting . NewTicker ( cfg . c . Now ( ) , time . Second * 0 , cfg . c , int64 ( cfg . baseInterval . Seconds ( ) ) )
2020-12-17 08:00:09 -06:00
sch := schedule {
2021-03-03 09:52:19 -06:00
registry : alertDefinitionRegistry { alertDefinitionInfo : make ( map [ alertDefinitionKey ] alertDefinitionInfo ) } ,
maxAttempts : maxAttempts ,
clock : cfg . c ,
baseInterval : cfg . baseInterval ,
log : cfg . logger ,
heartbeat : ticker ,
evalAppliedFunc : cfg . evalAppliedFunc ,
stopAppliedFunc : cfg . stopAppliedFunc ,
evaluator : cfg . evaluator ,
store : cfg . store ,
2021-03-08 00:02:49 -06:00
dataService : dataService ,
2020-12-17 08:00:09 -06:00
}
return & sch
}
2021-03-03 09:52:19 -06:00
// overrideCfg is a test hook. It replaces the clock, base interval, heartbeat
// ticker and test callbacks with the values from cfg. The logger, evaluator
// and store are left untouched.
func (sch *schedule) overrideCfg(cfg schedulerCfg) {
	heartbeat := alerting.NewTicker(cfg.c.Now(), time.Second*0, cfg.c, int64(cfg.baseInterval.Seconds()))
	sch.clock, sch.baseInterval, sch.heartbeat = cfg.c, cfg.baseInterval, heartbeat
	sch.evalAppliedFunc, sch.stopAppliedFunc = cfg.evalAppliedFunc, cfg.stopAppliedFunc
}
// evalApplied invokes the evalAppliedFunc test hook, if one is set.
func (sch *schedule) evalApplied(alertDefKey alertDefinitionKey, now time.Time) {
	if hook := sch.evalAppliedFunc; hook != nil {
		hook(alertDefKey, now)
	}
}
// stopApplied invokes the stopAppliedFunc test hook, if one is set.
func (sch *schedule) stopApplied(alertDefKey alertDefinitionKey) {
	if hook := sch.stopAppliedFunc; hook != nil {
		hook(alertDefKey)
	}
}
func ( sch * schedule ) Pause ( ) error {
2020-12-17 08:00:09 -06:00
if sch == nil {
return fmt . Errorf ( "scheduler is not initialised" )
}
sch . heartbeat . Pause ( )
sch . log . Info ( "alert definition scheduler paused" , "now" , sch . clock . Now ( ) )
return nil
}
2021-03-03 09:52:19 -06:00
func ( sch * schedule ) Unpause ( ) error {
2020-12-17 08:00:09 -06:00
if sch == nil {
return fmt . Errorf ( "scheduler is not initialised" )
}
sch . heartbeat . Unpause ( )
sch . log . Info ( "alert definition scheduler unpaused" , "now" , sch . clock . Now ( ) )
return nil
}
2021-03-03 09:52:19 -06:00
func ( sch * schedule ) Ticker ( grafanaCtx context . Context ) error {
2020-12-17 08:00:09 -06:00
dispatcherGroup , ctx := errgroup . WithContext ( grafanaCtx )
for {
select {
2021-03-03 09:52:19 -06:00
case tick := <- sch . heartbeat . C :
tickNum := tick . Unix ( ) / int64 ( sch . baseInterval . Seconds ( ) )
alertDefinitions := sch . fetchAllDetails ( tick )
sch . log . Debug ( "alert definitions fetched" , "count" , len ( alertDefinitions ) )
2020-12-17 08:00:09 -06:00
// registeredDefinitions is a map used for finding deleted alert definitions
// initially it is assigned to all known alert definitions from the previous cycle
// each alert definition found also in this cycle is removed
// so, at the end, the remaining registered alert definitions are the deleted ones
2021-03-03 09:52:19 -06:00
registeredDefinitions := sch . registry . keyMap ( )
2020-12-17 08:00:09 -06:00
type readyToRunItem struct {
2021-01-07 09:45:42 -06:00
key alertDefinitionKey
2020-12-17 08:00:09 -06:00
definitionInfo alertDefinitionInfo
}
readyToRun := make ( [ ] readyToRunItem , 0 )
for _ , item := range alertDefinitions {
2021-01-27 07:51:00 -06:00
if item . Paused {
continue
}
2021-01-07 09:45:42 -06:00
key := item . getKey ( )
2020-12-17 08:00:09 -06:00
itemVersion := item . Version
2021-03-03 09:52:19 -06:00
newRoutine := ! sch . registry . exists ( key )
definitionInfo := sch . registry . getOrCreateInfo ( key , itemVersion )
invalidInterval := item . IntervalSeconds % int64 ( sch . baseInterval . Seconds ( ) ) != 0
2020-12-17 08:00:09 -06:00
if newRoutine && ! invalidInterval {
dispatcherGroup . Go ( func ( ) error {
2021-03-03 09:52:19 -06:00
return sch . definitionRoutine ( ctx , key , definitionInfo . evalCh , definitionInfo . stopCh )
2020-12-17 08:00:09 -06:00
} )
}
if invalidInterval {
// this is expected to be always false
// give that we validate interval during alert definition updates
2021-03-03 09:52:19 -06:00
sch . log . Debug ( "alert definition with invalid interval will be ignored: interval should be divided exactly by scheduler interval" , "key" , key , "interval" , time . Duration ( item . IntervalSeconds ) * time . Second , "scheduler interval" , sch . baseInterval )
2020-12-17 08:00:09 -06:00
continue
}
2021-03-03 09:52:19 -06:00
itemFrequency := item . IntervalSeconds / int64 ( sch . baseInterval . Seconds ( ) )
2020-12-17 08:00:09 -06:00
if item . IntervalSeconds != 0 && tickNum % itemFrequency == 0 {
2021-01-07 09:45:42 -06:00
readyToRun = append ( readyToRun , readyToRunItem { key : key , definitionInfo : definitionInfo } )
2020-12-17 08:00:09 -06:00
}
// remove the alert definition from the registered alert definitions
2021-01-07 09:45:42 -06:00
delete ( registeredDefinitions , key )
2020-12-17 08:00:09 -06:00
}
var step int64 = 0
if len ( readyToRun ) > 0 {
2021-03-03 09:52:19 -06:00
step = sch . baseInterval . Nanoseconds ( ) / int64 ( len ( readyToRun ) )
2020-12-17 08:00:09 -06:00
}
for i := range readyToRun {
item := readyToRun [ i ]
time . AfterFunc ( time . Duration ( int64 ( i ) * step ) , func ( ) {
2021-01-11 08:14:03 -06:00
item . definitionInfo . evalCh <- & evalContext { now : tick , version : item . definitionInfo . version }
2020-12-17 08:00:09 -06:00
} )
}
// unregister and stop routines of the deleted alert definitions
2021-01-07 09:45:42 -06:00
for key := range registeredDefinitions {
2021-03-03 09:52:19 -06:00
definitionInfo , err := sch . registry . get ( key )
2021-01-11 08:14:03 -06:00
if err != nil {
2021-03-03 09:52:19 -06:00
sch . log . Error ( "failed to get alert definition routine information" , "err" , err )
2021-01-11 08:14:03 -06:00
continue
}
definitionInfo . stopCh <- struct { } { }
2021-03-03 09:52:19 -06:00
sch . registry . del ( key )
2020-12-17 08:00:09 -06:00
}
case <- grafanaCtx . Done ( ) :
err := dispatcherGroup . Wait ( )
return err
}
}
}
type alertDefinitionRegistry struct {
mu sync . Mutex
2021-01-07 09:45:42 -06:00
alertDefinitionInfo map [ alertDefinitionKey ] alertDefinitionInfo
2020-12-17 08:00:09 -06:00
}
// getOrCreateInfo returns the channel for the specific alert definition
// if it does not exists creates one and returns it
2021-01-07 09:45:42 -06:00
func ( r * alertDefinitionRegistry ) getOrCreateInfo ( key alertDefinitionKey , definitionVersion int64 ) alertDefinitionInfo {
2020-12-17 08:00:09 -06:00
r . mu . Lock ( )
defer r . mu . Unlock ( )
2021-01-07 09:45:42 -06:00
info , ok := r . alertDefinitionInfo [ key ]
2020-12-17 08:00:09 -06:00
if ! ok {
2021-01-11 08:14:03 -06:00
r . alertDefinitionInfo [ key ] = alertDefinitionInfo { evalCh : make ( chan * evalContext ) , stopCh : make ( chan struct { } ) , version : definitionVersion }
2021-01-07 09:45:42 -06:00
return r . alertDefinitionInfo [ key ]
2020-12-17 08:00:09 -06:00
}
info . version = definitionVersion
2021-01-07 09:45:42 -06:00
r . alertDefinitionInfo [ key ] = info
2020-12-17 08:00:09 -06:00
return info
}
2021-01-11 08:14:03 -06:00
// get returns a pointer to a copy of the registry entry for key, or an error
// if key is not registered. Modifying the returned value does not change the
// registry.
func (r *alertDefinitionRegistry) get(key alertDefinitionKey) (*alertDefinitionInfo, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if info, ok := r.alertDefinitionInfo[key]; ok {
		return &info, nil
	}
	return nil, fmt.Errorf("%v key not found", key)
}
2021-01-07 09:45:42 -06:00
func ( r * alertDefinitionRegistry ) exists ( key alertDefinitionKey ) bool {
2020-12-17 08:00:09 -06:00
r . mu . Lock ( )
defer r . mu . Unlock ( )
2021-01-07 09:45:42 -06:00
_ , ok := r . alertDefinitionInfo [ key ]
2020-12-17 08:00:09 -06:00
return ok
}
2021-01-07 09:45:42 -06:00
func ( r * alertDefinitionRegistry ) del ( key alertDefinitionKey ) {
2020-12-17 08:00:09 -06:00
r . mu . Lock ( )
defer r . mu . Unlock ( )
2021-01-07 09:45:42 -06:00
delete ( r . alertDefinitionInfo , key )
2020-12-17 08:00:09 -06:00
}
2021-01-07 09:45:42 -06:00
func ( r * alertDefinitionRegistry ) iter ( ) <- chan alertDefinitionKey {
c := make ( chan alertDefinitionKey )
2020-12-17 08:00:09 -06:00
f := func ( ) {
r . mu . Lock ( )
defer r . mu . Unlock ( )
for k := range r . alertDefinitionInfo {
c <- k
}
close ( c )
}
go f ( )
return c
}
2021-01-07 09:45:42 -06:00
func ( r * alertDefinitionRegistry ) keyMap ( ) map [ alertDefinitionKey ] struct { } {
definitionsIDs := make ( map [ alertDefinitionKey ] struct { } )
for k := range r . iter ( ) {
definitionsIDs [ k ] = struct { } { }
2020-12-17 08:00:09 -06:00
}
return definitionsIDs
}
type alertDefinitionInfo struct {
2021-01-11 08:14:03 -06:00
evalCh chan * evalContext
stopCh chan struct { }
2020-12-17 08:00:09 -06:00
version int64
}
// evalContext is the message that asks a definition routine for one evaluation.
type evalContext struct {
	// now is the scheduler tick time the evaluation is performed for.
	now time.Time
	// version is the definition version known to the scheduler at dispatch
	// time; the routine refetches the definition when its cached copy is older.
	version int64
}