mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Rename evalCtx to avoid confusion with context.Context (#45144)
This commit is contained in:
parent
7152deb92f
commit
2ca79ca0c7
@ -61,7 +61,7 @@ type schedule struct {
|
||||
|
||||
clock clock.Clock
|
||||
|
||||
heartbeat *alerting.Ticker
|
||||
ticker *alerting.Ticker
|
||||
|
||||
// evalApplied is only used for tests: test code can set it to non-nil
|
||||
// function, and then it'll be called from the event loop whenever the
|
||||
@ -130,7 +130,7 @@ func NewScheduler(cfg SchedulerCfg, expressionService *expr.Service, appURL *url
|
||||
clock: cfg.C,
|
||||
baseInterval: cfg.BaseInterval,
|
||||
log: cfg.Logger,
|
||||
heartbeat: ticker,
|
||||
ticker: ticker,
|
||||
evalAppliedFunc: cfg.EvalAppliedFunc,
|
||||
stopAppliedFunc: cfg.StopAppliedFunc,
|
||||
evaluator: cfg.Evaluator,
|
||||
@ -157,7 +157,7 @@ func (sch *schedule) Pause() error {
|
||||
if sch == nil {
|
||||
return fmt.Errorf("scheduler is not initialised")
|
||||
}
|
||||
sch.heartbeat.Pause()
|
||||
sch.ticker.Pause()
|
||||
sch.log.Info("alert rule scheduler paused", "now", sch.clock.Now())
|
||||
return nil
|
||||
}
|
||||
@ -166,7 +166,7 @@ func (sch *schedule) Unpause() error {
|
||||
if sch == nil {
|
||||
return fmt.Errorf("scheduler is not initialised")
|
||||
}
|
||||
sch.heartbeat.Unpause()
|
||||
sch.ticker.Unpause()
|
||||
sch.log.Info("alert rule scheduler unpaused", "now", sch.clock.Now())
|
||||
return nil
|
||||
}
|
||||
@ -367,7 +367,7 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
|
||||
dispatcherGroup, ctx := errgroup.WithContext(ctx)
|
||||
for {
|
||||
select {
|
||||
case tick := <-sch.heartbeat.C:
|
||||
case tick := <-sch.ticker.C:
|
||||
start := time.Now()
|
||||
sch.metrics.BehindSeconds.Set(start.Sub(tick).Seconds())
|
||||
|
||||
@ -468,7 +468,7 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evalContext, updateCh <-chan struct{}) error {
|
||||
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan struct{}) error {
|
||||
logger := sch.log.New("uid", key.UID, "org", key.OrgID)
|
||||
logger.Debug("alert rule routine started")
|
||||
|
||||
@ -541,16 +541,16 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
|
||||
return q.Result, nil
|
||||
}
|
||||
|
||||
evaluate := func(ctx context.Context, alertRule *models.AlertRule, attempt int64, evalCtx *evalContext) error {
|
||||
logger := logger.New("version", alertRule.Version, "attempt", attempt, "now", evalCtx.now)
|
||||
evaluate := func(ctx context.Context, r *models.AlertRule, attempt int64, e *evaluation) error {
|
||||
logger := logger.New("version", r.Version, "attempt", attempt, "now", e.scheduledAt)
|
||||
start := sch.clock.Now()
|
||||
|
||||
condition := models.Condition{
|
||||
Condition: alertRule.Condition,
|
||||
OrgID: alertRule.OrgID,
|
||||
Data: alertRule.Data,
|
||||
Condition: r.Condition,
|
||||
OrgID: r.OrgID,
|
||||
Data: r.Data,
|
||||
}
|
||||
results, err := sch.evaluator.ConditionEval(&condition, evalCtx.now, sch.expressionService)
|
||||
results, err := sch.evaluator.ConditionEval(&condition, e.scheduledAt, sch.expressionService)
|
||||
dur := sch.clock.Now().Sub(start)
|
||||
evalTotal.Inc()
|
||||
evalDuration.Observe(dur.Seconds())
|
||||
@ -562,7 +562,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
|
||||
}
|
||||
logger.Debug("alert rule evaluated", "results", results, "duration", dur)
|
||||
|
||||
processedStates := sch.stateManager.ProcessEvalResults(ctx, alertRule, results)
|
||||
processedStates := sch.stateManager.ProcessEvalResults(ctx, r, results)
|
||||
sch.saveAlertStates(ctx, processedStates)
|
||||
alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL)
|
||||
|
||||
@ -616,7 +616,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
|
||||
evalRunning = true
|
||||
defer func() {
|
||||
evalRunning = false
|
||||
sch.evalApplied(key, ctx.now)
|
||||
sch.evalApplied(key, ctx.scheduledAt)
|
||||
}()
|
||||
|
||||
err := retryIfError(func(attempt int64) error {
|
||||
@ -741,7 +741,7 @@ func (r *alertRuleRegistry) keyMap() map[models.AlertRuleKey]struct{} {
|
||||
}
|
||||
|
||||
type alertRuleInfo struct {
|
||||
evalCh chan *evalContext
|
||||
evalCh chan *evaluation
|
||||
updateCh chan struct{}
|
||||
ctx context.Context
|
||||
stop context.CancelFunc
|
||||
@ -749,15 +749,15 @@ type alertRuleInfo struct {
|
||||
|
||||
func newAlertRuleInfo(parent context.Context) *alertRuleInfo {
|
||||
ctx, cancel := context.WithCancel(parent)
|
||||
return &alertRuleInfo{evalCh: make(chan *evalContext), updateCh: make(chan struct{}), ctx: ctx, stop: cancel}
|
||||
return &alertRuleInfo{evalCh: make(chan *evaluation), updateCh: make(chan struct{}), ctx: ctx, stop: cancel}
|
||||
}
|
||||
|
||||
// eval signals the rule evaluation routine to perform the evaluation of the rule. Does nothing if the loop is stopped
|
||||
func (a *alertRuleInfo) eval(t time.Time, version int64) bool {
|
||||
select {
|
||||
case a.evalCh <- &evalContext{
|
||||
now: t,
|
||||
version: version,
|
||||
case a.evalCh <- &evaluation{
|
||||
scheduledAt: t,
|
||||
version: version,
|
||||
}:
|
||||
return true
|
||||
case <-a.ctx.Done():
|
||||
@ -775,16 +775,16 @@ func (a *alertRuleInfo) update() bool {
|
||||
}
|
||||
}
|
||||
|
||||
type evalContext struct {
|
||||
now time.Time
|
||||
version int64
|
||||
type evaluation struct {
|
||||
scheduledAt time.Time
|
||||
version int64
|
||||
}
|
||||
|
||||
// overrideCfg is only used on tests.
|
||||
func (sch *schedule) overrideCfg(cfg SchedulerCfg) {
|
||||
sch.clock = cfg.C
|
||||
sch.baseInterval = cfg.BaseInterval
|
||||
sch.heartbeat = alerting.NewTicker(cfg.C.Now(), time.Second*0, cfg.C, int64(cfg.BaseInterval.Seconds()))
|
||||
sch.ticker = alerting.NewTicker(cfg.C.Now(), time.Second*0, cfg.C, int64(cfg.BaseInterval.Seconds()))
|
||||
sch.evalAppliedFunc = cfg.EvalAppliedFunc
|
||||
sch.stopAppliedFunc = cfg.StopAppliedFunc
|
||||
}
|
||||
|
@ -362,7 +362,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
for _, evalState := range normalStates {
|
||||
// TODO rewrite when we are able to mock/fake state manager
|
||||
t.Run(fmt.Sprintf("when rule evaluation happens (evaluation state %s)", evalState), func(t *testing.T) {
|
||||
evalChan := make(chan *evalContext)
|
||||
evalChan := make(chan *evaluation)
|
||||
evalAppliedChan := make(chan time.Time)
|
||||
sch, ruleStore, instanceStore, _, reg := createSchedule(evalAppliedChan)
|
||||
|
||||
@ -376,9 +376,9 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
|
||||
expectedTime := time.UnixMicro(rand.Int63())
|
||||
|
||||
evalChan <- &evalContext{
|
||||
now: expectedTime,
|
||||
version: rule.Version,
|
||||
evalChan <- &evaluation{
|
||||
scheduledAt: expectedTime,
|
||||
version: rule.Version,
|
||||
}
|
||||
|
||||
actualTime := waitForTimeChannel(t, evalAppliedChan)
|
||||
@ -468,7 +468,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evalContext), make(chan struct{}))
|
||||
err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evaluation), make(chan struct{}))
|
||||
stoppedChan <- err
|
||||
}()
|
||||
|
||||
@ -479,7 +479,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("should fetch rule from database only if new version is greater than current", func(t *testing.T) {
|
||||
evalChan := make(chan *evalContext)
|
||||
evalChan := make(chan *evaluation)
|
||||
evalAppliedChan := make(chan time.Time)
|
||||
|
||||
ctx := context.Background()
|
||||
@ -494,9 +494,9 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
}()
|
||||
|
||||
expectedTime := time.UnixMicro(rand.Int63())
|
||||
evalChan <- &evalContext{
|
||||
now: expectedTime,
|
||||
version: rule.Version,
|
||||
evalChan <- &evaluation{
|
||||
scheduledAt: expectedTime,
|
||||
version: rule.Version,
|
||||
}
|
||||
|
||||
actualTime := waitForTimeChannel(t, evalAppliedChan)
|
||||
@ -509,9 +509,9 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
|
||||
// and call with new version
|
||||
expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
|
||||
evalChan <- &evalContext{
|
||||
now: expectedTime,
|
||||
version: newRule.Version,
|
||||
evalChan <- &evaluation{
|
||||
scheduledAt: expectedTime,
|
||||
version: newRule.Version,
|
||||
}
|
||||
|
||||
actualTime = waitForTimeChannel(t, evalAppliedChan)
|
||||
@ -532,7 +532,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("should not fetch rule if version is equal or less than current", func(t *testing.T) {
|
||||
evalChan := make(chan *evalContext)
|
||||
evalChan := make(chan *evaluation)
|
||||
evalAppliedChan := make(chan time.Time)
|
||||
|
||||
sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)
|
||||
@ -546,9 +546,9 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
}()
|
||||
|
||||
expectedTime := time.UnixMicro(rand.Int63())
|
||||
evalChan <- &evalContext{
|
||||
now: expectedTime,
|
||||
version: rule.Version,
|
||||
evalChan <- &evaluation{
|
||||
scheduledAt: expectedTime,
|
||||
version: rule.Version,
|
||||
}
|
||||
|
||||
actualTime := waitForTimeChannel(t, evalAppliedChan)
|
||||
@ -556,17 +556,17 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
|
||||
// try again with the same version
|
||||
expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
|
||||
evalChan <- &evalContext{
|
||||
now: expectedTime,
|
||||
version: rule.Version,
|
||||
evalChan <- &evaluation{
|
||||
scheduledAt: expectedTime,
|
||||
version: rule.Version,
|
||||
}
|
||||
actualTime = waitForTimeChannel(t, evalAppliedChan)
|
||||
require.Equal(t, expectedTime, actualTime)
|
||||
|
||||
expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
|
||||
evalChan <- &evalContext{
|
||||
now: expectedTime,
|
||||
version: rule.Version - 1,
|
||||
evalChan <- &evaluation{
|
||||
scheduledAt: expectedTime,
|
||||
version: rule.Version - 1,
|
||||
}
|
||||
actualTime = waitForTimeChannel(t, evalAppliedChan)
|
||||
require.Equal(t, expectedTime, actualTime)
|
||||
@ -583,7 +583,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
|
||||
t.Run("when update channel is not empty", func(t *testing.T) {
|
||||
t.Run("should fetch the alert rule from database", func(t *testing.T) {
|
||||
evalChan := make(chan *evalContext)
|
||||
evalChan := make(chan *evaluation)
|
||||
evalAppliedChan := make(chan time.Time)
|
||||
updateChan := make(chan struct{})
|
||||
|
||||
@ -613,9 +613,9 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
require.Equal(t, rule.OrgID, m.OrgID)
|
||||
|
||||
// now call evaluation loop to make sure that the rule was persisted
|
||||
evalChan <- &evalContext{
|
||||
now: time.UnixMicro(rand.Int63()),
|
||||
version: rule.Version,
|
||||
evalChan <- &evaluation{
|
||||
scheduledAt: time.UnixMicro(rand.Int63()),
|
||||
version: rule.Version,
|
||||
}
|
||||
waitForTimeChannel(t, evalAppliedChan)
|
||||
|
||||
@ -638,7 +638,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
go func() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
_ = sch.ruleRoutine(ctx, rule.GetKey(), make(chan *evalContext), updateChan)
|
||||
_ = sch.ruleRoutine(ctx, rule.GetKey(), make(chan *evaluation), updateChan)
|
||||
}()
|
||||
|
||||
ruleStore.Hook = func(cmd interface{}) error {
|
||||
@ -678,7 +678,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
return len(s.Alertmanagers()) == 1
|
||||
}, 20*time.Second, 200*time.Millisecond, "external Alertmanager was not discovered.")
|
||||
|
||||
evalChan := make(chan *evalContext)
|
||||
evalChan := make(chan *evaluation)
|
||||
evalAppliedChan := make(chan time.Time)
|
||||
updateChan := make(chan struct{})
|
||||
|
||||
@ -784,7 +784,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
return len(s.Alertmanagers()) == 1
|
||||
}, 20*time.Second, 200*time.Millisecond, "external Alertmanager was not discovered.")
|
||||
|
||||
evalChan := make(chan *evalContext)
|
||||
evalChan := make(chan *evaluation)
|
||||
evalAppliedChan := make(chan time.Time)
|
||||
|
||||
sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)
|
||||
@ -798,9 +798,9 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan struct{}))
|
||||
}()
|
||||
|
||||
evalChan <- &evalContext{
|
||||
now: time.Now(),
|
||||
version: rule.Version,
|
||||
evalChan <- &evaluation{
|
||||
scheduledAt: time.Now(),
|
||||
version: rule.Version,
|
||||
}
|
||||
waitForTimeChannel(t, evalAppliedChan)
|
||||
|
||||
@ -844,7 +844,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
|
||||
select {
|
||||
case ctx := <-r.evalCh:
|
||||
require.Equal(t, version, ctx.version)
|
||||
require.Equal(t, expected, ctx.now)
|
||||
require.Equal(t, expected, ctx.scheduledAt)
|
||||
require.True(t, <-resultCh)
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("No message was received on eval channel")
|
||||
|
Loading…
Reference in New Issue
Block a user