mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: Schedule a shim implementation for recording rules (#87939)
* Add shim rule implementation for recording rules * Give ruleFactory access to the original rule definition * Schedule shim implementation if the rule is a recording rule * Fix or suppress linter * Fix nolint
This commit is contained in:
parent
fc11350554
commit
89b54d06e9
@ -36,10 +36,10 @@ type Rule interface {
|
|||||||
Update(lastVersion RuleVersionAndPauseStatus) bool
|
Update(lastVersion RuleVersionAndPauseStatus) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type ruleFactoryFunc func(context.Context) Rule
|
type ruleFactoryFunc func(context.Context, *ngmodels.AlertRule) Rule
|
||||||
|
|
||||||
func (f ruleFactoryFunc) new(ctx context.Context) Rule {
|
func (f ruleFactoryFunc) new(ctx context.Context, rule *ngmodels.AlertRule) Rule {
|
||||||
return f(ctx)
|
return f(ctx, rule)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRuleFactory(
|
func newRuleFactory(
|
||||||
@ -57,7 +57,10 @@ func newRuleFactory(
|
|||||||
evalAppliedHook evalAppliedFunc,
|
evalAppliedHook evalAppliedFunc,
|
||||||
stopAppliedHook stopAppliedFunc,
|
stopAppliedHook stopAppliedFunc,
|
||||||
) ruleFactoryFunc {
|
) ruleFactoryFunc {
|
||||||
return func(ctx context.Context) Rule {
|
return func(ctx context.Context, rule *ngmodels.AlertRule) Rule {
|
||||||
|
if rule.IsRecordingRule() {
|
||||||
|
return newRecordingRule(ctx, logger)
|
||||||
|
}
|
||||||
return newAlertRule(
|
return newAlertRule(
|
||||||
ctx,
|
ctx,
|
||||||
appURL,
|
appURL,
|
||||||
|
@ -279,7 +279,7 @@ func TestRuleRoutine(t *testing.T) {
|
|||||||
factory := ruleFactoryFromScheduler(sch)
|
factory := ruleFactoryFromScheduler(sch)
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
t.Cleanup(cancel)
|
t.Cleanup(cancel)
|
||||||
ruleInfo := factory.new(ctx)
|
ruleInfo := factory.new(ctx, rule)
|
||||||
go func() {
|
go func() {
|
||||||
_ = ruleInfo.Run(rule.GetKey())
|
_ = ruleInfo.Run(rule.GetKey())
|
||||||
}()
|
}()
|
||||||
@ -442,7 +442,7 @@ func TestRuleRoutine(t *testing.T) {
|
|||||||
|
|
||||||
factory := ruleFactoryFromScheduler(sch)
|
factory := ruleFactoryFromScheduler(sch)
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
ruleInfo := factory.new(ctx)
|
ruleInfo := factory.new(ctx, rule)
|
||||||
go func() {
|
go func() {
|
||||||
err := ruleInfo.Run(models.AlertRuleKey{})
|
err := ruleInfo.Run(models.AlertRuleKey{})
|
||||||
stoppedChan <- err
|
stoppedChan <- err
|
||||||
@ -462,7 +462,7 @@ func TestRuleRoutine(t *testing.T) {
|
|||||||
require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
|
require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
|
||||||
|
|
||||||
factory := ruleFactoryFromScheduler(sch)
|
factory := ruleFactoryFromScheduler(sch)
|
||||||
ruleInfo := factory.new(context.Background())
|
ruleInfo := factory.new(context.Background(), rule)
|
||||||
go func() {
|
go func() {
|
||||||
err := ruleInfo.Run(rule.GetKey())
|
err := ruleInfo.Run(rule.GetKey())
|
||||||
stoppedChan <- err
|
stoppedChan <- err
|
||||||
@ -492,7 +492,7 @@ func TestRuleRoutine(t *testing.T) {
|
|||||||
factory := ruleFactoryFromScheduler(sch)
|
factory := ruleFactoryFromScheduler(sch)
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
t.Cleanup(cancel)
|
t.Cleanup(cancel)
|
||||||
ruleInfo := factory.new(ctx)
|
ruleInfo := factory.new(ctx, rule)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
_ = ruleInfo.Run(rule.GetKey())
|
_ = ruleInfo.Run(rule.GetKey())
|
||||||
@ -574,7 +574,7 @@ func TestRuleRoutine(t *testing.T) {
|
|||||||
factory := ruleFactoryFromScheduler(sch)
|
factory := ruleFactoryFromScheduler(sch)
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
t.Cleanup(cancel)
|
t.Cleanup(cancel)
|
||||||
ruleInfo := factory.new(ctx)
|
ruleInfo := factory.new(ctx, rule)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
_ = ruleInfo.Run(rule.GetKey())
|
_ = ruleInfo.Run(rule.GetKey())
|
||||||
@ -693,7 +693,7 @@ func TestRuleRoutine(t *testing.T) {
|
|||||||
factory := ruleFactoryFromScheduler(sch)
|
factory := ruleFactoryFromScheduler(sch)
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
t.Cleanup(cancel)
|
t.Cleanup(cancel)
|
||||||
ruleInfo := factory.new(ctx)
|
ruleInfo := factory.new(ctx, rule)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
_ = ruleInfo.Run(rule.GetKey())
|
_ = ruleInfo.Run(rule.GetKey())
|
||||||
@ -727,7 +727,7 @@ func TestRuleRoutine(t *testing.T) {
|
|||||||
factory := ruleFactoryFromScheduler(sch)
|
factory := ruleFactoryFromScheduler(sch)
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
t.Cleanup(cancel)
|
t.Cleanup(cancel)
|
||||||
ruleInfo := factory.new(ctx)
|
ruleInfo := factory.new(ctx, rule)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
_ = ruleInfo.Run(rule.GetKey())
|
_ = ruleInfo.Run(rule.GetKey())
|
||||||
|
54
pkg/services/ngalert/schedule/recording_rule.go
Normal file
54
pkg/services/ngalert/schedule/recording_rule.go
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
package schedule
|
||||||
|
|
||||||
|
import (
|
||||||
|
context "context"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
|
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||||
|
"github.com/grafana/grafana/pkg/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type recordingRule struct {
|
||||||
|
ctx context.Context
|
||||||
|
stopFn util.CancelCauseFunc
|
||||||
|
|
||||||
|
logger log.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRecordingRule(parent context.Context, logger log.Logger) *recordingRule {
|
||||||
|
ctx, stop := util.WithCancelCause(parent)
|
||||||
|
return &recordingRule{
|
||||||
|
ctx: ctx,
|
||||||
|
stopFn: stop,
|
||||||
|
logger: logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *recordingRule) Eval(eval *Evaluation) (bool, *Evaluation) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *recordingRule) Update(lastVersion RuleVersionAndPauseStatus) bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *recordingRule) Stop(reason error) {
|
||||||
|
if r.stopFn != nil {
|
||||||
|
r.stopFn(reason)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *recordingRule) Run(key ngmodels.AlertRuleKey) error {
|
||||||
|
ctx := ngmodels.WithRuleKey(r.ctx, key)
|
||||||
|
logger := r.logger.FromContext(ctx)
|
||||||
|
logger.Debug("Recording rule routine started")
|
||||||
|
|
||||||
|
// nolint:gosimple
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
logger.Debug("Stopping recording rule routine")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -18,7 +18,7 @@ import (
|
|||||||
var errRuleDeleted = errors.New("rule deleted")
|
var errRuleDeleted = errors.New("rule deleted")
|
||||||
|
|
||||||
type ruleFactory interface {
|
type ruleFactory interface {
|
||||||
new(context.Context) Rule
|
new(context.Context, *models.AlertRule) Rule
|
||||||
}
|
}
|
||||||
|
|
||||||
type ruleRegistry struct {
|
type ruleRegistry struct {
|
||||||
@ -30,15 +30,16 @@ func newRuleRegistry() ruleRegistry {
|
|||||||
return ruleRegistry{rules: make(map[models.AlertRuleKey]Rule)}
|
return ruleRegistry{rules: make(map[models.AlertRuleKey]Rule)}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getOrCreate gets rule routine from registry by the key. If it does not exist, it creates a new one.
|
// getOrCreate gets a rule routine from registry for the provided rule. If it does not exist, it creates a new one.
|
||||||
// Returns a pointer to the rule routine and a flag that indicates whether it is a new struct or not.
|
// Returns a pointer to the rule routine and a flag that indicates whether it is a new struct or not.
|
||||||
func (r *ruleRegistry) getOrCreate(context context.Context, key models.AlertRuleKey, factory ruleFactory) (Rule, bool) {
|
func (r *ruleRegistry) getOrCreate(context context.Context, item *models.AlertRule, factory ruleFactory) (Rule, bool) {
|
||||||
r.mu.Lock()
|
r.mu.Lock()
|
||||||
defer r.mu.Unlock()
|
defer r.mu.Unlock()
|
||||||
|
|
||||||
|
key := item.GetKey()
|
||||||
rule, ok := r.rules[key]
|
rule, ok := r.rules[key]
|
||||||
if !ok {
|
if !ok {
|
||||||
rule = factory.new(context)
|
rule = factory.new(context, item)
|
||||||
r.rules[key] = rule
|
r.rules[key] = rule
|
||||||
}
|
}
|
||||||
return rule, !ok
|
return rule, !ok
|
||||||
|
@ -253,9 +253,10 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
|
|||||||
sch.stopAppliedFunc,
|
sch.stopAppliedFunc,
|
||||||
)
|
)
|
||||||
for _, item := range alertRules {
|
for _, item := range alertRules {
|
||||||
|
ruleRoutine, newRoutine := sch.registry.getOrCreate(ctx, item, ruleFactory)
|
||||||
key := item.GetKey()
|
key := item.GetKey()
|
||||||
ruleRoutine, newRoutine := sch.registry.getOrCreate(ctx, key, ruleFactory)
|
|
||||||
logger := sch.log.FromContext(ctx).New(key.LogContext()...)
|
logger := sch.log.FromContext(ctx).New(key.LogContext()...)
|
||||||
|
|
||||||
// enforce minimum evaluation interval
|
// enforce minimum evaluation interval
|
||||||
if item.IntervalSeconds < int64(sch.minRuleInterval.Seconds()) {
|
if item.IntervalSeconds < int64(sch.minRuleInterval.Seconds()) {
|
||||||
logger.Debug("Interval adjusted", "originalInterval", item.IntervalSeconds, "adjustedInterval", sch.minRuleInterval.Seconds())
|
logger.Debug("Interval adjusted", "originalInterval", item.IntervalSeconds, "adjustedInterval", sch.minRuleInterval.Seconds())
|
||||||
|
@ -405,7 +405,7 @@ func TestSchedule_deleteAlertRule(t *testing.T) {
|
|||||||
ruleFactory := ruleFactoryFromScheduler(sch)
|
ruleFactory := ruleFactoryFromScheduler(sch)
|
||||||
rule := models.RuleGen.GenerateRef()
|
rule := models.RuleGen.GenerateRef()
|
||||||
key := rule.GetKey()
|
key := rule.GetKey()
|
||||||
info, _ := sch.registry.getOrCreate(context.Background(), key, ruleFactory)
|
info, _ := sch.registry.getOrCreate(context.Background(), rule, ruleFactory)
|
||||||
sch.deleteAlertRule(key)
|
sch.deleteAlertRule(key)
|
||||||
require.ErrorIs(t, info.(*alertRule).ctx.Err(), errRuleDeleted)
|
require.ErrorIs(t, info.(*alertRule).ctx.Err(), errRuleDeleted)
|
||||||
require.False(t, sch.registry.exists(key))
|
require.False(t, sch.registry.exists(key))
|
||||||
|
Loading…
Reference in New Issue
Block a user