From b926b6336db46a9b7ef8fa28de0b985a403201ad Mon Sep 17 00:00:00 2001
From: Alexander Weaver
Date: Tue, 28 May 2024 10:59:21 -0500
Subject: [PATCH] Alerting: Scheduled recording rules execute their queries
 (#88309)

* Basic eval flow
* Wiring-up
* fix
* Extend todo
* Start with tests
* Include some relevant tests, skip ones that seem to have timing-based race conditions
* Some tests, touch up linter and todo
* Solve TODO
* Add tracing
* Tests to make sure an eval went through
* Wire up feature toggles
* Update pkg/services/ngalert/schedule/recording_rule.go

Co-authored-by: Steve Simpson

* Update pkg/services/ngalert/schedule/recording_rule_test.go

Co-authored-by: Steve Simpson

* Update pkg/services/ngalert/schedule/recording_rule_test.go

Co-authored-by: Steve Simpson

* Update pkg/services/ngalert/schedule/recording_rule_test.go

Co-authored-by: Steve Simpson

---------

Co-authored-by: Steve Simpson
---
 pkg/services/ngalert/ngalert.go               |   1 +
 pkg/services/ngalert/schedule/alert_rule.go   |  15 +-
 .../ngalert/schedule/alert_rule_test.go       |  26 ++-
 .../ngalert/schedule/recording_rule.go        | 174 +++++++++++++-
 .../ngalert/schedule/recording_rule_test.go   | 217 ++++++++++++++++++
 pkg/services/ngalert/schedule/registry.go     |   4 +
 pkg/services/ngalert/schedule/schedule.go     |   5 +
 .../ngalert/schedule/schedule_unit_test.go    |   3 +-
 8 files changed, 431 insertions(+), 14 deletions(-)
 create mode 100644 pkg/services/ngalert/schedule/recording_rule_test.go

diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go
index 4ae4fe4d9e3..33de353846d 100644
--- a/pkg/services/ngalert/ngalert.go
+++ b/pkg/services/ngalert/ngalert.go
@@ -298,6 +298,7 @@ func (ng *AlertNG) init() error {
 		AppURL:           appUrl,
 		EvaluatorFactory: evalFactory,
 		RuleStore:        ng.store,
+		FeatureToggles:   ng.FeatureToggles,
 		Metrics:          ng.Metrics.GetSchedulerMetrics(),
 		AlertSender:      alertsRouter,
 		Tracer:           ng.tracer,
diff --git a/pkg/services/ngalert/schedule/alert_rule.go b/pkg/services/ngalert/schedule/alert_rule.go
index c4d27f16a45..6c9477e58f4 100644
--- a/pkg/services/ngalert/schedule/alert_rule.go
+++ b/pkg/services/ngalert/schedule/alert_rule.go
@@ -11,6 +11,7 @@ import (
 	"github.com/grafana/grafana/pkg/infra/log"
 	"github.com/grafana/grafana/pkg/infra/tracing"
 	"github.com/grafana/grafana/pkg/services/datasources"
+	"github.com/grafana/grafana/pkg/services/featuremgmt"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
 	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
 	ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
@@ -51,6 +52,7 @@ func newRuleFactory(
 	evalFactory eval.EvaluatorFactory,
 	ruleProvider ruleProvider,
 	clock clock.Clock,
+	featureToggles featuremgmt.FeatureToggles,
 	met *metrics.Scheduler,
 	logger log.Logger,
 	tracer tracing.Tracer,
@@ -59,7 +61,16 @@ func newRuleFactory(
 ) ruleFactoryFunc {
 	return func(ctx context.Context, rule *ngmodels.AlertRule) Rule {
 		if rule.IsRecordingRule() {
-			return newRecordingRule(ctx, logger)
+			return newRecordingRule(
+				ctx,
+				maxAttempts,
+				clock,
+				evalFactory,
+				featureToggles,
+				logger,
+				met,
+				tracer,
+			)
 		}
 		return newAlertRule(
 			ctx,
@@ -244,7 +255,7 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
 			for attempt := int64(1); attempt <= a.maxAttempts; attempt++ {
 				isPaused := ctx.rule.IsPaused
-				f := ruleWithFolder{ctx.rule, ctx.folderTitle}.Fingerprint()
+				f := ctx.Fingerprint()
 				// Do not clean up state if the eval loop has just started.
 				var needReset bool
 				if currentFingerprint != 0 && currentFingerprint != f {
diff --git a/pkg/services/ngalert/schedule/alert_rule_test.go b/pkg/services/ngalert/schedule/alert_rule_test.go
index 1596375e801..cc5f2466800 100644
--- a/pkg/services/ngalert/schedule/alert_rule_test.go
+++ b/pkg/services/ngalert/schedule/alert_rule_test.go
@@ -20,6 +20,7 @@ import (
 	mock "github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"

+	"github.com/grafana/grafana/pkg/infra/log"
 	definitions "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
 	models "github.com/grafana/grafana/pkg/services/ngalert/models"
@@ -185,12 +186,12 @@ func TestAlertRule(t *testing.T) {
 		require.False(t, success)
 		require.Nilf(t, dropped, "expected no dropped evaluations but got one")
 	})
-	t.Run("stop should do nothing", func(t *testing.T) {
+	t.Run("calling stop multiple times should not panic", func(t *testing.T) {
 		r := blankRuleForTests(context.Background())
 		r.Stop(nil)
 		r.Stop(nil)
 	})
-	t.Run("stop should do nothing if parent context stopped", func(t *testing.T) {
+	t.Run("stop should not panic if parent context stopped", func(t *testing.T) {
 		ctx, cancelFn := context.WithCancel(context.Background())
 		r := blankRuleForTests(ctx)
 		cancelFn()
 		r.Stop(nil)
@@ -240,10 +241,27 @@ func TestAlertRule(t *testing.T) {

 		wg.Wait()
 	})
+
+	t.Run("Run should exit if idle when Stop is called", func(t *testing.T) {
+		rule := blankRuleForTests(context.Background())
+		runResult := make(chan error)
+		go func() {
+			runResult <- rule.Run(models.AlertRuleKey{})
+		}()
+
+		rule.Stop(nil)
+
+		select {
+		case err := <-runResult:
+			require.NoError(t, err)
+		case <-time.After(5 * time.Second):
+			t.Fatal("Run() never exited")
+		}
+	})
 }

 func blankRuleForTests(ctx context.Context) *alertRule {
-	return newAlertRule(context.Background(), nil, false, 0, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
+	return newAlertRule(context.Background(), nil, false, 0, nil, nil, nil, nil, nil, nil, log.NewNopLogger(), nil, nil, nil)
 }

 func TestRuleRoutine(t *testing.T) {
@@ -747,5 +765,5 @@ func TestRuleRoutine(t *testing.T) {
 }

 func ruleFactoryFromScheduler(sch *schedule) ruleFactory {
-	return newRuleFactory(sch.appURL, sch.disableGrafanaFolder, sch.maxAttempts, sch.alertsSender, sch.stateManager, sch.evaluatorFactory, &sch.schedulableAlertRules, sch.clock, sch.metrics, sch.log, sch.tracer, sch.evalAppliedFunc, sch.stopAppliedFunc)
+	return newRuleFactory(sch.appURL, sch.disableGrafanaFolder, sch.maxAttempts, sch.alertsSender, sch.stateManager, sch.evaluatorFactory, &sch.schedulableAlertRules, sch.clock, sch.featureToggles, sch.metrics, sch.log, sch.tracer, sch.evalAppliedFunc, sch.stopAppliedFunc)
 }
diff --git a/pkg/services/ngalert/schedule/recording_rule.go b/pkg/services/ngalert/schedule/recording_rule.go
index 361edb46d6f..30c71f2e138 100644
--- a/pkg/services/ngalert/schedule/recording_rule.go
+++ b/pkg/services/ngalert/schedule/recording_rule.go
@@ -2,30 +2,72 @@ package schedule

 import (
 	context "context"
+	"fmt"
+	"time"

+	"github.com/benbjohnson/clock"
+	"github.com/grafana/grafana-plugin-sdk-go/backend"
 	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/infra/tracing"
+	"github.com/grafana/grafana/pkg/services/featuremgmt"
+	"github.com/grafana/grafana/pkg/services/ngalert/eval"
+	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
 	ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
 	"github.com/grafana/grafana/pkg/util"
"go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) type recordingRule struct { ctx context.Context + evalCh chan *Evaluation stopFn util.CancelCauseFunc - logger log.Logger + maxAttempts int64 + + clock clock.Clock + evalFactory eval.EvaluatorFactory + featureToggles featuremgmt.FeatureToggles + + // Event hooks that are only used in tests. + evalAppliedHook evalAppliedFunc + + logger log.Logger + metrics *metrics.Scheduler + tracer tracing.Tracer } -func newRecordingRule(parent context.Context, logger log.Logger) *recordingRule { +func newRecordingRule(parent context.Context, maxAttempts int64, clock clock.Clock, evalFactory eval.EvaluatorFactory, ft featuremgmt.FeatureToggles, logger log.Logger, metrics *metrics.Scheduler, tracer tracing.Tracer) *recordingRule { ctx, stop := util.WithCancelCause(parent) return &recordingRule{ - ctx: ctx, - stopFn: stop, - logger: logger, + ctx: ctx, + evalCh: make(chan *Evaluation), + stopFn: stop, + clock: clock, + evalFactory: evalFactory, + featureToggles: ft, + maxAttempts: maxAttempts, + logger: logger, + metrics: metrics, + tracer: tracer, } } func (r *recordingRule) Eval(eval *Evaluation) (bool, *Evaluation) { - return true, nil + // read the channel in unblocking manner to make sure that there is no concurrent send operation. + var droppedMsg *Evaluation + select { + case droppedMsg = <-r.evalCh: + default: + } + + select { + case r.evalCh <- eval: + return true, droppedMsg + case <-r.ctx.Done(): + return false, droppedMsg + } } func (r *recordingRule) Update(lastVersion RuleVersionAndPauseStatus) bool { @@ -43,12 +85,130 @@ func (r *recordingRule) Run(key ngmodels.AlertRuleKey) error { logger := r.logger.FromContext(ctx) logger.Debug("Recording rule routine started") - // nolint:gosimple for { select { + case eval, ok := <-r.evalCh: + if !ok { + logger.Debug("Evaluation channel has been closed. Exiting") + return nil + } + if !r.featureToggles.IsEnabled(ctx, featuremgmt.FlagGrafanaManagedRecordingRules) { + logger.Warn("Recording rule scheduled but toggle is not enabled. Skipping") + return nil + } + // TODO: Skipping the "evalRunning" guard that the alert rule routine does, because it seems to be dead code and impossible to hit. + // TODO: Either implement me or remove from alert rules once investigated. 
+
+			r.doEvaluate(ctx, eval)
 		case <-ctx.Done():
 			logger.Debug("Stopping recording rule routine")
 			return nil
 		}
 	}
 }
+
+func (r *recordingRule) doEvaluate(ctx context.Context, ev *Evaluation) {
+	logger := r.logger.FromContext(ctx).New("now", ev.scheduledAt, "fingerprint", ev.Fingerprint())
+	orgID := fmt.Sprint(ev.rule.OrgID)
+	evalDuration := r.metrics.EvalDuration.WithLabelValues(orgID)
+	evalTotal := r.metrics.EvalTotal.WithLabelValues(orgID)
+	evalStart := r.clock.Now()
+
+	defer func() {
+		evalTotal.Inc()
+		evalDuration.Observe(r.clock.Now().Sub(evalStart).Seconds())
+		r.evaluationDoneTestHook(ev)
+	}()
+
+	if ev.rule.IsPaused {
+		logger.Debug("Skip recording rule evaluation because it is paused")
+		return
+	}
+
+	ctx, span := r.tracer.Start(ctx, "recording rule execution", trace.WithAttributes(
+		attribute.String("rule_uid", ev.rule.UID),
+		attribute.Int64("org_id", ev.rule.OrgID),
+		attribute.Int64("rule_version", ev.rule.Version),
+		attribute.String("rule_fingerprint", ev.Fingerprint().String()),
+		attribute.String("tick", ev.scheduledAt.UTC().Format(time.RFC3339Nano)),
+	))
+	defer span.End()
+
+	for attempt := int64(1); attempt <= r.maxAttempts; attempt++ {
+		logger := logger.New("attempt", attempt)
+		if ctx.Err() != nil {
+			span.SetStatus(codes.Error, "rule evaluation cancelled")
+			logger.Error("Skipping recording rule evaluation because context has been cancelled")
+			return
+		}
+
+		err := r.tryEvaluation(ctx, ev, logger)
+		if err == nil {
+			return
+		}
+
+		logger.Error("Failed to evaluate rule", "attempt", attempt, "error", err)
+		select {
+		case <-ctx.Done():
+			logger.Error("Context has been cancelled while backing off", "attempt", attempt)
+			return
+		case <-time.After(retryDelay):
+			continue
+		}
+	}
+}
+
+func (r *recordingRule) tryEvaluation(ctx context.Context, ev *Evaluation, logger log.Logger) error {
+	orgID := fmt.Sprint(ev.rule.OrgID)
+	evalAttemptTotal := r.metrics.EvalAttemptTotal.WithLabelValues(orgID)
+	evalAttemptFailures := r.metrics.EvalAttemptFailures.WithLabelValues(orgID)
+	evalTotalFailures := r.metrics.EvalFailures.WithLabelValues(orgID)
+
+	start := r.clock.Now()
+	evalCtx := eval.NewContext(ctx, SchedulerUserFor(ev.rule.OrgID))
+	result, err := r.buildAndExecutePipeline(ctx, evalCtx, ev, logger)
+	dur := r.clock.Now().Sub(start)
+
+	evalAttemptTotal.Inc()
+	span := trace.SpanFromContext(ctx)
+
+	// TODO: In some cases, err can be nil but the dataframe itself contains embedded error frames. Parse these out like we do when evaluating alert rules.
+	// TODO: (Maybe, refactor something in eval package so we can use shared code for this)
+	if err != nil {
+		evalAttemptFailures.Inc()
+		// TODO: Only errors embedded in the frame can be considered retryable.
+		// TODO: Since we are not handling these yet per the above TODO, we can blindly consider all errors to be non-retryable for now, and just exit.
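+		// If/when retryability is implemented, locating those embedded errors could look
+		// roughly like the sketch below. This is illustrative only and not wired in; it
+		// assumes the per-query Error field on the plugin SDK's DataResponse values held
+		// in the backend.QueryDataResponse returned above:
+		//
+		//	for refID, resp := range result.Responses {
+		//		if resp.Error != nil {
+		//			// resp.Error is a per-query error carried inside the response;
+		//			// only these would be candidates for a retry.
+		//		}
+		//	}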
+		evalTotalFailures.Inc()
+		span.SetStatus(codes.Error, "rule evaluation failed")
+		span.RecordError(err)
+		return fmt.Errorf("server side expressions pipeline returned an error: %w", err)
+	}
+
+	logger.Debug("Recording rule evaluated", "results", result, "duration", dur)
+	span.AddEvent("rule evaluated", trace.WithAttributes(
+		attribute.Int64("results", int64(len(result.Responses))),
+	))
+	return nil
+}
+
+func (r *recordingRule) buildAndExecutePipeline(ctx context.Context, evalCtx eval.EvaluationContext, ev *Evaluation, logger log.Logger) (*backend.QueryDataResponse, error) {
+	start := r.clock.Now()
+	evaluator, err := r.evalFactory.Create(evalCtx, ev.rule.GetEvalCondition())
+	if err != nil {
+		logger.Error("Failed to build rule evaluator", "error", err)
+		return nil, err
+	}
+	results, err := evaluator.EvaluateRaw(ctx, ev.scheduledAt)
+	if err != nil {
+		logger.Error("Failed to evaluate rule", "error", err, "duration", r.clock.Now().Sub(start))
+	}
+	return results, err
+}
+
+func (r *recordingRule) evaluationDoneTestHook(ev *Evaluation) {
+	if r.evalAppliedHook == nil {
+		return
+	}
+
+	r.evalAppliedHook(ev.rule.GetKey(), ev.scheduledAt)
+}
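For orientation, the scheduler drives one of these rules the same way the integration test below does: build it with the rule factory, run its loop in a goroutine, hand it an *Evaluation on each tick, and stop it on shutdown. A minimal sketch using only identifiers from this patch (ctx, tick, rule, folderTitle, and ruleFactory are assumed to be in scope):

	process := ruleFactory.new(ctx, rule) // yields a *recordingRule when rule.IsRecordingRule()
	go func() { _ = process.Run(rule.GetKey()) }()
	process.Eval(&Evaluation{scheduledAt: tick, rule: rule, folderTitle: folderTitle})
	// ... later, on shutdown or rule deletion:
	process.Stop(nil)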
diff --git a/pkg/services/ngalert/schedule/recording_rule_test.go b/pkg/services/ngalert/schedule/recording_rule_test.go
new file mode 100644
index 00000000000..e1c529eadc7
--- /dev/null
+++ b/pkg/services/ngalert/schedule/recording_rule_test.go
@@ -0,0 +1,217 @@
+package schedule
+
+import (
+	"bytes"
+	context "context"
+	"fmt"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/services/featuremgmt"
+	models "github.com/grafana/grafana/pkg/services/ngalert/models"
+	"github.com/grafana/grafana/pkg/util"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+func TestRecordingRule(t *testing.T) {
+	gen := models.RuleGen.With(models.RuleGen.WithAllRecordingRules())
+	// evalRetval carries the return value of Rule.Eval() calls.
+	type evalRetval struct {
+		success     bool
+		droppedEval *Evaluation
+	}
+
+	t.Run("when rule evaluation is not stopped", func(t *testing.T) {
+		t.Run("eval should send to evalCh", func(t *testing.T) {
+			r := blankRecordingRuleForTests(context.Background())
+			expected := time.Now()
+			resultCh := make(chan evalRetval)
+			data := &Evaluation{
+				scheduledAt: expected,
+				rule:        gen.GenerateRef(),
+				folderTitle: util.GenerateShortUID(),
+			}
+
+			go func() {
+				result, dropped := r.Eval(data)
+				resultCh <- evalRetval{result, dropped}
+			}()
+
+			select {
+			case ev := <-r.evalCh:
+				require.Equal(t, data, ev)
+				result := <-resultCh // blocks
+				require.True(t, result.success)
+				require.Nilf(t, result.droppedEval, "expected no dropped evaluations but got one")
+			case <-time.After(5 * time.Second):
+				t.Fatal("No message was received on eval channel")
+			}
+		})
+	})
+
+	t.Run("when rule evaluation is stopped", func(t *testing.T) {
+		t.Run("eval should do nothing", func(t *testing.T) {
+			r := blankRecordingRuleForTests(context.Background())
+			r.Stop(nil)
+			ev := &Evaluation{
+				scheduledAt: time.Now(),
+				rule:        gen.GenerateRef(),
+				folderTitle: util.GenerateShortUID(),
+			}
+
+			success, dropped := r.Eval(ev)
+
+			require.False(t, success)
+			require.Nilf(t, dropped, "expected no dropped evaluations but got one")
+		})
+
+		t.Run("calling stop multiple times should not panic", func(t *testing.T) {
+			r := blankRecordingRuleForTests(context.Background())
+			r.Stop(nil)
+			r.Stop(nil)
+		})
+
+		t.Run("stop should not panic if parent context stopped", func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			r := blankRecordingRuleForTests(ctx)
+			cancelFn()
+			r.Stop(nil)
+		})
+	})
+
+	t.Run("eval should be thread-safe", func(t *testing.T) {
+		r := blankRecordingRuleForTests(context.Background())
+		wg := sync.WaitGroup{}
+		go func() {
+			for {
+				select {
+				case <-r.evalCh:
+					time.Sleep(time.Microsecond)
+				case <-r.ctx.Done():
+					return
+				}
+			}
+		}()
+
+		for i := 0; i < 10; i++ {
+			wg.Add(1)
+			go func() {
+				for i := 0; i < 20; i++ {
+					max := 3
+					if i <= 10 {
+						max = 2
+					}
+					switch rand.Intn(max) + 1 {
+					case 1:
+						r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})
+					case 2:
+						r.Eval(&Evaluation{
+							scheduledAt: time.Now(),
+							rule:        gen.GenerateRef(),
+							folderTitle: util.GenerateShortUID(),
+						})
+					case 3:
+						r.Stop(nil)
+					}
+				}
+				wg.Done()
+			}()
+		}
+
+		wg.Wait()
+	})
+
+	t.Run("Run should exit if idle when Stop is called", func(t *testing.T) {
+		rule := blankRecordingRuleForTests(context.Background())
+		runResult := make(chan error)
+		go func() {
+			runResult <- rule.Run(models.AlertRuleKey{})
+		}()
+
+		rule.Stop(nil)
+
+		select {
+		case err := <-runResult:
+			require.NoError(t, err)
+		case <-time.After(5 * time.Second):
+			t.Fatal("Run() never exited")
+		}
+	})
+}
+
+func blankRecordingRuleForTests(ctx context.Context) *recordingRule {
+	ft := featuremgmt.WithFeatures(featuremgmt.FlagGrafanaManagedRecordingRules)
+	return newRecordingRule(ctx, 0, nil, nil, ft, log.NewNopLogger(), nil, nil)
+}
+
+func TestRecordingRule_Integration(t *testing.T) {
+	gen := models.RuleGen.With(models.RuleGen.WithAllRecordingRules())
+	ruleStore := newFakeRulesStore()
+	reg := prometheus.NewPedanticRegistry()
+	sch := setupScheduler(t, ruleStore, nil, reg, nil, nil)
+	rule := gen.GenerateRef()
+	ruleStore.PutRule(context.Background(), rule)
+	folderTitle := ruleStore.getNamespaceTitle(rule.NamespaceUID)
+	ruleFactory := ruleFactoryFromScheduler(sch)
+
+	process := ruleFactory.new(context.Background(), rule)
+	evalDoneChan := make(chan time.Time)
+	process.(*recordingRule).evalAppliedHook = func(_ models.AlertRuleKey, t time.Time) {
+		evalDoneChan <- t
+	}
+	now := time.Now()
+
+	go func() {
+		_ = process.Run(rule.GetKey())
+	}()
+	process.Eval(&Evaluation{
+		scheduledAt: now,
+		rule:        rule,
+		folderTitle: folderTitle,
+	})
+	_ = waitForTimeChannel(t, evalDoneChan)
+
+	t.Run("reports basic evaluation metrics", func(t *testing.T) {
+		expectedMetric := fmt.Sprintf(
+			`
+        # HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule.
+        # TYPE grafana_alerting_rule_evaluation_duration_seconds histogram
+        grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
+        grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
+        grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
+        grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 1
+        grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 1
+        grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 1
+        grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 1
+        grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 1
+        grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 1
+        grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 1
+        grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 1
+        grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 1
+        grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 1
+        grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
+        grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
+        grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 1
+        # HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
+        # TYPE grafana_alerting_rule_evaluations_total counter
+        grafana_alerting_rule_evaluations_total{org="%[1]d"} 1
+        # HELP grafana_alerting_rule_evaluation_attempts_total The total number of rule evaluation attempts.
+        # TYPE grafana_alerting_rule_evaluation_attempts_total counter
+        grafana_alerting_rule_evaluation_attempts_total{org="%[1]d"} 1
+	`,
+			rule.OrgID,
+		)
+
+		err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric),
+			"grafana_alerting_rule_evaluation_duration_seconds",
+			"grafana_alerting_rule_evaluations_total",
+			"grafana_alerting_rule_evaluation_attempts_total",
+		)
+		require.NoError(t, err)
+	})
+}
diff --git a/pkg/services/ngalert/schedule/registry.go b/pkg/services/ngalert/schedule/registry.go
index f2e797b1281..576b51045d6 100644
--- a/pkg/services/ngalert/schedule/registry.go
+++ b/pkg/services/ngalert/schedule/registry.go
@@ -87,6 +87,10 @@ type Evaluation struct {
 	folderTitle string
 }

+func (e *Evaluation) Fingerprint() fingerprint {
+	return ruleWithFolder{e.rule, e.folderTitle}.Fingerprint()
+}
+
 type alertRulesRegistry struct {
 	rules        map[models.AlertRuleKey]*models.AlertRule
 	folderTitles map[models.FolderKey]string
diff --git a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go
index f6e9178d2e0..16ecc8b5f81 100644
--- a/pkg/services/ngalert/schedule/schedule.go
+++ b/pkg/services/ngalert/schedule/schedule.go
@@ -13,6 +13,7 @@ import (
 	"github.com/grafana/grafana/pkg/infra/log"
 	"github.com/grafana/grafana/pkg/infra/tracing"
+	"github.com/grafana/grafana/pkg/services/featuremgmt"
 	"github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
 	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
@@ -77,6 +78,7 @@ type schedule struct {
 	appURL               *url.URL
 	disableGrafanaFolder bool
 	jitterEvaluations    JitterStrategy
+	featureToggles       featuremgmt.FeatureToggles

 	metrics *metrics.Scheduler

@@ -99,6 +101,7 @@ type SchedulerCfg struct {
 	C                    clock.Clock
 	MinRuleInterval      time.Duration
 	DisableGrafanaFolder bool
+	FeatureToggles       featuremgmt.FeatureToggles
 	AppURL               *url.URL
 	JitterEvaluations    JitterStrategy
 	EvaluatorFactory     eval.EvaluatorFactory
@@ -129,6 +132,7 @@ func NewScheduler(cfg SchedulerCfg, stateManager *state.Manager) *schedule {
 		appURL:                cfg.AppURL,
 		disableGrafanaFolder:  cfg.DisableGrafanaFolder,
 		jitterEvaluations:     cfg.JitterEvaluations,
+		featureToggles:        cfg.FeatureToggles,
 		stateManager:          stateManager,
 		minRuleInterval:       cfg.MinRuleInterval,
 		schedulableAlertRules: alertRulesRegistry{rules: make(map[ngmodels.AlertRuleKey]*ngmodels.AlertRule)},
@@ -246,6 +250,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
 		sch.evaluatorFactory,
 		&sch.schedulableAlertRules,
 		sch.clock,
+		sch.featureToggles,
 		sch.metrics,
 		sch.log,
 		sch.tracer,
diff --git a/pkg/services/ngalert/schedule/schedule_unit_test.go b/pkg/services/ngalert/schedule/schedule_unit_test.go
index 4eba8e86f2d..8e440c90324 100644
--- a/pkg/services/ngalert/schedule/schedule_unit_test.go
+++ b/pkg/services/ngalert/schedule/schedule_unit_test.go
@@ -436,7 +436,7 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor
 	var evaluator = evalMock
 	if evalMock == nil {
-		evaluator = eval.NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, nil, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, featuremgmt.WithFeatures(), nil, tracing.InitializeTracerForTest()), &pluginstore.FakePluginStore{})
+		evaluator = eval.NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, &datasources.FakeCacheService{}, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, featuremgmt.WithFeatures(), nil, tracing.InitializeTracerForTest()), &pluginstore.FakePluginStore{})
 	}

 	if registry == nil {
@@ -466,6 +466,7 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor
 		AppURL:           appUrl,
 		EvaluatorFactory: evaluator,
 		RuleStore:        rs,
+		FeatureToggles:   featuremgmt.WithFeatures(featuremgmt.FlagGrafanaManagedRecordingRules),
 		Metrics:          m.GetSchedulerMetrics(),
 		AlertSender:      senderMock,
 		Tracer:           testTracer,
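Note on enabling this path: the Run loop in recording_rule.go exits unless the grafanaManagedRecordingRules toggle is on, and the scheduler only sees a FeatureToggles value through SchedulerCfg. In tests that is done as in setupScheduler above; a minimal sketch of the same check, using only APIs that appear in this patch, looks like:

	ft := featuremgmt.WithFeatures(featuremgmt.FlagGrafanaManagedRecordingRules)
	enabled := ft.IsEnabled(context.Background(), featuremgmt.FlagGrafanaManagedRecordingRules) // true

For a running instance the flag would instead be switched on through Grafana's usual feature_toggles configuration, assuming the flag is registered under the name referenced by FlagGrafanaManagedRecordingRules.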