package schedule

import (
	"context"
	"fmt"
	"net/url"
	"time"

	"github.com/benbjohnson/clock"
	"golang.org/x/sync/errgroup"

	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/infra/tracing"
	"github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
	"github.com/grafana/grafana/pkg/services/ngalert/eval"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/ngalert/state"
	"github.com/grafana/grafana/pkg/util/ticker"
)

// ScheduleService is an interface for a service that schedules the evaluation
// of alert rules.
type ScheduleService interface {
	// Run the scheduler until the context is canceled or the scheduler returns
	// an error. The scheduler is terminated when this function returns.
	Run(context.Context) error
}

// retryDelay represents how long to wait between each failed rule evaluation.
const retryDelay = 1 * time.Second

// AlertsSender is an interface for a service that is responsible for sending notifications to the end-user.
//
//go:generate mockery --name AlertsSender --structname AlertsSenderMock --inpackage --filename alerts_sender_mock.go --with-expecter
type AlertsSender interface {
	Send(ctx context.Context, key ngmodels.AlertRuleKey, alerts definitions.PostableAlerts)
}

// RulesStore is a store that provides alert rules for scheduling.
type RulesStore interface {
	GetAlertRulesKeysForScheduling(ctx context.Context) ([]ngmodels.AlertRuleKeyWithVersion, error)
	GetAlertRulesForScheduling(ctx context.Context, query *ngmodels.GetAlertRulesForSchedulingQuery) error
}
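
// schedule is the ScheduleService implementation. It owns the per-rule
// evaluation routines and, on every base-interval tick, decides which rules
// are due to be evaluated.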
type schedule struct {
	// base tick rate (fastest possible configured check)
	baseInterval time.Duration

	// each rule gets its own channel and routine
	registry ruleRegistry

	maxAttempts int64

	clock clock.Clock

	// evalAppliedFunc is only used for tests: test code can set it to a
	// non-nil function, and then it'll be called from the event loop whenever
	// the message from evalApplied is handled.
	evalAppliedFunc func(ngmodels.AlertRuleKey, time.Time)

	// stopAppliedFunc is only used for tests: test code can set it to a
	// non-nil function, and then it'll be called from the event loop whenever
	// the message from stopApplied is handled.
	stopAppliedFunc func(ngmodels.AlertRuleKey)

	log log.Logger

	evaluatorFactory eval.EvaluatorFactory

	ruleStore RulesStore

	stateManager *state.Manager

	appURL               *url.URL
	disableGrafanaFolder bool

	jitterEvaluations JitterStrategy

	metrics *metrics.Scheduler

	alertsSender    AlertsSender
	minRuleInterval time.Duration

	// schedulableAlertRules contains the alert rules that are considered for
	// evaluation in the current tick. The evaluation of an alert rule in the
	// current tick depends on its evaluation interval and when it was
	// last evaluated.
	schedulableAlertRules alertRulesRegistry

	tracer tracing.Tracer
}

// SchedulerCfg is the scheduler configuration.
type SchedulerCfg struct {
	MaxAttempts          int64
	BaseInterval         time.Duration
	C                    clock.Clock
	MinRuleInterval      time.Duration
	DisableGrafanaFolder bool
	AppURL               *url.URL
	JitterEvaluations    JitterStrategy
	EvaluatorFactory     eval.EvaluatorFactory
	RuleStore            RulesStore
	Metrics              *metrics.Scheduler
	AlertSender          AlertsSender
	Tracer               tracing.Tracer
	Log                  log.Logger
}

// NewScheduler returns a new scheduler.
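//
// A minimal construction sketch, assuming the caller provides concrete
// RulesStore, AlertsSender, EvaluatorFactory, tracer, logger, and
// state.Manager values (the variable names below are illustrative only,
// not part of this package):
//
//	sch := NewScheduler(SchedulerCfg{
//		BaseInterval:     10 * time.Second,
//		MaxAttempts:      3,
//		C:                clock.New(),
//		RuleStore:        ruleStore,
//		AlertSender:      sender,
//		EvaluatorFactory: evalFactory,
//		Metrics:          schedulerMetrics,
//		Tracer:           tracer,
//		Log:              logger,
//	}, stateManager)
//	if err := sch.Run(ctx); err != nil {
//		logger.Error("scheduler exited", "error", err)
//	}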
func NewScheduler(cfg SchedulerCfg, stateManager *state.Manager) *schedule {
	const minMaxAttempts = int64(1)
	if cfg.MaxAttempts < minMaxAttempts {
		cfg.Log.Warn("Invalid scheduler maxAttempts, using a safe minimum", "configured", cfg.MaxAttempts, "actual", minMaxAttempts)
		cfg.MaxAttempts = minMaxAttempts
	}

	sch := schedule{
		registry:              newRuleRegistry(),
		maxAttempts:           cfg.MaxAttempts,
		clock:                 cfg.C,
		baseInterval:          cfg.BaseInterval,
		log:                   cfg.Log,
		evaluatorFactory:      cfg.EvaluatorFactory,
		ruleStore:             cfg.RuleStore,
		metrics:               cfg.Metrics,
		appURL:                cfg.AppURL,
		disableGrafanaFolder:  cfg.DisableGrafanaFolder,
		jitterEvaluations:     cfg.JitterEvaluations,
		stateManager:          stateManager,
		minRuleInterval:       cfg.MinRuleInterval,
		schedulableAlertRules: alertRulesRegistry{rules: make(map[ngmodels.AlertRuleKey]*ngmodels.AlertRule)},
		alertsSender:          cfg.AlertSender,
		tracer:                cfg.Tracer,
	}

	return &sch
}
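
// Run implements ScheduleService. It blocks until ctx is canceled; errors
// from the evaluation loop are logged rather than returned, so Run always
// returns nil.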
func (sch *schedule) Run(ctx context.Context) error {
	sch.log.Info("Starting scheduler", "tickInterval", sch.baseInterval, "maxAttempts", sch.maxAttempts)
	t := ticker.New(sch.clock, sch.baseInterval, sch.metrics.Ticker)
	defer t.Stop()

	if err := sch.schedulePeriodic(ctx, t); err != nil {
		sch.log.Error("Failure while running the rule evaluation loop", "error", err)
	}
	return nil
}

// Rules fetches the entire set of rules considered for evaluation by the scheduler on the next tick.
// Such rules are not guaranteed to have been evaluated by the scheduler.
// Rules also returns all supplementary metadata for the rules that is stored by the scheduler, namely the set of folder titles.
func (sch *schedule) Rules() ([]*ngmodels.AlertRule, map[ngmodels.FolderKey]string) {
	return sch.schedulableAlertRules.all()
}

// deleteAlertRule stops evaluation of the rule, deletes it from active rules, and cleans up state cache.
func (sch *schedule) deleteAlertRule(keys ...ngmodels.AlertRuleKey) {
	for _, key := range keys {
		// It can happen that the scheduler has deleted the alert rule before the
		// Ruler API has called DeleteAlertRule. This can happen as requests to
		// the Ruler API do not hold an exclusive lock over all scheduler operations.
		if _, ok := sch.schedulableAlertRules.del(key); !ok {
			sch.log.Info("Alert rule cannot be removed from the scheduler as it is not scheduled", key.LogContext()...)
		}
		// Delete the rule routine
		ruleRoutine, ok := sch.registry.del(key)
		if !ok {
			sch.log.Info("Alert rule cannot be stopped as it is not running", key.LogContext()...)
			continue
		}
		// stop rule evaluation
		ruleRoutine.Stop(errRuleDeleted)
	}
	// Our best bet at this point is that we update the metrics with what we hope to schedule in the next tick.
	alertRules, _ := sch.schedulableAlertRules.all()
	sch.updateRulesMetrics(alertRules)
}
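
// schedulePeriodic is the scheduler's main loop: on every tick of t it runs
// processTick, and when ctx is canceled it waits for all rule evaluation
// routines to stop before returning.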
func (sch *schedule) schedulePeriodic(ctx context.Context, t *ticker.T) error {
	dispatcherGroup, ctx := errgroup.WithContext(ctx)
	for {
		select {
		case tick := <-t.C:
			// We use Round(0) on the start time to remove the monotonic clock.
			// This is required as ticks from the ticker and time.Now() can have
			// a monotonic clock that, when subtracted, does not represent the
			// delta in wall clock time.
			start := time.Now().Round(0)
			sch.metrics.BehindSeconds.Set(start.Sub(tick).Seconds())

			sch.processTick(ctx, dispatcherGroup, tick)

			sch.metrics.SchedulePeriodicDuration.Observe(time.Since(start).Seconds())
		case <-ctx.Done():
			// waiting for all rule evaluation routines to stop
			waitErr := dispatcherGroup.Wait()
			return waitErr
		}
	}
}
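
// readyToRunItem pairs a rule's evaluation routine with the Evaluation that
// will be sent to it on this tick.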
type readyToRunItem struct {
	ruleRoutine Rule
	Evaluation
}

// TODO refactor to accept a callback for tests that will be called with things that are returned currently, and return nothing.
// Returns a slice of rules that were scheduled for evaluation, a map of stopped rules, and a slice of updated rules.
func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.Group, tick time.Time) ([]readyToRunItem, map[ngmodels.AlertRuleKey]struct{}, []ngmodels.AlertRuleKeyWithVersion) {
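	// tickNum counts base intervals elapsed since the Unix epoch; a rule is
	// due when tickNum modulo the rule's interval (expressed in base ticks)
	// equals the rule's jitter offset.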
	tickNum := tick.Unix() / int64(sch.baseInterval.Seconds())

	// Update the local registry. If there was a difference between the previous
	// state and the current one, rulesDiff will contain the keys of the rules
	// that were updated.
	rulesDiff, err := sch.updateSchedulableAlertRules(ctx)
	updated := rulesDiff.updated
	if updated == nil { // make sure map is not nil
		updated = map[ngmodels.AlertRuleKey]struct{}{}
	}
	if err != nil {
		sch.log.Error("Failed to update alert rules", "error", err)
	}

	// This is the new current state. rulesDiff contains the keys of previously
	// existing rules that differ between this state and the previous one.
	alertRules, folderTitles := sch.schedulableAlertRules.all()

	// registeredDefinitions is a map used for finding deleted alert rules.
	// Initially it is assigned all known alert rules from the previous cycle;
	// each alert rule also found in this cycle is removed from it, so, at the
	// end, the remaining entries are the deleted rules.
	registeredDefinitions := sch.registry.keyMap()

	sch.updateRulesMetrics(alertRules)

	readyToRun := make([]readyToRunItem, 0)
	updatedRules := make([]ngmodels.AlertRuleKeyWithVersion, 0, len(updated)) // this is needed for tests only
	missingFolder := make(map[string][]string)

	ruleFactory := newRuleFactory(
		sch.appURL,
		sch.disableGrafanaFolder,
		sch.maxAttempts,
		sch.alertsSender,
		sch.stateManager,
		sch.evaluatorFactory,
		&sch.schedulableAlertRules,
		sch.clock,
		sch.metrics,
		sch.log,
		sch.tracer,
		sch.evalAppliedFunc,
		sch.stopAppliedFunc,
	)

	for _, item := range alertRules {
		key := item.GetKey()
		ruleRoutine, newRoutine := sch.registry.getOrCreate(ctx, key, ruleFactory)

		// enforce minimum evaluation interval
		if item.IntervalSeconds < int64(sch.minRuleInterval.Seconds()) {
			sch.log.Debug("Interval adjusted", append(key.LogContext(), "originalInterval", item.IntervalSeconds, "adjustedInterval", sch.minRuleInterval.Seconds())...)
			item.IntervalSeconds = int64(sch.minRuleInterval.Seconds())
		}

		invalidInterval := item.IntervalSeconds%int64(sch.baseInterval.Seconds()) != 0

		if newRoutine && !invalidInterval {
			dispatcherGroup.Go(func() error {
				return ruleRoutine.Run(key)
			})
		}
		if invalidInterval {
			// This is expected to always be false, given that we validate
			// the interval during alert rule updates.
			sch.log.Warn("Rule has an invalid interval and will be ignored. Interval should be divisible by the scheduler interval", append(key.LogContext(), "ruleInterval", time.Duration(item.IntervalSeconds)*time.Second, "schedulerInterval", sch.baseInterval)...)
			continue
		}

		itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds())
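
		// offset is a per-rule shift, in whole base ticks, that spreads rule
		// evaluations across the interval instead of aligning every rule on
		// the same tick; see jitterOffsetInTicks.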
		offset := jitterOffsetInTicks(item, sch.baseInterval, sch.jitterEvaluations)
		isReadyToRun := item.IntervalSeconds != 0 && (tickNum%itemFrequency)-offset == 0

		var folderTitle string
		if !sch.disableGrafanaFolder {
			title, ok := folderTitles[item.GetFolderKey()]
			if ok {
				folderTitle = title
			} else {
				missingFolder[item.NamespaceUID] = append(missingFolder[item.NamespaceUID], item.UID)
			}
		}
		if isReadyToRun {
			sch.log.Debug("Rule is ready to run on the current tick", "uid", item.UID, "tick", tickNum, "frequency", itemFrequency, "offset", offset)
			readyToRun = append(readyToRun, readyToRunItem{ruleRoutine: ruleRoutine, Evaluation: Evaluation{
				scheduledAt: tick,
				rule:        item,
				folderTitle: folderTitle,
			}})
		}

		if _, isUpdated := updated[key]; isUpdated && !isReadyToRun {
			// If we do not need to evaluate the rule on this tick, check whether
			// it was just updated and, if it was, notify the evaluation routine.
			sch.log.Debug("Rule has been updated. Notifying evaluation routine", key.LogContext()...)
			go func(routine Rule, rule *ngmodels.AlertRule) {
				routine.Update(RuleVersionAndPauseStatus{
					Fingerprint: ruleWithFolder{rule: rule, folderTitle: folderTitle}.Fingerprint(),
					IsPaused:    rule.IsPaused,
				})
			}(ruleRoutine, item)
			updatedRules = append(updatedRules, ngmodels.AlertRuleKeyWithVersion{
				Version:      item.Version,
				AlertRuleKey: item.GetKey(),
			})
		}

		// remove the alert rule from the registered alert rules
		delete(registeredDefinitions, key)
	}

	if len(missingFolder) > 0 { // if this happens then there can be problems with fetching folders from the database
		sch.log.Warn("Unable to obtain folder titles for some rules", "missingFolderUIDToRuleUID", missingFolder)
	}
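
	// Evaluations for this tick are staggered evenly across the base interval
	// so that all due rules do not fire at the same instant.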
	var step int64 = 0
	if len(readyToRun) > 0 {
		step = sch.baseInterval.Nanoseconds() / int64(len(readyToRun))
	}
	for i := range readyToRun {
		item := readyToRun[i]

		time.AfterFunc(time.Duration(int64(i)*step), func() {
			key := item.rule.GetKey()
			success, dropped := item.ruleRoutine.Eval(&item.Evaluation)
			if !success {
				sch.log.Debug("Scheduled evaluation was canceled because evaluation routine was stopped", append(key.LogContext(), "time", tick)...)
				return
			}
			if dropped != nil {
				sch.log.Warn("Tick dropped because alert rule evaluation is too slow", append(key.LogContext(), "time", tick)...)
				orgID := fmt.Sprint(key.OrgID)
				sch.metrics.EvaluationMissed.WithLabelValues(orgID, item.rule.Title).Inc()
			}
		})
	}

	// unregister and stop routines of the deleted alert rules
	toDelete := make([]ngmodels.AlertRuleKey, 0, len(registeredDefinitions))
	for key := range registeredDefinitions {
		toDelete = append(toDelete, key)
	}
	sch.deleteAlertRule(toDelete...)
	return readyToRun, registeredDefinitions, updatedRules
}