diff --git a/pkg/services/ngalert/schedule/alert_rule.go b/pkg/services/ngalert/schedule/alert_rule.go index 6ba1d57d9ef..c4d27f16a45 100644 --- a/pkg/services/ngalert/schedule/alert_rule.go +++ b/pkg/services/ngalert/schedule/alert_rule.go @@ -36,10 +36,10 @@ type Rule interface { Update(lastVersion RuleVersionAndPauseStatus) bool } -type ruleFactoryFunc func(context.Context) Rule +type ruleFactoryFunc func(context.Context, *ngmodels.AlertRule) Rule -func (f ruleFactoryFunc) new(ctx context.Context) Rule { - return f(ctx) +func (f ruleFactoryFunc) new(ctx context.Context, rule *ngmodels.AlertRule) Rule { + return f(ctx, rule) } func newRuleFactory( @@ -57,7 +57,10 @@ func newRuleFactory( evalAppliedHook evalAppliedFunc, stopAppliedHook stopAppliedFunc, ) ruleFactoryFunc { - return func(ctx context.Context) Rule { + return func(ctx context.Context, rule *ngmodels.AlertRule) Rule { + if rule.IsRecordingRule() { + return newRecordingRule(ctx, logger) + } return newAlertRule( ctx, appURL, diff --git a/pkg/services/ngalert/schedule/alert_rule_test.go b/pkg/services/ngalert/schedule/alert_rule_test.go index bbff8eb9a3e..1596375e801 100644 --- a/pkg/services/ngalert/schedule/alert_rule_test.go +++ b/pkg/services/ngalert/schedule/alert_rule_test.go @@ -279,7 +279,7 @@ func TestRuleRoutine(t *testing.T) { factory := ruleFactoryFromScheduler(sch) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - ruleInfo := factory.new(ctx) + ruleInfo := factory.new(ctx, rule) go func() { _ = ruleInfo.Run(rule.GetKey()) }() @@ -442,7 +442,7 @@ func TestRuleRoutine(t *testing.T) { factory := ruleFactoryFromScheduler(sch) ctx, cancel := context.WithCancel(context.Background()) - ruleInfo := factory.new(ctx) + ruleInfo := factory.new(ctx, rule) go func() { err := ruleInfo.Run(models.AlertRuleKey{}) stoppedChan <- err @@ -462,7 +462,7 @@ func TestRuleRoutine(t *testing.T) { require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)) factory := ruleFactoryFromScheduler(sch) - ruleInfo := factory.new(context.Background()) + ruleInfo := factory.new(context.Background(), rule) go func() { err := ruleInfo.Run(rule.GetKey()) stoppedChan <- err @@ -492,7 +492,7 @@ func TestRuleRoutine(t *testing.T) { factory := ruleFactoryFromScheduler(sch) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - ruleInfo := factory.new(ctx) + ruleInfo := factory.new(ctx, rule) go func() { _ = ruleInfo.Run(rule.GetKey()) @@ -574,7 +574,7 @@ func TestRuleRoutine(t *testing.T) { factory := ruleFactoryFromScheduler(sch) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - ruleInfo := factory.new(ctx) + ruleInfo := factory.new(ctx, rule) go func() { _ = ruleInfo.Run(rule.GetKey()) @@ -693,7 +693,7 @@ func TestRuleRoutine(t *testing.T) { factory := ruleFactoryFromScheduler(sch) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - ruleInfo := factory.new(ctx) + ruleInfo := factory.new(ctx, rule) go func() { _ = ruleInfo.Run(rule.GetKey()) @@ -727,7 +727,7 @@ func TestRuleRoutine(t *testing.T) { factory := ruleFactoryFromScheduler(sch) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - ruleInfo := factory.new(ctx) + ruleInfo := factory.new(ctx, rule) go func() { _ = ruleInfo.Run(rule.GetKey()) diff --git a/pkg/services/ngalert/schedule/recording_rule.go b/pkg/services/ngalert/schedule/recording_rule.go new file mode 100644 index 00000000000..361edb46d6f --- /dev/null +++ b/pkg/services/ngalert/schedule/recording_rule.go @@ -0,0 +1,54 @@ +package schedule + +import ( + context "context" + + "github.com/grafana/grafana/pkg/infra/log" + ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models" + "github.com/grafana/grafana/pkg/util" +) + +type recordingRule struct { + ctx context.Context + stopFn util.CancelCauseFunc + + logger log.Logger +} + +func newRecordingRule(parent context.Context, logger log.Logger) *recordingRule { + ctx, stop := util.WithCancelCause(parent) + return &recordingRule{ + ctx: ctx, + stopFn: stop, + logger: logger, + } +} + +func (r *recordingRule) Eval(eval *Evaluation) (bool, *Evaluation) { + return true, nil +} + +func (r *recordingRule) Update(lastVersion RuleVersionAndPauseStatus) bool { + return true +} + +func (r *recordingRule) Stop(reason error) { + if r.stopFn != nil { + r.stopFn(reason) + } +} + +func (r *recordingRule) Run(key ngmodels.AlertRuleKey) error { + ctx := ngmodels.WithRuleKey(r.ctx, key) + logger := r.logger.FromContext(ctx) + logger.Debug("Recording rule routine started") + + // nolint:gosimple + for { + select { + case <-ctx.Done(): + logger.Debug("Stopping recording rule routine") + return nil + } + } +} diff --git a/pkg/services/ngalert/schedule/registry.go b/pkg/services/ngalert/schedule/registry.go index d2214da9aaf..f2e797b1281 100644 --- a/pkg/services/ngalert/schedule/registry.go +++ b/pkg/services/ngalert/schedule/registry.go @@ -18,7 +18,7 @@ import ( var errRuleDeleted = errors.New("rule deleted") type ruleFactory interface { - new(context.Context) Rule + new(context.Context, *models.AlertRule) Rule } type ruleRegistry struct { @@ -30,15 +30,16 @@ func newRuleRegistry() ruleRegistry { return ruleRegistry{rules: make(map[models.AlertRuleKey]Rule)} } -// getOrCreate gets rule routine from registry by the key. If it does not exist, it creates a new one. +// getOrCreate gets a rule routine from registry for the provided rule. If it does not exist, it creates a new one. // Returns a pointer to the rule routine and a flag that indicates whether it is a new struct or not. -func (r *ruleRegistry) getOrCreate(context context.Context, key models.AlertRuleKey, factory ruleFactory) (Rule, bool) { +func (r *ruleRegistry) getOrCreate(context context.Context, item *models.AlertRule, factory ruleFactory) (Rule, bool) { r.mu.Lock() defer r.mu.Unlock() + key := item.GetKey() rule, ok := r.rules[key] if !ok { - rule = factory.new(context) + rule = factory.new(context, item) r.rules[key] = rule } return rule, !ok diff --git a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go index b8759a5220d..f6e9178d2e0 100644 --- a/pkg/services/ngalert/schedule/schedule.go +++ b/pkg/services/ngalert/schedule/schedule.go @@ -253,9 +253,10 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup. sch.stopAppliedFunc, ) for _, item := range alertRules { + ruleRoutine, newRoutine := sch.registry.getOrCreate(ctx, item, ruleFactory) key := item.GetKey() - ruleRoutine, newRoutine := sch.registry.getOrCreate(ctx, key, ruleFactory) logger := sch.log.FromContext(ctx).New(key.LogContext()...) + // enforce minimum evaluation interval if item.IntervalSeconds < int64(sch.minRuleInterval.Seconds()) { logger.Debug("Interval adjusted", "originalInterval", item.IntervalSeconds, "adjustedInterval", sch.minRuleInterval.Seconds()) diff --git a/pkg/services/ngalert/schedule/schedule_unit_test.go b/pkg/services/ngalert/schedule/schedule_unit_test.go index b71da365056..4eba8e86f2d 100644 --- a/pkg/services/ngalert/schedule/schedule_unit_test.go +++ b/pkg/services/ngalert/schedule/schedule_unit_test.go @@ -405,7 +405,7 @@ func TestSchedule_deleteAlertRule(t *testing.T) { ruleFactory := ruleFactoryFromScheduler(sch) rule := models.RuleGen.GenerateRef() key := rule.GetKey() - info, _ := sch.registry.getOrCreate(context.Background(), key, ruleFactory) + info, _ := sch.registry.getOrCreate(context.Background(), rule, ruleFactory) sch.deleteAlertRule(key) require.ErrorIs(t, info.(*alertRule).ctx.Err(), errRuleDeleted) require.False(t, sch.registry.exists(key))