Alerting: Decouple rule routine from scheduler (#84018)

* Create a rule factory for more complicated dependency injection into rules

* Give rules direct access to metrics, logging, and tracing utilities; use the factory in tests

* Use a clock internal to the rule

* Use the sender, state manager, and evaluator factory directly

* Move evalApplied and stopApplied onto the rule

* Use schedulableAlertRules behind an interface

* Move the loaded metrics reader onto the rule

* Pass the three relevant config options to the rule

* Drop the unused scheduler parameter

* Rename ruleRoutine to run

* Update the README

* Handle long parameter lists

* Remove a dead branch
Alexander Weaver 2024-03-06 13:44:53 -06:00 committed by GitHub
parent 8b9bc9a919
commit d5fda06147
7 changed files with 208 additions and 76 deletions
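
The heart of the change is the rule factory added in the first Go diff below: the scheduler builds a factory once per tick, and each rule routine receives its dependencies (sender, state manager, evaluator factory, clock, metrics, logger, tracer, test hooks) at construction time instead of reaching back into the `*schedule`. A minimal, illustrative sketch of that pattern, with placeholder names and far fewer dependencies than the real `newRuleFactory`:

```go
package main

import (
	"context"
	"fmt"
)

// rule is an illustrative stand-in for alertRuleInfo; the real struct carries
// many more dependencies (sender, state manager, metrics, tracer, ...).
type rule struct {
	ctx         context.Context
	maxAttempts int64 // one of the config options the scheduler hands over
}

func (r *rule) run() error {
	// The evaluation loop would live here; it only touches what the rule was built with.
	fmt.Println("running with maxAttempts =", r.maxAttempts)
	return nil
}

// ruleFactoryFunc lets the scheduler wire dependencies once, at construction time.
type ruleFactoryFunc func(ctx context.Context) *rule

func (f ruleFactoryFunc) new(ctx context.Context) *rule { return f(ctx) }

func newRuleFactory(maxAttempts int64) ruleFactoryFunc {
	return func(ctx context.Context) *rule {
		return &rule{ctx: ctx, maxAttempts: maxAttempts}
	}
}

func main() {
	factory := newRuleFactory(3)
	r := factory.new(context.Background())
	_ = r.run()
}
```

In the actual diff, `newRuleFactory` closes over roughly a dozen scheduler-owned dependencies and returns a `ruleFactoryFunc` whose `new` method the registry calls from `getOrCreateInfo`.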


@@ -30,7 +30,7 @@ The scheduler runs at a fixed interval, called its heartbeat, in which it does a
 3. Send an `*evaluation` event to the goroutine for each alert rule if its interval has elapsed
 4. Stop the goroutines for all alert rules that have been deleted since the last heartbeat
-The function that evaluates each alert rule is called `ruleRoutine`. It waits for an `*evaluation` event (sent each
+The function that evaluates each alert rule is called `run`. It waits for an `*evaluation` event (sent each
 interval seconds elapsed and is configurable per alert rule) and then evaluates the alert rule. To ensure that the
 scheduler is evaluating the latest version of the alert rule it compares its local version of the alert rule with that
 in the `*evaluation` event, fetching the latest version of the alert rule from the database if the version numbers
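
A rough sketch of the event-loop shape this README paragraph describes, using simplified stand-in types rather than the real `alertRuleInfo` (the actual `run` also handles retries, pausing, fingerprints, and state cleanup, as the diffs below show):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Simplified stand-ins for the real evaluation event and version bookkeeping.
type evaluation struct {
	scheduledAt time.Time
	version     int64
}

type ruleRunner struct {
	evalCh   chan *evaluation
	updateCh chan int64 // latest rule version, sent when the rule changes
}

// run waits for *evaluation events (one per elapsed interval) until the context ends.
func (r *ruleRunner) run(ctx context.Context) error {
	currentVersion := int64(-1)
	for {
		select {
		case v := <-r.updateCh:
			currentVersion = v // remember the newest known version
		case e := <-r.evalCh:
			if e.version != currentVersion {
				// Per the README, the latest rule is fetched from the database here.
				currentVersion = e.version
			}
			fmt.Println("evaluating rule scheduled at", e.scheduledAt)
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	r := &ruleRunner{evalCh: make(chan *evaluation), updateCh: make(chan int64)}
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		r.evalCh <- &evaluation{scheduledAt: time.Now(), version: 1}
		cancel()
	}()
	_ = r.run(ctx)
}
```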


@@ -4,10 +4,15 @@ import (
 	context "context"
 	"errors"
 	"fmt"
+	"net/url"
 	"time"
 
+	"github.com/benbjohnson/clock"
+	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/infra/tracing"
 	"github.com/grafana/grafana/pkg/services/datasources"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
+	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
 	ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
 	"github.com/grafana/grafana/pkg/services/ngalert/state"
 	"github.com/grafana/grafana/pkg/services/org"
@@ -18,20 +23,114 @@ import (
 	"go.opentelemetry.io/otel/trace"
 )
 
+type ruleFactoryFunc func(context.Context) *alertRuleInfo
+
+func (f ruleFactoryFunc) new(ctx context.Context) *alertRuleInfo {
+	return f(ctx)
+}
+
+func newRuleFactory(
+	appURL *url.URL,
+	disableGrafanaFolder bool,
+	maxAttempts int64,
+	sender AlertsSender,
+	stateManager *state.Manager,
+	evalFactory eval.EvaluatorFactory,
+	ruleProvider ruleProvider,
+	clock clock.Clock,
+	met *metrics.Scheduler,
+	logger log.Logger,
+	tracer tracing.Tracer,
+	evalAppliedHook evalAppliedFunc,
+	stopAppliedHook stopAppliedFunc,
+) ruleFactoryFunc {
+	return func(ctx context.Context) *alertRuleInfo {
+		return newAlertRuleInfo(
+			ctx,
+			appURL,
+			disableGrafanaFolder,
+			maxAttempts,
+			sender,
+			stateManager,
+			evalFactory,
+			ruleProvider,
+			clock,
+			met,
+			logger,
+			tracer,
+			evalAppliedHook,
+			stopAppliedHook,
+		)
+	}
+}
+
+type evalAppliedFunc = func(ngmodels.AlertRuleKey, time.Time)
+type stopAppliedFunc = func(ngmodels.AlertRuleKey)
+
+type ruleProvider interface {
+	get(ngmodels.AlertRuleKey) *ngmodels.AlertRule
+}
+
 type alertRuleInfo struct {
 	evalCh chan *evaluation
 	updateCh chan ruleVersionAndPauseStatus
 	ctx context.Context
 	stopFn util.CancelCauseFunc
+	appURL *url.URL
+	disableGrafanaFolder bool
+	maxAttempts int64
+	clock clock.Clock
+	sender AlertsSender
+	stateManager *state.Manager
+	evalFactory eval.EvaluatorFactory
+	ruleProvider ruleProvider
+	// Event hooks that are only used in tests.
+	evalAppliedHook evalAppliedFunc
+	stopAppliedHook stopAppliedFunc
+	metrics *metrics.Scheduler
+	logger log.Logger
+	tracer tracing.Tracer
 }
 
-func newAlertRuleInfo(parent context.Context) *alertRuleInfo {
+func newAlertRuleInfo(
+	parent context.Context,
+	appURL *url.URL,
+	disableGrafanaFolder bool,
+	maxAttempts int64,
+	sender AlertsSender,
+	stateManager *state.Manager,
+	evalFactory eval.EvaluatorFactory,
+	ruleProvider ruleProvider,
+	clock clock.Clock,
+	met *metrics.Scheduler,
+	logger log.Logger,
+	tracer tracing.Tracer,
+	evalAppliedHook func(ngmodels.AlertRuleKey, time.Time),
+	stopAppliedHook func(ngmodels.AlertRuleKey),
+) *alertRuleInfo {
 	ctx, stop := util.WithCancelCause(parent)
 	return &alertRuleInfo{
 		evalCh: make(chan *evaluation),
 		updateCh: make(chan ruleVersionAndPauseStatus),
 		ctx: ctx,
 		stopFn: stop,
+		appURL: appURL,
+		disableGrafanaFolder: disableGrafanaFolder,
+		maxAttempts: maxAttempts,
+		clock: clock,
+		sender: sender,
+		stateManager: stateManager,
+		evalFactory: evalFactory,
+		ruleProvider: ruleProvider,
+		evalAppliedHook: evalAppliedHook,
+		stopAppliedHook: stopAppliedHook,
+		metrics: met,
+		logger: logger,
+		tracer: tracer,
 	}
 }
@@ -82,52 +181,49 @@ func (a *alertRuleInfo) stop(reason error) {
 }
 
 //nolint:gocyclo
-func (a *alertRuleInfo) ruleRoutine(key ngmodels.AlertRuleKey, sch *schedule) error {
+func (a *alertRuleInfo) run(key ngmodels.AlertRuleKey) error {
 	grafanaCtx := ngmodels.WithRuleKey(a.ctx, key)
-	logger := sch.log.FromContext(grafanaCtx)
+	logger := a.logger.FromContext(grafanaCtx)
 	logger.Debug("Alert rule routine started")
 
 	orgID := fmt.Sprint(key.OrgID)
-	evalTotal := sch.metrics.EvalTotal.WithLabelValues(orgID)
-	evalDuration := sch.metrics.EvalDuration.WithLabelValues(orgID)
-	evalTotalFailures := sch.metrics.EvalFailures.WithLabelValues(orgID)
-	processDuration := sch.metrics.ProcessDuration.WithLabelValues(orgID)
-	sendDuration := sch.metrics.SendDuration.WithLabelValues(orgID)
+	evalTotal := a.metrics.EvalTotal.WithLabelValues(orgID)
+	evalDuration := a.metrics.EvalDuration.WithLabelValues(orgID)
+	evalTotalFailures := a.metrics.EvalFailures.WithLabelValues(orgID)
+	processDuration := a.metrics.ProcessDuration.WithLabelValues(orgID)
+	sendDuration := a.metrics.SendDuration.WithLabelValues(orgID)
 
 	notify := func(states []state.StateTransition) {
-		expiredAlerts := state.FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock)
+		expiredAlerts := state.FromAlertsStateToStoppedAlert(states, a.appURL, a.clock)
 		if len(expiredAlerts.PostableAlerts) > 0 {
-			sch.alertsSender.Send(grafanaCtx, key, expiredAlerts)
+			a.sender.Send(grafanaCtx, key, expiredAlerts)
 		}
 	}
 
 	resetState := func(ctx context.Context, isPaused bool) {
-		rule := sch.schedulableAlertRules.get(key)
+		rule := a.ruleProvider.get(key)
 		reason := ngmodels.StateReasonUpdated
 		if isPaused {
 			reason = ngmodels.StateReasonPaused
 		}
-		states := sch.stateManager.ResetStateByRuleUID(ctx, rule, reason)
+		states := a.stateManager.ResetStateByRuleUID(ctx, rule, reason)
 		notify(states)
 	}
 
 	evaluate := func(ctx context.Context, f fingerprint, attempt int64, e *evaluation, span trace.Span, retry bool) error {
 		logger := logger.New("version", e.rule.Version, "fingerprint", f, "attempt", attempt, "now", e.scheduledAt).FromContext(ctx)
-		start := sch.clock.Now()
+		start := a.clock.Now()
 
-		evalCtx := eval.NewContextWithPreviousResults(ctx, SchedulerUserFor(e.rule.OrgID), sch.newLoadedMetricsReader(e.rule))
-		if sch.evaluatorFactory == nil {
-			panic("evalfactory nil")
-		}
-		ruleEval, err := sch.evaluatorFactory.Create(evalCtx, e.rule.GetEvalCondition())
+		evalCtx := eval.NewContextWithPreviousResults(ctx, SchedulerUserFor(e.rule.OrgID), a.newLoadedMetricsReader(e.rule))
+		ruleEval, err := a.evalFactory.Create(evalCtx, e.rule.GetEvalCondition())
 		var results eval.Results
 		var dur time.Duration
 		if err != nil {
-			dur = sch.clock.Now().Sub(start)
+			dur = a.clock.Now().Sub(start)
 			logger.Error("Failed to build rule evaluator", "error", err)
 		} else {
 			results, err = ruleEval.Evaluate(ctx, e.scheduledAt)
-			dur = sch.clock.Now().Sub(start)
+			dur = a.clock.Now().Sub(start)
 			if err != nil {
 				logger.Error("Failed to evaluate rule", "error", err, "duration", dur)
 			}
@@ -181,33 +277,33 @@ func (a *alertRuleInfo) ruleRoutine(key ngmodels.AlertRuleKey, sch *schedule) er
 				attribute.Int64("results", int64(len(results))),
 			))
 		}
 
-		start = sch.clock.Now()
-		processedStates := sch.stateManager.ProcessEvalResults(
+		start = a.clock.Now()
+		processedStates := a.stateManager.ProcessEvalResults(
 			ctx,
 			e.scheduledAt,
 			e.rule,
 			results,
-			state.GetRuleExtraLabels(logger, e.rule, e.folderTitle, !sch.disableGrafanaFolder),
+			state.GetRuleExtraLabels(logger, e.rule, e.folderTitle, !a.disableGrafanaFolder),
 		)
-		processDuration.Observe(sch.clock.Now().Sub(start).Seconds())
+		processDuration.Observe(a.clock.Now().Sub(start).Seconds())
 
-		start = sch.clock.Now()
-		alerts := state.FromStateTransitionToPostableAlerts(processedStates, sch.stateManager, sch.appURL)
+		start = a.clock.Now()
+		alerts := state.FromStateTransitionToPostableAlerts(processedStates, a.stateManager, a.appURL)
 		span.AddEvent("results processed", trace.WithAttributes(
 			attribute.Int64("state_transitions", int64(len(processedStates))),
 			attribute.Int64("alerts_to_send", int64(len(alerts.PostableAlerts))),
 		))
 		if len(alerts.PostableAlerts) > 0 {
-			sch.alertsSender.Send(ctx, key, alerts)
+			a.sender.Send(ctx, key, alerts)
 		}
-		sendDuration.Observe(sch.clock.Now().Sub(start).Seconds())
+		sendDuration.Observe(a.clock.Now().Sub(start).Seconds())
 		return nil
 	}
 
 	evalRunning := false
 	var currentFingerprint fingerprint
-	defer sch.stopApplied(key)
+	defer a.stopApplied(key)
 	for {
 		select {
 		// used by external services (API) to notify that rule is updated.
@@ -235,10 +331,10 @@ func (a *alertRuleInfo) ruleRoutine(key ngmodels.AlertRuleKey, sch *schedule) er
 			evalRunning = true
 			defer func() {
 				evalRunning = false
-				sch.evalApplied(key, ctx.scheduledAt)
+				a.evalApplied(key, ctx.scheduledAt)
 			}()
 
-			for attempt := int64(1); attempt <= sch.maxAttempts; attempt++ {
+			for attempt := int64(1); attempt <= a.maxAttempts; attempt++ {
 				isPaused := ctx.rule.IsPaused
 				f := ruleWithFolder{ctx.rule, ctx.folderTitle}.Fingerprint()
 				// Do not clean up state if the eval loop has just started.
@@ -262,7 +358,7 @@ func (a *alertRuleInfo) ruleRoutine(key ngmodels.AlertRuleKey, sch *schedule) er
 				fpStr := currentFingerprint.String()
 				utcTick := ctx.scheduledAt.UTC().Format(time.RFC3339Nano)
-				tracingCtx, span := sch.tracer.Start(grafanaCtx, "alert rule execution", trace.WithAttributes(
+				tracingCtx, span := a.tracer.Start(grafanaCtx, "alert rule execution", trace.WithAttributes(
 					attribute.String("rule_uid", ctx.rule.UID),
 					attribute.Int64("org_id", ctx.rule.OrgID),
 					attribute.Int64("rule_version", ctx.rule.Version),
@@ -278,7 +374,7 @@ func (a *alertRuleInfo) ruleRoutine(key ngmodels.AlertRuleKey, sch *schedule) er
 					return
 				}
 
-				retry := attempt < sch.maxAttempts
+				retry := attempt < a.maxAttempts
 				err := evaluate(tracingCtx, f, attempt, ctx, span, retry)
 				// This is extremely confusing - when we exhaust all retry attempts, or we have no retryable errors
 				// we return nil - so technically, this is meaningless to know whether the evaluation has errors or not.
@@ -306,7 +402,7 @@ func (a *alertRuleInfo) ruleRoutine(key ngmodels.AlertRuleKey, sch *schedule) er
 			// cases.
 			ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
 			defer cancelFunc()
-			states := sch.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, key), key, ngmodels.StateReasonRuleDeleted)
+			states := a.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, key), key, ngmodels.StateReasonRuleDeleted)
 			notify(states)
 		}
 		logger.Debug("Stopping alert rule routine")
@@ -316,21 +412,21 @@ func (a *alertRuleInfo) ruleRoutine(key ngmodels.AlertRuleKey, sch *schedule) er
 }
 
 // evalApplied is only used on tests.
-func (sch *schedule) evalApplied(alertDefKey ngmodels.AlertRuleKey, now time.Time) {
-	if sch.evalAppliedFunc == nil {
+func (a *alertRuleInfo) evalApplied(alertDefKey ngmodels.AlertRuleKey, now time.Time) {
+	if a.evalAppliedHook == nil {
 		return
 	}
 
-	sch.evalAppliedFunc(alertDefKey, now)
+	a.evalAppliedHook(alertDefKey, now)
 }
 
 // stopApplied is only used on tests.
-func (sch *schedule) stopApplied(alertDefKey ngmodels.AlertRuleKey) {
-	if sch.stopAppliedFunc == nil {
+func (a *alertRuleInfo) stopApplied(alertDefKey ngmodels.AlertRuleKey) {
+	if a.stopAppliedHook == nil {
 		return
 	}
 
-	sch.stopAppliedFunc(alertDefKey)
+	a.stopAppliedHook(alertDefKey)
 }
 
 func SchedulerUserFor(orgID int64) *user.SignedInUser {


@@ -34,7 +34,7 @@ func TestAlertRuleInfo(t *testing.T) {
 	t.Run("when rule evaluation is not stopped", func(t *testing.T) {
 		t.Run("update should send to updateCh", func(t *testing.T) {
-			r := newAlertRuleInfo(context.Background())
+			r := blankRuleInfoForTests(context.Background())
 			resultCh := make(chan bool)
 			go func() {
 				resultCh <- r.update(ruleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})
@@ -47,7 +47,7 @@
 			}
 		})
 		t.Run("update should drop any concurrent sending to updateCh", func(t *testing.T) {
-			r := newAlertRuleInfo(context.Background())
+			r := blankRuleInfoForTests(context.Background())
 			version1 := ruleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}
 			version2 := ruleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}
@@ -73,7 +73,7 @@
 			}
 		})
 		t.Run("eval should send to evalCh", func(t *testing.T) {
-			r := newAlertRuleInfo(context.Background())
+			r := blankRuleInfoForTests(context.Background())
 			expected := time.Now()
 			resultCh := make(chan evalResponse)
 			data := &evaluation{
@@ -96,7 +96,7 @@
 			}
 		})
 		t.Run("eval should drop any concurrent sending to evalCh", func(t *testing.T) {
-			r := newAlertRuleInfo(context.Background())
+			r := blankRuleInfoForTests(context.Background())
 			time1 := time.UnixMilli(rand.Int63n(math.MaxInt64))
 			time2 := time.UnixMilli(rand.Int63n(math.MaxInt64))
 			resultCh1 := make(chan evalResponse)
@@ -142,7 +142,7 @@
 			}
 		})
 		t.Run("eval should exit when context is cancelled", func(t *testing.T) {
-			r := newAlertRuleInfo(context.Background())
+			r := blankRuleInfoForTests(context.Background())
 			resultCh := make(chan evalResponse)
 			data := &evaluation{
 				scheduledAt: time.Now(),
@@ -166,13 +166,13 @@
 	})
 	t.Run("when rule evaluation is stopped", func(t *testing.T) {
 		t.Run("Update should do nothing", func(t *testing.T) {
-			r := newAlertRuleInfo(context.Background())
+			r := blankRuleInfoForTests(context.Background())
 			r.stop(errRuleDeleted)
 			require.ErrorIs(t, r.ctx.Err(), errRuleDeleted)
 			require.False(t, r.update(ruleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}))
 		})
 		t.Run("eval should do nothing", func(t *testing.T) {
-			r := newAlertRuleInfo(context.Background())
+			r := blankRuleInfoForTests(context.Background())
 			r.stop(nil)
 			data := &evaluation{
 				scheduledAt: time.Now(),
@@ -184,19 +184,19 @@
 			require.Nilf(t, dropped, "expected no dropped evaluations but got one")
 		})
 		t.Run("stop should do nothing", func(t *testing.T) {
-			r := newAlertRuleInfo(context.Background())
+			r := blankRuleInfoForTests(context.Background())
 			r.stop(nil)
 			r.stop(nil)
 		})
 		t.Run("stop should do nothing if parent context stopped", func(t *testing.T) {
 			ctx, cancelFn := context.WithCancel(context.Background())
-			r := newAlertRuleInfo(ctx)
+			r := blankRuleInfoForTests(ctx)
 			cancelFn()
 			r.stop(nil)
 		})
 	})
 	t.Run("should be thread-safe", func(t *testing.T) {
-		r := newAlertRuleInfo(context.Background())
+		r := blankRuleInfoForTests(context.Background())
 		wg := sync.WaitGroup{}
 		go func() {
 			for {
@@ -240,6 +240,11 @@
 	})
 }
 
+func blankRuleInfoForTests(ctx context.Context) *alertRuleInfo {
+	factory := newRuleFactory(nil, false, 0, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
+	return factory.new(context.Background())
+}
+
 func TestRuleRoutine(t *testing.T) {
 	createSchedule := func(
 		evalAppliedChan chan time.Time,
@@ -269,11 +274,12 @@
 		rule := models.AlertRuleGen(withQueryForState(t, evalState))()
 		ruleStore.PutRule(context.Background(), rule)
 		folderTitle := ruleStore.getNamespaceTitle(rule.NamespaceUID)
+		factory := ruleFactoryFromScheduler(sch)
 		ctx, cancel := context.WithCancel(context.Background())
 		t.Cleanup(cancel)
-		ruleInfo := newAlertRuleInfo(ctx)
+		ruleInfo := factory.new(ctx)
 		go func() {
-			_ = ruleInfo.ruleRoutine(rule.GetKey(), sch)
+			_ = ruleInfo.run(rule.GetKey())
 		}()
 
 		expectedTime := time.UnixMicro(rand.Int63())
@@ -418,10 +424,11 @@
 		expectedStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
 		require.NotEmpty(t, expectedStates)
 
+		factory := ruleFactoryFromScheduler(sch)
 		ctx, cancel := context.WithCancel(context.Background())
-		ruleInfo := newAlertRuleInfo(ctx)
+		ruleInfo := factory.new(ctx)
 		go func() {
-			err := ruleInfo.ruleRoutine(models.AlertRuleKey{}, sch)
+			err := ruleInfo.run(models.AlertRuleKey{})
 			stoppedChan <- err
 		}()
 
@@ -438,9 +445,10 @@
 		_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen(eval.WithEvaluatedAt(sch.clock.Now()))), nil)
 		require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
 
-		ruleInfo := newAlertRuleInfo(context.Background())
+		factory := ruleFactoryFromScheduler(sch)
+		ruleInfo := factory.new(context.Background())
 		go func() {
-			err := ruleInfo.ruleRoutine(rule.GetKey(), sch)
+			err := ruleInfo.run(rule.GetKey())
 			stoppedChan <- err
 		}()
 
@@ -465,12 +473,13 @@
 		sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender)
 		ruleStore.PutRule(context.Background(), rule)
 		sch.schedulableAlertRules.set([]*models.AlertRule{rule}, map[models.FolderKey]string{rule.GetFolderKey(): folderTitle})
+		factory := ruleFactoryFromScheduler(sch)
 		ctx, cancel := context.WithCancel(context.Background())
 		t.Cleanup(cancel)
-		ruleInfo := newAlertRuleInfo(ctx)
+		ruleInfo := factory.new(ctx)
 		go func() {
-			_ = ruleInfo.ruleRoutine(rule.GetKey(), sch)
+			_ = ruleInfo.run(rule.GetKey())
 		}()
 
 		// init evaluation loop so it got the rule version
@@ -546,12 +555,13 @@
 		sch, ruleStore, _, reg := createSchedule(evalAppliedChan, sender)
 		sch.maxAttempts = 3
 		ruleStore.PutRule(context.Background(), rule)
+		factory := ruleFactoryFromScheduler(sch)
 		ctx, cancel := context.WithCancel(context.Background())
 		t.Cleanup(cancel)
-		ruleInfo := newAlertRuleInfo(ctx)
+		ruleInfo := factory.new(ctx)
 		go func() {
-			_ = ruleInfo.ruleRoutine(rule.GetKey(), sch)
+			_ = ruleInfo.run(rule.GetKey())
 		}()
 
 		ruleInfo.evalCh <- &evaluation{
@@ -651,12 +661,13 @@
 		sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender)
 		ruleStore.PutRule(context.Background(), rule)
+		factory := ruleFactoryFromScheduler(sch)
 		ctx, cancel := context.WithCancel(context.Background())
 		t.Cleanup(cancel)
-		ruleInfo := newAlertRuleInfo(ctx)
+		ruleInfo := factory.new(ctx)
 		go func() {
-			_ = ruleInfo.ruleRoutine(rule.GetKey(), sch)
+			_ = ruleInfo.run(rule.GetKey())
 		}()
 
 		ruleInfo.evalCh <- &evaluation{
@@ -684,12 +695,13 @@
 		sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender)
 		ruleStore.PutRule(context.Background(), rule)
+		factory := ruleFactoryFromScheduler(sch)
 		ctx, cancel := context.WithCancel(context.Background())
 		t.Cleanup(cancel)
-		ruleInfo := newAlertRuleInfo(ctx)
+		ruleInfo := factory.new(ctx)
 		go func() {
-			_ = ruleInfo.ruleRoutine(rule.GetKey(), sch)
+			_ = ruleInfo.run(rule.GetKey())
 		}()
 
 		ruleInfo.evalCh <- &evaluation{
@@ -704,3 +716,7 @@
 		require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
 	})
 }
+
+func ruleFactoryFromScheduler(sch *schedule) ruleFactory {
+	return newRuleFactory(sch.appURL, sch.disableGrafanaFolder, sch.maxAttempts, sch.alertsSender, sch.stateManager, sch.evaluatorFactory, &sch.schedulableAlertRules, sch.clock, sch.metrics, sch.log, sch.tracer, sch.evalAppliedFunc, sch.stopAppliedFunc)
+}


@@ -10,9 +10,9 @@ import (
 var _ eval.AlertingResultsReader = AlertingResultsFromRuleState{}
 
-func (sch *schedule) newLoadedMetricsReader(rule *ngmodels.AlertRule) eval.AlertingResultsReader {
+func (a *alertRuleInfo) newLoadedMetricsReader(rule *ngmodels.AlertRule) eval.AlertingResultsReader {
 	return &AlertingResultsFromRuleState{
-		Manager: sch.stateManager,
+		Manager: a.stateManager,
 		Rule: rule,
 	}
 }


@@ -17,6 +17,10 @@ import (
 var errRuleDeleted = errors.New("rule deleted")
 
+type ruleFactory interface {
+	new(context.Context) *alertRuleInfo
+}
+
 type alertRuleInfoRegistry struct {
 	mu sync.Mutex
 	alertRuleInfo map[models.AlertRuleKey]*alertRuleInfo
@@ -24,13 +28,13 @@
 // getOrCreateInfo gets rule routine information from registry by the key. If it does not exist, it creates a new one.
 // Returns a pointer to the rule routine information and a flag that indicates whether it is a new struct or not.
-func (r *alertRuleInfoRegistry) getOrCreateInfo(context context.Context, key models.AlertRuleKey) (*alertRuleInfo, bool) {
+func (r *alertRuleInfoRegistry) getOrCreateInfo(context context.Context, key models.AlertRuleKey, factory ruleFactory) (*alertRuleInfo, bool) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 	info, ok := r.alertRuleInfo[key]
 	if !ok {
-		info = newAlertRuleInfo(context)
+		info = factory.new(context)
 		r.alertRuleInfo[key] = info
 	}
 	return info, !ok


@@ -235,9 +235,24 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
 	readyToRun := make([]readyToRunItem, 0)
 	updatedRules := make([]ngmodels.AlertRuleKeyWithVersion, 0, len(updated)) // this is needed for tests only
 	missingFolder := make(map[string][]string)
+	ruleFactory := newRuleFactory(
+		sch.appURL,
+		sch.disableGrafanaFolder,
+		sch.maxAttempts,
+		sch.alertsSender,
+		sch.stateManager,
+		sch.evaluatorFactory,
+		&sch.schedulableAlertRules,
+		sch.clock,
+		sch.metrics,
+		sch.log,
+		sch.tracer,
+		sch.evalAppliedFunc,
+		sch.stopAppliedFunc,
+	)
 	for _, item := range alertRules {
 		key := item.GetKey()
-		ruleInfo, newRoutine := sch.registry.getOrCreateInfo(ctx, key)
+		ruleInfo, newRoutine := sch.registry.getOrCreateInfo(ctx, key, ruleFactory)
 
 		// enforce minimum evaluation interval
 		if item.IntervalSeconds < int64(sch.minRuleInterval.Seconds()) {
@@ -249,7 +264,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
 		if newRoutine && !invalidInterval {
 			dispatcherGroup.Go(func() error {
-				return ruleInfo.ruleRoutine(key, sch)
+				return ruleInfo.run(key)
 			})
 		}


@@ -360,9 +360,10 @@ func TestSchedule_deleteAlertRule(t *testing.T) {
 	t.Run("when rule exists", func(t *testing.T) {
 		t.Run("it should stop evaluation loop and remove the controller from registry", func(t *testing.T) {
 			sch := setupScheduler(t, nil, nil, nil, nil, nil)
+			ruleFactory := ruleFactoryFromScheduler(sch)
 			rule := models.AlertRuleGen()()
 			key := rule.GetKey()
-			info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
+			info, _ := sch.registry.getOrCreateInfo(context.Background(), key, ruleFactory)
 			sch.deleteAlertRule(key)
 			require.ErrorIs(t, info.ctx.Err(), errRuleDeleted)
 			require.False(t, sch.registry.exists(key))