package schedule

import (
	"bytes"
	context "context"
	"fmt"
	"math"
	"math/rand"
	"runtime"
	"sync"
	"testing"
	"time"

	alertingModels "github.com/grafana/alerting/models"
	"github.com/grafana/grafana-plugin-sdk-go/data"
	definitions "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
	"github.com/grafana/grafana/pkg/services/ngalert/eval"
	models "github.com/grafana/grafana/pkg/services/ngalert/models"
	"github.com/grafana/grafana/pkg/services/ngalert/state"
	"github.com/grafana/grafana/pkg/util"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	prometheusModel "github.com/prometheus/common/model"
	"github.com/stretchr/testify/assert"
	mock "github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
)
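
// TestAlertRule exercises the alertRule channel API (Update, Eval, Stop), both
// while the rule is running and after it has been stopped.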
func TestAlertRule(t *testing.T) {
	type evalResponse struct {
		success     bool
		droppedEval *Evaluation
	}

	t.Run("when rule evaluation is not stopped", func(t *testing.T) {
		t.Run("update should send to updateCh", func(t *testing.T) {
			r := blankRuleForTests(context.Background())
			resultCh := make(chan bool)
			go func() {
				resultCh <- r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})
			}()
			select {
			case <-r.updateCh:
				require.True(t, <-resultCh)
			case <-time.After(5 * time.Second):
				t.Fatal("No message was received on update channel")
			}
		})
t . Run ( "update should drop any concurrent sending to updateCh" , func ( t * testing . T ) {
2024-03-11 15:57:38 -05:00
r := blankRuleForTests ( context . Background ( ) )
version1 := RuleVersionAndPauseStatus { fingerprint ( rand . Uint64 ( ) ) , false }
version2 := RuleVersionAndPauseStatus { fingerprint ( rand . Uint64 ( ) ) , false }
2024-03-04 11:24:49 -06:00
wg := sync . WaitGroup { }
wg . Add ( 1 )
go func ( ) {
wg . Done ( )
2024-03-11 15:57:38 -05:00
r . Update ( version1 )
2024-03-04 11:24:49 -06:00
wg . Done ( )
} ( )
wg . Wait ( )
wg . Add ( 2 ) // one when time1 is sent, another when go-routine for time2 has started
go func ( ) {
wg . Done ( )
2024-03-11 15:57:38 -05:00
r . Update ( version2 )
2024-03-04 11:24:49 -06:00
} ( )
wg . Wait ( ) // at this point tick 1 has already been dropped
select {
case version := <- r . updateCh :
require . Equal ( t , version2 , version )
case <- time . After ( 5 * time . Second ) :
t . Fatal ( "No message was received on eval channel" )
}
} )
t . Run ( "eval should send to evalCh" , func ( t * testing . T ) {
2024-03-11 15:57:38 -05:00
r := blankRuleForTests ( context . Background ( ) )
2024-03-04 11:24:49 -06:00
expected := time . Now ( )
resultCh := make ( chan evalResponse )
2024-03-11 15:57:38 -05:00
data := & Evaluation {
2024-03-04 11:24:49 -06:00
scheduledAt : expected ,
rule : models . AlertRuleGen ( ) ( ) ,
folderTitle : util . GenerateShortUID ( ) ,
}
go func ( ) {
2024-03-11 15:57:38 -05:00
result , dropped := r . Eval ( data )
2024-03-04 11:24:49 -06:00
resultCh <- evalResponse { result , dropped }
} ( )
select {
case ctx := <- r . evalCh :
require . Equal ( t , data , ctx )
result := <- resultCh
require . True ( t , result . success )
require . Nilf ( t , result . droppedEval , "expected no dropped evaluations but got one" )
case <- time . After ( 5 * time . Second ) :
t . Fatal ( "No message was received on eval channel" )
}
} )
t . Run ( "eval should drop any concurrent sending to evalCh" , func ( t * testing . T ) {
2024-03-11 15:57:38 -05:00
r := blankRuleForTests ( context . Background ( ) )
2024-03-04 11:24:49 -06:00
time1 := time . UnixMilli ( rand . Int63n ( math . MaxInt64 ) )
time2 := time . UnixMilli ( rand . Int63n ( math . MaxInt64 ) )
resultCh1 := make ( chan evalResponse )
resultCh2 := make ( chan evalResponse )
2024-03-11 15:57:38 -05:00
data := & Evaluation {
2024-03-04 11:24:49 -06:00
scheduledAt : time1 ,
rule : models . AlertRuleGen ( ) ( ) ,
folderTitle : util . GenerateShortUID ( ) ,
}
2024-03-11 15:57:38 -05:00
data2 := & Evaluation {
2024-03-04 11:24:49 -06:00
scheduledAt : time2 ,
rule : data . rule ,
folderTitle : data . folderTitle ,
}
wg := sync . WaitGroup { }
wg . Add ( 1 )
go func ( ) {
wg . Done ( )
2024-03-11 15:57:38 -05:00
result , dropped := r . Eval ( data )
2024-03-04 11:24:49 -06:00
wg . Done ( )
resultCh1 <- evalResponse { result , dropped }
} ( )
wg . Wait ( )
wg . Add ( 2 ) // one when time1 is sent, another when go-routine for time2 has started
go func ( ) {
wg . Done ( )
2024-03-11 15:57:38 -05:00
result , dropped := r . Eval ( data2 )
2024-03-04 11:24:49 -06:00
resultCh2 <- evalResponse { result , dropped }
} ( )
wg . Wait ( ) // at this point tick 1 has already been dropped
select {
case ctx := <- r . evalCh :
require . Equal ( t , time2 , ctx . scheduledAt )
result := <- resultCh1
require . True ( t , result . success )
require . Nilf ( t , result . droppedEval , "expected no dropped evaluations but got one" )
result = <- resultCh2
require . True ( t , result . success )
require . NotNil ( t , result . droppedEval , "expected no dropped evaluations but got one" )
require . Equal ( t , time1 , result . droppedEval . scheduledAt )
case <- time . After ( 5 * time . Second ) :
t . Fatal ( "No message was received on eval channel" )
}
} )
t . Run ( "eval should exit when context is cancelled" , func ( t * testing . T ) {
2024-03-11 15:57:38 -05:00
r := blankRuleForTests ( context . Background ( ) )
2024-03-04 11:24:49 -06:00
resultCh := make ( chan evalResponse )
2024-03-11 15:57:38 -05:00
data := & Evaluation {
2024-03-04 11:24:49 -06:00
scheduledAt : time . Now ( ) ,
rule : models . AlertRuleGen ( ) ( ) ,
folderTitle : util . GenerateShortUID ( ) ,
}
go func ( ) {
2024-03-11 15:57:38 -05:00
result , dropped := r . Eval ( data )
2024-03-04 11:24:49 -06:00
resultCh <- evalResponse { result , dropped }
} ( )
runtime . Gosched ( )
2024-03-11 15:57:38 -05:00
r . Stop ( nil )
2024-03-04 11:24:49 -06:00
select {
case result := <- resultCh :
require . False ( t , result . success )
require . Nilf ( t , result . droppedEval , "expected no dropped evaluations but got one" )
case <- time . After ( 5 * time . Second ) :
t . Fatal ( "No message was received on eval channel" )
}
} )
} )
t . Run ( "when rule evaluation is stopped" , func ( t * testing . T ) {
t . Run ( "Update should do nothing" , func ( t * testing . T ) {
2024-03-11 15:57:38 -05:00
r := blankRuleForTests ( context . Background ( ) )
r . Stop ( errRuleDeleted )
2024-03-04 11:24:49 -06:00
require . ErrorIs ( t , r . ctx . Err ( ) , errRuleDeleted )
2024-03-11 15:57:38 -05:00
require . False ( t , r . Update ( RuleVersionAndPauseStatus { fingerprint ( rand . Uint64 ( ) ) , false } ) )
2024-03-04 11:24:49 -06:00
} )
t . Run ( "eval should do nothing" , func ( t * testing . T ) {
2024-03-11 15:57:38 -05:00
r := blankRuleForTests ( context . Background ( ) )
r . Stop ( nil )
data := & Evaluation {
2024-03-04 11:24:49 -06:00
scheduledAt : time . Now ( ) ,
rule : models . AlertRuleGen ( ) ( ) ,
folderTitle : util . GenerateShortUID ( ) ,
}
2024-03-11 15:57:38 -05:00
success , dropped := r . Eval ( data )
2024-03-04 11:24:49 -06:00
require . False ( t , success )
require . Nilf ( t , dropped , "expected no dropped evaluations but got one" )
} )
t . Run ( "stop should do nothing" , func ( t * testing . T ) {
2024-03-11 15:57:38 -05:00
r := blankRuleForTests ( context . Background ( ) )
r . Stop ( nil )
r . Stop ( nil )
2024-03-04 11:24:49 -06:00
} )
t . Run ( "stop should do nothing if parent context stopped" , func ( t * testing . T ) {
ctx , cancelFn := context . WithCancel ( context . Background ( ) )
2024-03-11 15:57:38 -05:00
r := blankRuleForTests ( ctx )
2024-03-04 11:24:49 -06:00
cancelFn ( )
2024-03-11 15:57:38 -05:00
r . Stop ( nil )
2024-03-04 11:24:49 -06:00
} )
} )
t . Run ( "should be thread-safe" , func ( t * testing . T ) {
2024-03-11 15:57:38 -05:00
r := blankRuleForTests ( context . Background ( ) )
2024-03-04 11:24:49 -06:00
wg := sync . WaitGroup { }
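		// consume from both channels until the rule's context is cancelled so that
		// the concurrent Update/Eval calls below are never blocked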
		go func() {
			for {
				select {
				case <-r.evalCh:
					time.Sleep(time.Microsecond)
				case <-r.updateCh:
					time.Sleep(time.Microsecond)
				case <-r.ctx.Done():
					return
				}
			}
		}()

		for i := 0; i < 10; i++ {
			wg.Add(1)
			go func() {
				for i := 0; i < 20; i++ {
					max := 3
					if i <= 10 {
						max = 2
					}
					switch rand.Intn(max) + 1 {
					case 1:
						r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})
					case 2:
						r.Eval(&Evaluation{
							scheduledAt: time.Now(),
							rule:        models.AlertRuleGen()(),
							folderTitle: util.GenerateShortUID(),
						})
					case 3:
						r.Stop(nil)
					}
				}
				wg.Done()
			}()
		}

		wg.Wait()
	})
}
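
// blankRuleForTests returns an alertRule with only its context populated; all other
// dependencies are nil because the tests above only exercise the channel API and
// never run the evaluation loop.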
func blankRuleForTests(ctx context.Context) *alertRule {
	return newAlertRule(ctx, nil, false, 0, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
}
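
// TestRuleRoutine runs the rule evaluation routine against a scheduler backed by fake
// stores and verifies state handling, persisted instances, sent notifications, and
// reported metrics.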
func TestRuleRoutine(t *testing.T) {
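	// createSchedule builds a scheduler with fake rule and instance stores and signals
	// every applied evaluation on evalAppliedChan.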
	createSchedule := func(
		evalAppliedChan chan time.Time,
		senderMock *SyncAlertsSenderMock,
	) (*schedule, *fakeRulesStore, *state.FakeInstanceStore, prometheus.Gatherer) {
		ruleStore := newFakeRulesStore()
		instanceStore := &state.FakeInstanceStore{}
		registry := prometheus.NewPedanticRegistry()
		sch := setupScheduler(t, ruleStore, instanceStore, registry, senderMock, nil)
		sch.evalAppliedFunc = func(key models.AlertRuleKey, t time.Time) {
			evalAppliedChan <- t
		}
		return sch, ruleStore, instanceStore, registry
	}
	// normal states do not include NoData and Error because currently it is not possible to perform any sensible test
	normalStates := []eval.State{eval.Normal, eval.Alerting, eval.Pending}
	allStates := [...]eval.State{eval.Normal, eval.Alerting, eval.Pending, eval.NoData, eval.Error}

	for _, evalState := range normalStates {
		// TODO rewrite when we are able to mock/fake state manager
		t.Run(fmt.Sprintf("when rule evaluation happens (evaluation state %s)", evalState), func(t *testing.T) {
			evalAppliedChan := make(chan time.Time)
			sch, ruleStore, instanceStore, reg := createSchedule(evalAppliedChan, nil)

			rule := models.AlertRuleGen(withQueryForState(t, evalState))()
			ruleStore.PutRule(context.Background(), rule)
			folderTitle := ruleStore.getNamespaceTitle(rule.NamespaceUID)
			factory := ruleFactoryFromScheduler(sch)
			ctx, cancel := context.WithCancel(context.Background())
			t.Cleanup(cancel)
			ruleInfo := factory.new(ctx)

			go func() {
				_ = ruleInfo.Run(rule.GetKey())
			}()

			expectedTime := time.UnixMicro(rand.Int63())

			ruleInfo.Eval(&Evaluation{
				scheduledAt: expectedTime,
				rule:        rule,
				folderTitle: folderTitle,
			})

			actualTime := waitForTimeChannel(t, evalAppliedChan)
			require.Equal(t, expectedTime, actualTime)
t . Run ( "it should add extra labels" , func ( t * testing . T ) {
states := sch . stateManager . GetStatesForRuleUID ( rule . OrgID , rule . UID )
for _ , s := range states {
assert . Equal ( t , rule . UID , s . Labels [ alertingModels . RuleUIDLabel ] )
assert . Equal ( t , rule . NamespaceUID , s . Labels [ alertingModels . NamespaceUIDLabel ] )
assert . Equal ( t , rule . Title , s . Labels [ prometheusModel . AlertNameLabel ] )
assert . Equal ( t , folderTitle , s . Labels [ models . FolderTitleLabel ] )
}
} )
t . Run ( "it should process evaluation results via state manager" , func ( t * testing . T ) {
// TODO rewrite when we are able to mock/fake state manager
states := sch . stateManager . GetStatesForRuleUID ( rule . OrgID , rule . UID )
require . Len ( t , states , 1 )
s := states [ 0 ]
require . Equal ( t , rule . UID , s . AlertRuleUID )
require . Len ( t , s . Results , 1 )
var expectedStatus = evalState
if evalState == eval . Pending {
expectedStatus = eval . Alerting
}
require . Equal ( t , expectedStatus . String ( ) , s . Results [ 0 ] . EvaluationState . String ( ) )
require . Equal ( t , expectedTime , s . Results [ 0 ] . EvaluationTime )
} )
t . Run ( "it should save alert instances to storage" , func ( t * testing . T ) {
// TODO rewrite when we are able to mock/fake state manager
states := sch . stateManager . GetStatesForRuleUID ( rule . OrgID , rule . UID )
require . Len ( t , states , 1 )
s := states [ 0 ]
var cmd * models . AlertInstance
for _ , op := range instanceStore . RecordedOps ( ) {
switch q := op . ( type ) {
case models . AlertInstance :
cmd = & q
}
if cmd != nil {
break
}
}
require . NotNil ( t , cmd )
t . Logf ( "Saved alert instances: %v" , cmd )
require . Equal ( t , rule . OrgID , cmd . RuleOrgID )
require . Equal ( t , expectedTime , cmd . LastEvalTime )
require . Equal ( t , rule . UID , cmd . RuleUID )
require . Equal ( t , evalState . String ( ) , string ( cmd . CurrentState ) )
require . Equal ( t , s . Labels , data . Labels ( cmd . Labels ) )
} )
t . Run ( "it reports metrics" , func ( t * testing . T ) {
// duration metric has 0 values because of mocked clock that do not advance
expectedMetric := fmt . Sprintf (
` # HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule .
# TYPE grafana_alerting_rule_evaluation_duration_seconds histogram
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "0.01" } 1
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "0.1" } 1
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "0.5" } 1
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "1" } 1
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "5" } 1
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "10" } 1
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "15" } 1
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "30" } 1
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "60" } 1
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "120" } 1
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "180" } 1
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "240" } 1
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "300" } 1
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "+Inf" } 1
grafana_alerting_rule_evaluation_duration_seconds_sum { org = "%[1]d" } 0
grafana_alerting_rule_evaluation_duration_seconds_count { org = "%[1]d" } 1
# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures .
# TYPE grafana_alerting_rule_evaluation_failures_total counter
grafana_alerting_rule_evaluation_failures_total { org = "%[1]d" } 0
# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations .
# TYPE grafana_alerting_rule_evaluations_total counter
grafana_alerting_rule_evaluations_total { org = "%[1]d" } 1
# HELP grafana_alerting_rule_process_evaluation_duration_seconds The time to process the evaluation results for a rule .
# TYPE grafana_alerting_rule_process_evaluation_duration_seconds histogram
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "0.01" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "0.1" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "0.5" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "1" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "5" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "10" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "15" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "30" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "60" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "120" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "180" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "240" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "300" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "+Inf" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_sum { org = "%[1]d" } 0
grafana_alerting_rule_process_evaluation_duration_seconds_count { org = "%[1]d" } 1
# HELP grafana_alerting_rule_send_alerts_duration_seconds The time to send the alerts to Alertmanager .
# TYPE grafana_alerting_rule_send_alerts_duration_seconds histogram
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "0.01" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "0.1" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "0.5" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "1" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "5" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "10" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "15" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "30" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "60" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "120" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "180" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "240" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "300" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "+Inf" } 1
grafana_alerting_rule_send_alerts_duration_seconds_sum { org = "%[1]d" } 0
grafana_alerting_rule_send_alerts_duration_seconds_count { org = "%[1]d" } 1
` , rule . OrgID )
err := testutil . GatherAndCompare ( reg , bytes . NewBufferString ( expectedMetric ) , "grafana_alerting_rule_evaluation_duration_seconds" , "grafana_alerting_rule_evaluations_total" , "grafana_alerting_rule_evaluation_failures_total" , "grafana_alerting_rule_process_evaluation_duration_seconds" , "grafana_alerting_rule_send_alerts_duration_seconds" )
require . NoError ( t , err )
} )
} )
}
t . Run ( "should exit" , func ( t * testing . T ) {
t . Run ( "and not clear the state if parent context is cancelled" , func ( t * testing . T ) {
stoppedChan := make ( chan error )
sch , _ , _ , _ := createSchedule ( make ( chan time . Time ) , nil )
rule := models . AlertRuleGen ( ) ( )
_ = sch . stateManager . ProcessEvalResults ( context . Background ( ) , sch . clock . Now ( ) , rule , eval . GenerateResults ( rand . Intn ( 5 ) + 1 , eval . ResultGen ( eval . WithEvaluatedAt ( sch . clock . Now ( ) ) ) ) , nil )
expectedStates := sch . stateManager . GetStatesForRuleUID ( rule . OrgID , rule . UID )
require . NotEmpty ( t , expectedStates )
2024-03-06 13:44:53 -06:00
factory := ruleFactoryFromScheduler ( sch )
2024-03-04 17:15:55 -06:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
2024-03-06 13:44:53 -06:00
ruleInfo := factory . new ( ctx )
2024-03-04 17:15:55 -06:00
go func ( ) {
2024-03-11 15:57:38 -05:00
err := ruleInfo . Run ( models . AlertRuleKey { } )
2024-03-04 17:15:55 -06:00
stoppedChan <- err
} ( )
cancel ( )
err := waitForErrChannel ( t , stoppedChan )
require . NoError ( t , err )
require . Equal ( t , len ( expectedStates ) , len ( sch . stateManager . GetStatesForRuleUID ( rule . OrgID , rule . UID ) ) )
} )
t . Run ( "and clean up the state if delete is cancellation reason for inner context" , func ( t * testing . T ) {
stoppedChan := make ( chan error )
sch , _ , _ , _ := createSchedule ( make ( chan time . Time ) , nil )
rule := models . AlertRuleGen ( ) ( )
_ = sch . stateManager . ProcessEvalResults ( context . Background ( ) , sch . clock . Now ( ) , rule , eval . GenerateResults ( rand . Intn ( 5 ) + 1 , eval . ResultGen ( eval . WithEvaluatedAt ( sch . clock . Now ( ) ) ) ) , nil )
require . NotEmpty ( t , sch . stateManager . GetStatesForRuleUID ( rule . OrgID , rule . UID ) )
2024-03-06 13:44:53 -06:00
factory := ruleFactoryFromScheduler ( sch )
ruleInfo := factory . new ( context . Background ( ) )
2024-03-04 17:15:55 -06:00
go func ( ) {
2024-03-11 15:57:38 -05:00
err := ruleInfo . Run ( rule . GetKey ( ) )
2024-03-04 17:15:55 -06:00
stoppedChan <- err
} ( )
2024-03-11 15:57:38 -05:00
ruleInfo . Stop ( errRuleDeleted )
2024-03-04 17:15:55 -06:00
err := waitForErrChannel ( t , stoppedChan )
require . NoError ( t , err )
require . Empty ( t , sch . stateManager . GetStatesForRuleUID ( rule . OrgID , rule . UID ) )
} )
} )
t . Run ( "when a message is sent to update channel" , func ( t * testing . T ) {
rule := models . AlertRuleGen ( withQueryForState ( t , eval . Normal ) ) ( )
folderTitle := "folderName"
ruleFp := ruleWithFolder { rule , folderTitle } . Fingerprint ( )
evalAppliedChan := make ( chan time . Time )
sender := NewSyncAlertsSenderMock ( )
sender . EXPECT ( ) . Send ( mock . Anything , rule . GetKey ( ) , mock . Anything ) . Return ( )
sch , ruleStore , _ , _ := createSchedule ( evalAppliedChan , sender )
ruleStore . PutRule ( context . Background ( ) , rule )
sch . schedulableAlertRules . set ( [ ] * models . AlertRule { rule } , map [ models . FolderKey ] string { rule . GetFolderKey ( ) : folderTitle } )
2024-03-06 13:44:53 -06:00
factory := ruleFactoryFromScheduler ( sch )
2024-03-04 17:15:55 -06:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
t . Cleanup ( cancel )
2024-03-06 13:44:53 -06:00
ruleInfo := factory . new ( ctx )
2024-03-04 17:15:55 -06:00
go func ( ) {
2024-03-11 15:57:38 -05:00
_ = ruleInfo . Run ( rule . GetKey ( ) )
2024-03-04 17:15:55 -06:00
} ( )
// init evaluation loop so it got the rule version
2024-03-11 15:57:38 -05:00
ruleInfo . Eval ( & Evaluation {
2024-03-04 17:15:55 -06:00
scheduledAt : sch . clock . Now ( ) ,
rule : rule ,
folderTitle : folderTitle ,
2024-03-11 15:57:38 -05:00
} )
2024-03-04 17:15:55 -06:00
waitForTimeChannel ( t , evalAppliedChan )
// define some state
states := make ( [ ] * state . State , 0 , len ( allStates ) )
for _ , s := range allStates {
for i := 0 ; i < 2 ; i ++ {
states = append ( states , & state . State {
AlertRuleUID : rule . UID ,
CacheID : util . GenerateShortUID ( ) ,
OrgID : rule . OrgID ,
State : s ,
StartsAt : sch . clock . Now ( ) ,
EndsAt : sch . clock . Now ( ) . Add ( time . Duration ( rand . Intn ( 25 ) + 5 ) * time . Second ) ,
Labels : rule . Labels ,
} )
}
}
sch . stateManager . Put ( states )
states = sch . stateManager . GetStatesForRuleUID ( rule . OrgID , rule . UID )
expectedToBeSent := 0
for _ , s := range states {
if s . State == eval . Normal || s . State == eval . Pending {
continue
}
expectedToBeSent ++
}
require . Greaterf ( t , expectedToBeSent , 0 , "State manager was expected to return at least one state that can be expired" )
t . Run ( "should do nothing if version in channel is the same" , func ( t * testing . T ) {
2024-03-11 15:57:38 -05:00
ruleInfo . Update ( RuleVersionAndPauseStatus { ruleFp , false } )
ruleInfo . Update ( RuleVersionAndPauseStatus { ruleFp , false } ) // second time just to make sure that previous messages were handled
2024-03-04 17:15:55 -06:00
actualStates := sch . stateManager . GetStatesForRuleUID ( rule . OrgID , rule . UID )
require . Len ( t , actualStates , len ( states ) )
sender . AssertNotCalled ( t , "Send" , mock . Anything , mock . Anything )
} )
t . Run ( "should clear the state and expire firing alerts if version in channel is greater" , func ( t * testing . T ) {
2024-03-11 15:57:38 -05:00
ruleInfo . Update ( RuleVersionAndPauseStatus { ruleFp + 1 , false } )
2024-03-04 17:15:55 -06:00
require . Eventually ( t , func ( ) bool {
return len ( sender . Calls ( ) ) > 0
} , 5 * time . Second , 100 * time . Millisecond )
require . Empty ( t , sch . stateManager . GetStatesForRuleUID ( rule . OrgID , rule . UID ) )
sender . AssertNumberOfCalls ( t , "Send" , 1 )
args , ok := sender . Calls ( ) [ 0 ] . Arguments [ 2 ] . ( definitions . PostableAlerts )
require . Truef ( t , ok , fmt . Sprintf ( "expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T" , sender . Calls ( ) [ 0 ] . Arguments [ 2 ] ) )
require . Len ( t , args . PostableAlerts , expectedToBeSent )
} )
} )
t . Run ( "when evaluation fails" , func ( t * testing . T ) {
rule := models . AlertRuleGen ( withQueryForState ( t , eval . Error ) ) ( )
rule . ExecErrState = models . ErrorErrState
evalAppliedChan := make ( chan time . Time )
sender := NewSyncAlertsSenderMock ( )
sender . EXPECT ( ) . Send ( mock . Anything , rule . GetKey ( ) , mock . Anything ) . Return ( )
sch , ruleStore , _ , reg := createSchedule ( evalAppliedChan , sender )
sch . maxAttempts = 3
ruleStore . PutRule ( context . Background ( ) , rule )
2024-03-06 13:44:53 -06:00
factory := ruleFactoryFromScheduler ( sch )
2024-03-04 17:15:55 -06:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
t . Cleanup ( cancel )
2024-03-06 13:44:53 -06:00
ruleInfo := factory . new ( ctx )
2024-03-04 17:15:55 -06:00
go func ( ) {
2024-03-11 15:57:38 -05:00
_ = ruleInfo . Run ( rule . GetKey ( ) )
2024-03-04 17:15:55 -06:00
} ( )
2024-03-11 15:57:38 -05:00
ruleInfo . Eval ( & Evaluation {
2024-03-04 17:15:55 -06:00
scheduledAt : sch . clock . Now ( ) ,
rule : rule ,
2024-03-11 15:57:38 -05:00
} )
2024-03-04 17:15:55 -06:00
waitForTimeChannel ( t , evalAppliedChan )
t . Run ( "it should increase failure counter" , func ( t * testing . T ) {
// duration metric has 0 values because of mocked clock that do not advance
expectedMetric := fmt . Sprintf (
` # HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule .
# TYPE grafana_alerting_rule_evaluation_duration_seconds histogram
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "0.01" } 3
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "0.1" } 3
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "0.5" } 3
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "1" } 3
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "5" } 3
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "10" } 3
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "15" } 3
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "30" } 3
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "60" } 3
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "120" } 3
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "180" } 3
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "240" } 3
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "300" } 3
grafana_alerting_rule_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "+Inf" } 3
grafana_alerting_rule_evaluation_duration_seconds_sum { org = "%[1]d" } 0
grafana_alerting_rule_evaluation_duration_seconds_count { org = "%[1]d" } 3
# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures .
# TYPE grafana_alerting_rule_evaluation_failures_total counter
grafana_alerting_rule_evaluation_failures_total { org = "%[1]d" } 3
# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations .
# TYPE grafana_alerting_rule_evaluations_total counter
grafana_alerting_rule_evaluations_total { org = "%[1]d" } 3
# HELP grafana_alerting_rule_process_evaluation_duration_seconds The time to process the evaluation results for a rule .
# TYPE grafana_alerting_rule_process_evaluation_duration_seconds histogram
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "0.01" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "0.1" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "0.5" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "1" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "5" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "10" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "15" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "30" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "60" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "120" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "180" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "240" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "300" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_bucket { org = "%[1]d" , le = "+Inf" } 1
grafana_alerting_rule_process_evaluation_duration_seconds_sum { org = "%[1]d" } 0
grafana_alerting_rule_process_evaluation_duration_seconds_count { org = "%[1]d" } 1
# HELP grafana_alerting_rule_send_alerts_duration_seconds The time to send the alerts to Alertmanager .
# TYPE grafana_alerting_rule_send_alerts_duration_seconds histogram
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "0.01" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "0.1" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "0.5" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "1" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "5" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "10" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "15" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "30" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "60" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "120" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "180" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "240" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "300" } 1
grafana_alerting_rule_send_alerts_duration_seconds_bucket { org = "%[1]d" , le = "+Inf" } 1
grafana_alerting_rule_send_alerts_duration_seconds_sum { org = "%[1]d" } 0
grafana_alerting_rule_send_alerts_duration_seconds_count { org = "%[1]d" } 1
` , rule . OrgID )
err := testutil . GatherAndCompare ( reg , bytes . NewBufferString ( expectedMetric ) , "grafana_alerting_rule_evaluation_duration_seconds" , "grafana_alerting_rule_evaluations_total" , "grafana_alerting_rule_evaluation_failures_total" , "grafana_alerting_rule_process_evaluation_duration_seconds" , "grafana_alerting_rule_send_alerts_duration_seconds" )
require . NoError ( t , err )
} )
t . Run ( "it should send special alert DatasourceError" , func ( t * testing . T ) {
sender . AssertNumberOfCalls ( t , "Send" , 1 )
args , ok := sender . Calls ( ) [ 0 ] . Arguments [ 2 ] . ( definitions . PostableAlerts )
require . Truef ( t , ok , fmt . Sprintf ( "expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T" , sender . Calls ( ) [ 0 ] . Arguments [ 2 ] ) )
assert . Len ( t , args . PostableAlerts , 1 )
assert . Equal ( t , state . ErrorAlertName , args . PostableAlerts [ 0 ] . Labels [ prometheusModel . AlertNameLabel ] )
} )
} )
t . Run ( "when there are alerts that should be firing" , func ( t * testing . T ) {
t . Run ( "it should call sender" , func ( t * testing . T ) {
// eval.Alerting makes state manager to create notifications for alertmanagers
rule := models . AlertRuleGen ( withQueryForState ( t , eval . Alerting ) ) ( )
evalAppliedChan := make ( chan time . Time )
sender := NewSyncAlertsSenderMock ( )
sender . EXPECT ( ) . Send ( mock . Anything , rule . GetKey ( ) , mock . Anything ) . Return ( )
sch , ruleStore , _ , _ := createSchedule ( evalAppliedChan , sender )
ruleStore . PutRule ( context . Background ( ) , rule )
2024-03-06 13:44:53 -06:00
factory := ruleFactoryFromScheduler ( sch )
2024-03-04 17:15:55 -06:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
t . Cleanup ( cancel )
2024-03-06 13:44:53 -06:00
ruleInfo := factory . new ( ctx )
2024-03-04 17:15:55 -06:00
go func ( ) {
2024-03-11 15:57:38 -05:00
_ = ruleInfo . Run ( rule . GetKey ( ) )
2024-03-04 17:15:55 -06:00
} ( )
2024-03-11 15:57:38 -05:00
ruleInfo . Eval ( & Evaluation {
2024-03-04 17:15:55 -06:00
scheduledAt : sch . clock . Now ( ) ,
rule : rule ,
2024-03-11 15:57:38 -05:00
} )
2024-03-04 17:15:55 -06:00
waitForTimeChannel ( t , evalAppliedChan )
sender . AssertNumberOfCalls ( t , "Send" , 1 )
args , ok := sender . Calls ( ) [ 0 ] . Arguments [ 2 ] . ( definitions . PostableAlerts )
require . Truef ( t , ok , fmt . Sprintf ( "expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T" , sender . Calls ( ) [ 0 ] . Arguments [ 2 ] ) )
require . Len ( t , args . PostableAlerts , 1 )
} )
} )
t . Run ( "when there are no alerts to send it should not call notifiers" , func ( t * testing . T ) {
rule := models . AlertRuleGen ( withQueryForState ( t , eval . Normal ) ) ( )
evalAppliedChan := make ( chan time . Time )
sender := NewSyncAlertsSenderMock ( )
sender . EXPECT ( ) . Send ( mock . Anything , rule . GetKey ( ) , mock . Anything ) . Return ( )
sch , ruleStore , _ , _ := createSchedule ( evalAppliedChan , sender )
ruleStore . PutRule ( context . Background ( ) , rule )
2024-03-06 13:44:53 -06:00
factory := ruleFactoryFromScheduler ( sch )
2024-03-04 17:15:55 -06:00
ctx , cancel := context . WithCancel ( context . Background ( ) )
t . Cleanup ( cancel )
2024-03-06 13:44:53 -06:00
ruleInfo := factory . new ( ctx )
2024-03-04 17:15:55 -06:00
go func ( ) {
2024-03-11 15:57:38 -05:00
_ = ruleInfo . Run ( rule . GetKey ( ) )
2024-03-04 17:15:55 -06:00
} ( )
2024-03-11 15:57:38 -05:00
ruleInfo . Eval ( & Evaluation {
2024-03-04 17:15:55 -06:00
scheduledAt : sch . clock . Now ( ) ,
rule : rule ,
2024-03-11 15:57:38 -05:00
} )
2024-03-04 17:15:55 -06:00
waitForTimeChannel ( t , evalAppliedChan )
sender . AssertNotCalled ( t , "Send" , mock . Anything , mock . Anything )
require . NotEmpty ( t , sch . stateManager . GetStatesForRuleUID ( rule . OrgID , rule . UID ) )
} )
}
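
// ruleFactoryFromScheduler builds a ruleFactory from the scheduler's own dependencies
// so that rule routines created directly in tests are wired the same way the scheduler
// would wire them.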
func ruleFactoryFromScheduler(sch *schedule) ruleFactory {
	return newRuleFactory(sch.appURL, sch.disableGrafanaFolder, sch.maxAttempts, sch.alertsSender, sch.stateManager, sch.evaluatorFactory, &sch.schedulableAlertRules, sch.clock, sch.metrics, sch.log, sch.tracer, sch.evalAppliedFunc, sch.stopAppliedFunc)
}