Alerting: Scheduler and registry handle rules by an interface (#84044)

* export Evaluation

* Export Evaluation

* Export RuleVersionAndPauseStatus

* export Eval, create interface

* Export update and add to interface

* Export Stop and Run and add to interface

* Registry and scheduler use rule by interface and not concrete type

* Update factory to use interface, update tests to work over public API rather than writing to channels directly

* Rename map in registry

* Rename getOrCreateInfo to not reference a specific implementation

* Genericize alertRuleInfoRegistry into ruleRegistry

* Rename alertRuleInfo to alertRule

* Comments on interface

* Update pkg/services/ngalert/schedule/schedule.go

Co-authored-by: Jean-Philippe Quéméner <JohnnyQQQQ@users.noreply.github.com>

---------

Co-authored-by: Jean-Philippe Quéméner <JohnnyQQQQ@users.noreply.github.com>
This commit is contained in:
Alexander Weaver
2024-03-11 15:57:38 -05:00
committed by GitHub
parent 0b2640e9ff
commit 6c5e94095d
6 changed files with 142 additions and 124 deletions

View File

@@ -23,9 +23,22 @@ import (
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
type ruleFactoryFunc func(context.Context) *alertRuleInfo // Rule represents a single piece of work that is executed periodically by the ruler.
type Rule interface {
// Run creates the resources that will perform the rule's work, and starts it. It blocks indefinitely, until Stop is called or another signal is sent.
Run(key ngmodels.AlertRuleKey) error
// Stop shuts down the rule's execution with an optional reason. It has no effect if the rule has not yet been Run.
Stop(reason error)
// Eval sends a signal to execute the work represented by the rule, exactly one time.
// It has no effect if the rule has not yet been Run, or if the rule is Stopped.
Eval(eval *Evaluation) (bool, *Evaluation)
// Update sends a signal to change the definition of the rule.
Update(lastVersion RuleVersionAndPauseStatus) bool
}
func (f ruleFactoryFunc) new(ctx context.Context) *alertRuleInfo { type ruleFactoryFunc func(context.Context) Rule
func (f ruleFactoryFunc) new(ctx context.Context) Rule {
return f(ctx) return f(ctx)
} }
@@ -44,8 +57,8 @@ func newRuleFactory(
evalAppliedHook evalAppliedFunc, evalAppliedHook evalAppliedFunc,
stopAppliedHook stopAppliedFunc, stopAppliedHook stopAppliedFunc,
) ruleFactoryFunc { ) ruleFactoryFunc {
return func(ctx context.Context) *alertRuleInfo { return func(ctx context.Context) Rule {
return newAlertRuleInfo( return newAlertRule(
ctx, ctx,
appURL, appURL,
disableGrafanaFolder, disableGrafanaFolder,
@@ -71,9 +84,9 @@ type ruleProvider interface {
get(ngmodels.AlertRuleKey) *ngmodels.AlertRule get(ngmodels.AlertRuleKey) *ngmodels.AlertRule
} }
type alertRuleInfo struct { type alertRule struct {
evalCh chan *evaluation evalCh chan *Evaluation
updateCh chan ruleVersionAndPauseStatus updateCh chan RuleVersionAndPauseStatus
ctx context.Context ctx context.Context
stopFn util.CancelCauseFunc stopFn util.CancelCauseFunc
@@ -96,7 +109,7 @@ type alertRuleInfo struct {
tracer tracing.Tracer tracer tracing.Tracer
} }
func newAlertRuleInfo( func newAlertRule(
parent context.Context, parent context.Context,
appURL *url.URL, appURL *url.URL,
disableGrafanaFolder bool, disableGrafanaFolder bool,
@@ -111,11 +124,11 @@ func newAlertRuleInfo(
tracer tracing.Tracer, tracer tracing.Tracer,
evalAppliedHook func(ngmodels.AlertRuleKey, time.Time), evalAppliedHook func(ngmodels.AlertRuleKey, time.Time),
stopAppliedHook func(ngmodels.AlertRuleKey), stopAppliedHook func(ngmodels.AlertRuleKey),
) *alertRuleInfo { ) *alertRule {
ctx, stop := util.WithCancelCause(parent) ctx, stop := util.WithCancelCause(parent)
return &alertRuleInfo{ return &alertRule{
evalCh: make(chan *evaluation), evalCh: make(chan *Evaluation),
updateCh: make(chan ruleVersionAndPauseStatus), updateCh: make(chan RuleVersionAndPauseStatus),
ctx: ctx, ctx: ctx,
stopFn: stop, stopFn: stop,
appURL: appURL, appURL: appURL,
@@ -141,9 +154,9 @@ func newAlertRuleInfo(
// - false when the send operation is stopped // - false when the send operation is stopped
// //
// the second element contains a dropped message that was sent by a concurrent sender. // the second element contains a dropped message that was sent by a concurrent sender.
func (a *alertRuleInfo) eval(eval *evaluation) (bool, *evaluation) { func (a *alertRule) Eval(eval *Evaluation) (bool, *Evaluation) {
// read the channel in a non-blocking manner to make sure that there is no concurrent send operation. // read the channel in a non-blocking manner to make sure that there is no concurrent send operation.
var droppedMsg *evaluation var droppedMsg *Evaluation
select { select {
case droppedMsg = <-a.evalCh: case droppedMsg = <-a.evalCh:
default: default:
@@ -158,7 +171,7 @@ func (a *alertRuleInfo) eval(eval *evaluation) (bool, *evaluation) {
} }
// update sends an instruction to the rule evaluation routine to update the scheduled rule to the specified version. The specified version must be later than the current version, otherwise no update will happen. // update sends an instruction to the rule evaluation routine to update the scheduled rule to the specified version. The specified version must be later than the current version, otherwise no update will happen.
func (a *alertRuleInfo) update(lastVersion ruleVersionAndPauseStatus) bool { func (a *alertRule) Update(lastVersion RuleVersionAndPauseStatus) bool {
// check if the channel is not empty. // check if the channel is not empty.
select { select {
case <-a.updateCh: case <-a.updateCh:
@@ -176,11 +189,13 @@ func (a *alertRuleInfo) update(lastVersion ruleVersionAndPauseStatus) bool {
} }
// stop sends an instruction to the rule evaluation routine to shut down. an optional shutdown reason can be given. // stop sends an instruction to the rule evaluation routine to shut down. an optional shutdown reason can be given.
func (a *alertRuleInfo) stop(reason error) { func (a *alertRule) Stop(reason error) {
a.stopFn(reason) if a.stopFn != nil {
a.stopFn(reason)
}
} }
func (a *alertRuleInfo) run(key ngmodels.AlertRuleKey) error { func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
grafanaCtx := ngmodels.WithRuleKey(a.ctx, key) grafanaCtx := ngmodels.WithRuleKey(a.ctx, key)
logger := a.logger.FromContext(grafanaCtx) logger := a.logger.FromContext(grafanaCtx)
logger.Debug("Alert rule routine started") logger.Debug("Alert rule routine started")
@@ -295,7 +310,7 @@ func (a *alertRuleInfo) run(key ngmodels.AlertRuleKey) error {
} }
} }
func (a *alertRuleInfo) evaluate(ctx context.Context, key ngmodels.AlertRuleKey, f fingerprint, attempt int64, e *evaluation, span trace.Span, retry bool) error { func (a *alertRule) evaluate(ctx context.Context, key ngmodels.AlertRuleKey, f fingerprint, attempt int64, e *Evaluation, span trace.Span, retry bool) error {
orgID := fmt.Sprint(key.OrgID) orgID := fmt.Sprint(key.OrgID)
evalTotal := a.metrics.EvalTotal.WithLabelValues(orgID) evalTotal := a.metrics.EvalTotal.WithLabelValues(orgID)
evalDuration := a.metrics.EvalDuration.WithLabelValues(orgID) evalDuration := a.metrics.EvalDuration.WithLabelValues(orgID)
@@ -393,14 +408,14 @@ func (a *alertRuleInfo) evaluate(ctx context.Context, key ngmodels.AlertRuleKey,
return nil return nil
} }
func (a *alertRuleInfo) notify(ctx context.Context, key ngmodels.AlertRuleKey, states []state.StateTransition) { func (a *alertRule) notify(ctx context.Context, key ngmodels.AlertRuleKey, states []state.StateTransition) {
expiredAlerts := state.FromAlertsStateToStoppedAlert(states, a.appURL, a.clock) expiredAlerts := state.FromAlertsStateToStoppedAlert(states, a.appURL, a.clock)
if len(expiredAlerts.PostableAlerts) > 0 { if len(expiredAlerts.PostableAlerts) > 0 {
a.sender.Send(ctx, key, expiredAlerts) a.sender.Send(ctx, key, expiredAlerts)
} }
} }
func (a *alertRuleInfo) resetState(ctx context.Context, key ngmodels.AlertRuleKey, isPaused bool) { func (a *alertRule) resetState(ctx context.Context, key ngmodels.AlertRuleKey, isPaused bool) {
rule := a.ruleProvider.get(key) rule := a.ruleProvider.get(key)
reason := ngmodels.StateReasonUpdated reason := ngmodels.StateReasonUpdated
if isPaused { if isPaused {
@@ -411,7 +426,7 @@ func (a *alertRuleInfo) resetState(ctx context.Context, key ngmodels.AlertRuleKe
} }
// evalApplied is only used on tests. // evalApplied is only used on tests.
func (a *alertRuleInfo) evalApplied(alertDefKey ngmodels.AlertRuleKey, now time.Time) { func (a *alertRule) evalApplied(alertDefKey ngmodels.AlertRuleKey, now time.Time) {
if a.evalAppliedHook == nil { if a.evalAppliedHook == nil {
return return
} }
@@ -420,7 +435,7 @@ func (a *alertRuleInfo) evalApplied(alertDefKey ngmodels.AlertRuleKey, now time.
} }
// stopApplied is only used on tests. // stopApplied is only used on tests.
func (a *alertRuleInfo) stopApplied(alertDefKey ngmodels.AlertRuleKey) { func (a *alertRule) stopApplied(alertDefKey ngmodels.AlertRuleKey) {
if a.stopAppliedHook == nil { if a.stopAppliedHook == nil {
return return
} }

View File

@@ -26,18 +26,18 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestAlertRuleInfo(t *testing.T) { func TestAlertRule(t *testing.T) {
type evalResponse struct { type evalResponse struct {
success bool success bool
droppedEval *evaluation droppedEval *Evaluation
} }
t.Run("when rule evaluation is not stopped", func(t *testing.T) { t.Run("when rule evaluation is not stopped", func(t *testing.T) {
t.Run("update should send to updateCh", func(t *testing.T) { t.Run("update should send to updateCh", func(t *testing.T) {
r := blankRuleInfoForTests(context.Background()) r := blankRuleForTests(context.Background())
resultCh := make(chan bool) resultCh := make(chan bool)
go func() { go func() {
resultCh <- r.update(ruleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}) resultCh <- r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})
}() }()
select { select {
case <-r.updateCh: case <-r.updateCh:
@@ -47,22 +47,22 @@ func TestAlertRuleInfo(t *testing.T) {
} }
}) })
t.Run("update should drop any concurrent sending to updateCh", func(t *testing.T) { t.Run("update should drop any concurrent sending to updateCh", func(t *testing.T) {
r := blankRuleInfoForTests(context.Background()) r := blankRuleForTests(context.Background())
version1 := ruleVersionAndPauseStatus{fingerprint(rand.Uint64()), false} version1 := RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}
version2 := ruleVersionAndPauseStatus{fingerprint(rand.Uint64()), false} version2 := RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
go func() { go func() {
wg.Done() wg.Done()
r.update(version1) r.Update(version1)
wg.Done() wg.Done()
}() }()
wg.Wait() wg.Wait()
wg.Add(2) // one when time1 is sent, another when go-routine for time2 has started wg.Add(2) // one when time1 is sent, another when go-routine for time2 has started
go func() { go func() {
wg.Done() wg.Done()
r.update(version2) r.Update(version2)
}() }()
wg.Wait() // at this point tick 1 has already been dropped wg.Wait() // at this point tick 1 has already been dropped
select { select {
@@ -73,16 +73,16 @@ func TestAlertRuleInfo(t *testing.T) {
} }
}) })
t.Run("eval should send to evalCh", func(t *testing.T) { t.Run("eval should send to evalCh", func(t *testing.T) {
r := blankRuleInfoForTests(context.Background()) r := blankRuleForTests(context.Background())
expected := time.Now() expected := time.Now()
resultCh := make(chan evalResponse) resultCh := make(chan evalResponse)
data := &evaluation{ data := &Evaluation{
scheduledAt: expected, scheduledAt: expected,
rule: models.AlertRuleGen()(), rule: models.AlertRuleGen()(),
folderTitle: util.GenerateShortUID(), folderTitle: util.GenerateShortUID(),
} }
go func() { go func() {
result, dropped := r.eval(data) result, dropped := r.Eval(data)
resultCh <- evalResponse{result, dropped} resultCh <- evalResponse{result, dropped}
}() }()
select { select {
@@ -96,17 +96,17 @@ func TestAlertRuleInfo(t *testing.T) {
} }
}) })
t.Run("eval should drop any concurrent sending to evalCh", func(t *testing.T) { t.Run("eval should drop any concurrent sending to evalCh", func(t *testing.T) {
r := blankRuleInfoForTests(context.Background()) r := blankRuleForTests(context.Background())
time1 := time.UnixMilli(rand.Int63n(math.MaxInt64)) time1 := time.UnixMilli(rand.Int63n(math.MaxInt64))
time2 := time.UnixMilli(rand.Int63n(math.MaxInt64)) time2 := time.UnixMilli(rand.Int63n(math.MaxInt64))
resultCh1 := make(chan evalResponse) resultCh1 := make(chan evalResponse)
resultCh2 := make(chan evalResponse) resultCh2 := make(chan evalResponse)
data := &evaluation{ data := &Evaluation{
scheduledAt: time1, scheduledAt: time1,
rule: models.AlertRuleGen()(), rule: models.AlertRuleGen()(),
folderTitle: util.GenerateShortUID(), folderTitle: util.GenerateShortUID(),
} }
data2 := &evaluation{ data2 := &Evaluation{
scheduledAt: time2, scheduledAt: time2,
rule: data.rule, rule: data.rule,
folderTitle: data.folderTitle, folderTitle: data.folderTitle,
@@ -115,7 +115,7 @@ func TestAlertRuleInfo(t *testing.T) {
wg.Add(1) wg.Add(1)
go func() { go func() {
wg.Done() wg.Done()
result, dropped := r.eval(data) result, dropped := r.Eval(data)
wg.Done() wg.Done()
resultCh1 <- evalResponse{result, dropped} resultCh1 <- evalResponse{result, dropped}
}() }()
@@ -123,7 +123,7 @@ func TestAlertRuleInfo(t *testing.T) {
wg.Add(2) // one when time1 is sent, another when go-routine for time2 has started wg.Add(2) // one when time1 is sent, another when go-routine for time2 has started
go func() { go func() {
wg.Done() wg.Done()
result, dropped := r.eval(data2) result, dropped := r.Eval(data2)
resultCh2 <- evalResponse{result, dropped} resultCh2 <- evalResponse{result, dropped}
}() }()
wg.Wait() // at this point tick 1 has already been dropped wg.Wait() // at this point tick 1 has already been dropped
@@ -142,19 +142,19 @@ func TestAlertRuleInfo(t *testing.T) {
} }
}) })
t.Run("eval should exit when context is cancelled", func(t *testing.T) { t.Run("eval should exit when context is cancelled", func(t *testing.T) {
r := blankRuleInfoForTests(context.Background()) r := blankRuleForTests(context.Background())
resultCh := make(chan evalResponse) resultCh := make(chan evalResponse)
data := &evaluation{ data := &Evaluation{
scheduledAt: time.Now(), scheduledAt: time.Now(),
rule: models.AlertRuleGen()(), rule: models.AlertRuleGen()(),
folderTitle: util.GenerateShortUID(), folderTitle: util.GenerateShortUID(),
} }
go func() { go func() {
result, dropped := r.eval(data) result, dropped := r.Eval(data)
resultCh <- evalResponse{result, dropped} resultCh <- evalResponse{result, dropped}
}() }()
runtime.Gosched() runtime.Gosched()
r.stop(nil) r.Stop(nil)
select { select {
case result := <-resultCh: case result := <-resultCh:
require.False(t, result.success) require.False(t, result.success)
@@ -166,37 +166,37 @@ func TestAlertRuleInfo(t *testing.T) {
}) })
t.Run("when rule evaluation is stopped", func(t *testing.T) { t.Run("when rule evaluation is stopped", func(t *testing.T) {
t.Run("Update should do nothing", func(t *testing.T) { t.Run("Update should do nothing", func(t *testing.T) {
r := blankRuleInfoForTests(context.Background()) r := blankRuleForTests(context.Background())
r.stop(errRuleDeleted) r.Stop(errRuleDeleted)
require.ErrorIs(t, r.ctx.Err(), errRuleDeleted) require.ErrorIs(t, r.ctx.Err(), errRuleDeleted)
require.False(t, r.update(ruleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})) require.False(t, r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}))
}) })
t.Run("eval should do nothing", func(t *testing.T) { t.Run("eval should do nothing", func(t *testing.T) {
r := blankRuleInfoForTests(context.Background()) r := blankRuleForTests(context.Background())
r.stop(nil) r.Stop(nil)
data := &evaluation{ data := &Evaluation{
scheduledAt: time.Now(), scheduledAt: time.Now(),
rule: models.AlertRuleGen()(), rule: models.AlertRuleGen()(),
folderTitle: util.GenerateShortUID(), folderTitle: util.GenerateShortUID(),
} }
success, dropped := r.eval(data) success, dropped := r.Eval(data)
require.False(t, success) require.False(t, success)
require.Nilf(t, dropped, "expected no dropped evaluations but got one") require.Nilf(t, dropped, "expected no dropped evaluations but got one")
}) })
t.Run("stop should do nothing", func(t *testing.T) { t.Run("stop should do nothing", func(t *testing.T) {
r := blankRuleInfoForTests(context.Background()) r := blankRuleForTests(context.Background())
r.stop(nil) r.Stop(nil)
r.stop(nil) r.Stop(nil)
}) })
t.Run("stop should do nothing if parent context stopped", func(t *testing.T) { t.Run("stop should do nothing if parent context stopped", func(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background()) ctx, cancelFn := context.WithCancel(context.Background())
r := blankRuleInfoForTests(ctx) r := blankRuleForTests(ctx)
cancelFn() cancelFn()
r.stop(nil) r.Stop(nil)
}) })
}) })
t.Run("should be thread-safe", func(t *testing.T) { t.Run("should be thread-safe", func(t *testing.T) {
r := blankRuleInfoForTests(context.Background()) r := blankRuleForTests(context.Background())
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
go func() { go func() {
for { for {
@@ -221,15 +221,15 @@ func TestAlertRuleInfo(t *testing.T) {
} }
switch rand.Intn(max) + 1 { switch rand.Intn(max) + 1 {
case 1: case 1:
r.update(ruleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}) r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})
case 2: case 2:
r.eval(&evaluation{ r.Eval(&Evaluation{
scheduledAt: time.Now(), scheduledAt: time.Now(),
rule: models.AlertRuleGen()(), rule: models.AlertRuleGen()(),
folderTitle: util.GenerateShortUID(), folderTitle: util.GenerateShortUID(),
}) })
case 3: case 3:
r.stop(nil) r.Stop(nil)
} }
} }
wg.Done() wg.Done()
@@ -240,9 +240,8 @@ func TestAlertRuleInfo(t *testing.T) {
}) })
} }
func blankRuleInfoForTests(ctx context.Context) *alertRuleInfo { func blankRuleForTests(ctx context.Context) *alertRule {
factory := newRuleFactory(nil, false, 0, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) return newAlertRule(context.Background(), nil, false, 0, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
return factory.new(context.Background())
} }
func TestRuleRoutine(t *testing.T) { func TestRuleRoutine(t *testing.T) {
@@ -279,16 +278,16 @@ func TestRuleRoutine(t *testing.T) {
t.Cleanup(cancel) t.Cleanup(cancel)
ruleInfo := factory.new(ctx) ruleInfo := factory.new(ctx)
go func() { go func() {
_ = ruleInfo.run(rule.GetKey()) _ = ruleInfo.Run(rule.GetKey())
}() }()
expectedTime := time.UnixMicro(rand.Int63()) expectedTime := time.UnixMicro(rand.Int63())
ruleInfo.evalCh <- &evaluation{ ruleInfo.Eval(&Evaluation{
scheduledAt: expectedTime, scheduledAt: expectedTime,
rule: rule, rule: rule,
folderTitle: folderTitle, folderTitle: folderTitle,
} })
actualTime := waitForTimeChannel(t, evalAppliedChan) actualTime := waitForTimeChannel(t, evalAppliedChan)
require.Equal(t, expectedTime, actualTime) require.Equal(t, expectedTime, actualTime)
@@ -428,7 +427,7 @@ func TestRuleRoutine(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
ruleInfo := factory.new(ctx) ruleInfo := factory.new(ctx)
go func() { go func() {
err := ruleInfo.run(models.AlertRuleKey{}) err := ruleInfo.Run(models.AlertRuleKey{})
stoppedChan <- err stoppedChan <- err
}() }()
@@ -448,11 +447,11 @@ func TestRuleRoutine(t *testing.T) {
factory := ruleFactoryFromScheduler(sch) factory := ruleFactoryFromScheduler(sch)
ruleInfo := factory.new(context.Background()) ruleInfo := factory.new(context.Background())
go func() { go func() {
err := ruleInfo.run(rule.GetKey()) err := ruleInfo.Run(rule.GetKey())
stoppedChan <- err stoppedChan <- err
}() }()
ruleInfo.stop(errRuleDeleted) ruleInfo.Stop(errRuleDeleted)
err := waitForErrChannel(t, stoppedChan) err := waitForErrChannel(t, stoppedChan)
require.NoError(t, err) require.NoError(t, err)
@@ -479,15 +478,15 @@ func TestRuleRoutine(t *testing.T) {
ruleInfo := factory.new(ctx) ruleInfo := factory.new(ctx)
go func() { go func() {
_ = ruleInfo.run(rule.GetKey()) _ = ruleInfo.Run(rule.GetKey())
}() }()
// init evaluation loop so it got the rule version // init evaluation loop so it got the rule version
ruleInfo.evalCh <- &evaluation{ ruleInfo.Eval(&Evaluation{
scheduledAt: sch.clock.Now(), scheduledAt: sch.clock.Now(),
rule: rule, rule: rule,
folderTitle: folderTitle, folderTitle: folderTitle,
} })
waitForTimeChannel(t, evalAppliedChan) waitForTimeChannel(t, evalAppliedChan)
@@ -519,8 +518,8 @@ func TestRuleRoutine(t *testing.T) {
require.Greaterf(t, expectedToBeSent, 0, "State manager was expected to return at least one state that can be expired") require.Greaterf(t, expectedToBeSent, 0, "State manager was expected to return at least one state that can be expired")
t.Run("should do nothing if version in channel is the same", func(t *testing.T) { t.Run("should do nothing if version in channel is the same", func(t *testing.T) {
ruleInfo.updateCh <- ruleVersionAndPauseStatus{ruleFp, false} ruleInfo.Update(RuleVersionAndPauseStatus{ruleFp, false})
ruleInfo.updateCh <- ruleVersionAndPauseStatus{ruleFp, false} // second time just to make sure that previous messages were handled ruleInfo.Update(RuleVersionAndPauseStatus{ruleFp, false}) // second time just to make sure that previous messages were handled
actualStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID) actualStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
require.Len(t, actualStates, len(states)) require.Len(t, actualStates, len(states))
@@ -529,7 +528,7 @@ func TestRuleRoutine(t *testing.T) {
}) })
t.Run("should clear the state and expire firing alerts if version in channel is greater", func(t *testing.T) { t.Run("should clear the state and expire firing alerts if version in channel is greater", func(t *testing.T) {
ruleInfo.updateCh <- ruleVersionAndPauseStatus{ruleFp + 1, false} ruleInfo.Update(RuleVersionAndPauseStatus{ruleFp + 1, false})
require.Eventually(t, func() bool { require.Eventually(t, func() bool {
return len(sender.Calls()) > 0 return len(sender.Calls()) > 0
@@ -561,13 +560,13 @@ func TestRuleRoutine(t *testing.T) {
ruleInfo := factory.new(ctx) ruleInfo := factory.new(ctx)
go func() { go func() {
_ = ruleInfo.run(rule.GetKey()) _ = ruleInfo.Run(rule.GetKey())
}() }()
ruleInfo.evalCh <- &evaluation{ ruleInfo.Eval(&Evaluation{
scheduledAt: sch.clock.Now(), scheduledAt: sch.clock.Now(),
rule: rule, rule: rule,
} })
waitForTimeChannel(t, evalAppliedChan) waitForTimeChannel(t, evalAppliedChan)
@@ -667,13 +666,13 @@ func TestRuleRoutine(t *testing.T) {
ruleInfo := factory.new(ctx) ruleInfo := factory.new(ctx)
go func() { go func() {
_ = ruleInfo.run(rule.GetKey()) _ = ruleInfo.Run(rule.GetKey())
}() }()
ruleInfo.evalCh <- &evaluation{ ruleInfo.Eval(&Evaluation{
scheduledAt: sch.clock.Now(), scheduledAt: sch.clock.Now(),
rule: rule, rule: rule,
} })
waitForTimeChannel(t, evalAppliedChan) waitForTimeChannel(t, evalAppliedChan)
@@ -701,13 +700,13 @@ func TestRuleRoutine(t *testing.T) {
ruleInfo := factory.new(ctx) ruleInfo := factory.new(ctx)
go func() { go func() {
_ = ruleInfo.run(rule.GetKey()) _ = ruleInfo.Run(rule.GetKey())
}() }()
ruleInfo.evalCh <- &evaluation{ ruleInfo.Eval(&Evaluation{
scheduledAt: sch.clock.Now(), scheduledAt: sch.clock.Now(),
rule: rule, rule: rule,
} })
waitForTimeChannel(t, evalAppliedChan) waitForTimeChannel(t, evalAppliedChan)

View File

@@ -10,7 +10,7 @@ import (
var _ eval.AlertingResultsReader = AlertingResultsFromRuleState{} var _ eval.AlertingResultsReader = AlertingResultsFromRuleState{}
func (a *alertRuleInfo) newLoadedMetricsReader(rule *ngmodels.AlertRule) eval.AlertingResultsReader { func (a *alertRule) newLoadedMetricsReader(rule *ngmodels.AlertRule) eval.AlertingResultsReader {
return &AlertingResultsFromRuleState{ return &AlertingResultsFromRuleState{
Manager: a.stateManager, Manager: a.stateManager,
Rule: rule, Rule: rule,

View File

@@ -18,65 +18,69 @@ import (
var errRuleDeleted = errors.New("rule deleted") var errRuleDeleted = errors.New("rule deleted")
type ruleFactory interface { type ruleFactory interface {
new(context.Context) *alertRuleInfo new(context.Context) Rule
} }
type alertRuleInfoRegistry struct { type ruleRegistry struct {
mu sync.Mutex mu sync.Mutex
alertRuleInfo map[models.AlertRuleKey]*alertRuleInfo rules map[models.AlertRuleKey]Rule
} }
// getOrCreateInfo gets rule routine information from registry by the key. If it does not exist, it creates a new one. func newRuleRegistry() ruleRegistry {
// Returns a pointer to the rule routine information and a flag that indicates whether it is a new struct or not. return ruleRegistry{rules: make(map[models.AlertRuleKey]Rule)}
func (r *alertRuleInfoRegistry) getOrCreateInfo(context context.Context, key models.AlertRuleKey, factory ruleFactory) (*alertRuleInfo, bool) { }
// getOrCreate gets rule routine from registry by the key. If it does not exist, it creates a new one.
// Returns a pointer to the rule routine and a flag that indicates whether it is a new struct or not.
func (r *ruleRegistry) getOrCreate(context context.Context, key models.AlertRuleKey, factory ruleFactory) (Rule, bool) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
info, ok := r.alertRuleInfo[key] rule, ok := r.rules[key]
if !ok { if !ok {
info = factory.new(context) rule = factory.new(context)
r.alertRuleInfo[key] = info r.rules[key] = rule
} }
return info, !ok return rule, !ok
} }
func (r *alertRuleInfoRegistry) exists(key models.AlertRuleKey) bool { func (r *ruleRegistry) exists(key models.AlertRuleKey) bool {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
_, ok := r.alertRuleInfo[key] _, ok := r.rules[key]
return ok return ok
} }
// del removes pair that has specific key from alertRuleInfo. // del removes pair that has specific key from the registry.
// Returns 2-tuple where the first element is value of the removed pair // Returns 2-tuple where the first element is value of the removed pair
// and the second element indicates whether element with the specified key existed. // and the second element indicates whether element with the specified key existed.
func (r *alertRuleInfoRegistry) del(key models.AlertRuleKey) (*alertRuleInfo, bool) { func (r *ruleRegistry) del(key models.AlertRuleKey) (Rule, bool) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
info, ok := r.alertRuleInfo[key] rule, ok := r.rules[key]
if ok { if ok {
delete(r.alertRuleInfo, key) delete(r.rules, key)
} }
return info, ok return rule, ok
} }
func (r *alertRuleInfoRegistry) keyMap() map[models.AlertRuleKey]struct{} { func (r *ruleRegistry) keyMap() map[models.AlertRuleKey]struct{} {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
definitionsIDs := make(map[models.AlertRuleKey]struct{}, len(r.alertRuleInfo)) definitionsIDs := make(map[models.AlertRuleKey]struct{}, len(r.rules))
for k := range r.alertRuleInfo { for k := range r.rules {
definitionsIDs[k] = struct{}{} definitionsIDs[k] = struct{}{}
} }
return definitionsIDs return definitionsIDs
} }
type ruleVersionAndPauseStatus struct { type RuleVersionAndPauseStatus struct {
Fingerprint fingerprint Fingerprint fingerprint
IsPaused bool IsPaused bool
} }
type evaluation struct { type Evaluation struct {
scheduledAt time.Time scheduledAt time.Time
rule *models.AlertRule rule *models.AlertRule
folderTitle string folderTitle string

View File

@@ -47,8 +47,8 @@ type schedule struct {
// base tick rate (fastest possible configured check) // base tick rate (fastest possible configured check)
baseInterval time.Duration baseInterval time.Duration
// each alert rule gets its own channel and routine // each rule gets its own channel and routine
registry alertRuleInfoRegistry registry ruleRegistry
maxAttempts int64 maxAttempts int64
@@ -116,7 +116,7 @@ func NewScheduler(cfg SchedulerCfg, stateManager *state.Manager) *schedule {
} }
sch := schedule{ sch := schedule{
registry: alertRuleInfoRegistry{alertRuleInfo: make(map[ngmodels.AlertRuleKey]*alertRuleInfo)}, registry: newRuleRegistry(),
maxAttempts: cfg.MaxAttempts, maxAttempts: cfg.MaxAttempts,
clock: cfg.C, clock: cfg.C,
baseInterval: cfg.BaseInterval, baseInterval: cfg.BaseInterval,
@@ -165,13 +165,13 @@ func (sch *schedule) deleteAlertRule(keys ...ngmodels.AlertRuleKey) {
sch.log.Info("Alert rule cannot be removed from the scheduler as it is not scheduled", key.LogContext()...) sch.log.Info("Alert rule cannot be removed from the scheduler as it is not scheduled", key.LogContext()...)
} }
// Delete the rule routine // Delete the rule routine
ruleInfo, ok := sch.registry.del(key) ruleRoutine, ok := sch.registry.del(key)
if !ok { if !ok {
sch.log.Info("Alert rule cannot be stopped as it is not running", key.LogContext()...) sch.log.Info("Alert rule cannot be stopped as it is not running", key.LogContext()...)
continue continue
} }
// stop rule evaluation // stop rule evaluation
ruleInfo.stop(errRuleDeleted) ruleRoutine.Stop(errRuleDeleted)
} }
// Our best bet at this point is that we update the metrics with what we hope to schedule in the next tick. // Our best bet at this point is that we update the metrics with what we hope to schedule in the next tick.
alertRules, _ := sch.schedulableAlertRules.all() alertRules, _ := sch.schedulableAlertRules.all()
@@ -202,8 +202,8 @@ func (sch *schedule) schedulePeriodic(ctx context.Context, t *ticker.T) error {
} }
type readyToRunItem struct { type readyToRunItem struct {
ruleInfo *alertRuleInfo ruleRoutine Rule
evaluation Evaluation
} }
// TODO refactor to accept a callback for tests that will be called with things that are returned currently, and return nothing. // TODO refactor to accept a callback for tests that will be called with things that are returned currently, and return nothing.
@@ -252,7 +252,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
) )
for _, item := range alertRules { for _, item := range alertRules {
key := item.GetKey() key := item.GetKey()
ruleInfo, newRoutine := sch.registry.getOrCreateInfo(ctx, key, ruleFactory) ruleRoutine, newRoutine := sch.registry.getOrCreate(ctx, key, ruleFactory)
// enforce minimum evaluation interval // enforce minimum evaluation interval
if item.IntervalSeconds < int64(sch.minRuleInterval.Seconds()) { if item.IntervalSeconds < int64(sch.minRuleInterval.Seconds()) {
@@ -264,7 +264,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
if newRoutine && !invalidInterval { if newRoutine && !invalidInterval {
dispatcherGroup.Go(func() error { dispatcherGroup.Go(func() error {
return ruleInfo.run(key) return ruleRoutine.Run(key)
}) })
} }
@@ -291,7 +291,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
if isReadyToRun { if isReadyToRun {
sch.log.Debug("Rule is ready to run on the current tick", "uid", item.UID, "tick", tickNum, "frequency", itemFrequency, "offset", offset) sch.log.Debug("Rule is ready to run on the current tick", "uid", item.UID, "tick", tickNum, "frequency", itemFrequency, "offset", offset)
readyToRun = append(readyToRun, readyToRunItem{ruleInfo: ruleInfo, evaluation: evaluation{ readyToRun = append(readyToRun, readyToRunItem{ruleRoutine: ruleRoutine, Evaluation: Evaluation{
scheduledAt: tick, scheduledAt: tick,
rule: item, rule: item,
folderTitle: folderTitle, folderTitle: folderTitle,
@@ -300,12 +300,12 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
if _, isUpdated := updated[key]; isUpdated && !isReadyToRun { if _, isUpdated := updated[key]; isUpdated && !isReadyToRun {
// if we do not need to eval the rule, check whether the rule was just updated and if it was, notify evaluation routine about that // if we do not need to eval the rule, check whether the rule was just updated and if it was, notify evaluation routine about that
sch.log.Debug("Rule has been updated. Notifying evaluation routine", key.LogContext()...) sch.log.Debug("Rule has been updated. Notifying evaluation routine", key.LogContext()...)
go func(ri *alertRuleInfo, rule *ngmodels.AlertRule) { go func(routine Rule, rule *ngmodels.AlertRule) {
ri.update(ruleVersionAndPauseStatus{ routine.Update(RuleVersionAndPauseStatus{
Fingerprint: ruleWithFolder{rule: rule, folderTitle: folderTitle}.Fingerprint(), Fingerprint: ruleWithFolder{rule: rule, folderTitle: folderTitle}.Fingerprint(),
IsPaused: rule.IsPaused, IsPaused: rule.IsPaused,
}) })
}(ruleInfo, item) }(ruleRoutine, item)
updatedRules = append(updatedRules, ngmodels.AlertRuleKeyWithVersion{ updatedRules = append(updatedRules, ngmodels.AlertRuleKeyWithVersion{
Version: item.Version, Version: item.Version,
AlertRuleKey: item.GetKey(), AlertRuleKey: item.GetKey(),
@@ -330,7 +330,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
time.AfterFunc(time.Duration(int64(i)*step), func() { time.AfterFunc(time.Duration(int64(i)*step), func() {
key := item.rule.GetKey() key := item.rule.GetKey()
success, dropped := item.ruleInfo.eval(&item.evaluation) success, dropped := item.ruleRoutine.Eval(&item.Evaluation)
if !success { if !success {
sch.log.Debug("Scheduled evaluation was canceled because evaluation routine was stopped", append(key.LogContext(), "time", tick)...) sch.log.Debug("Scheduled evaluation was canceled because evaluation routine was stopped", append(key.LogContext(), "time", tick)...)
return return

View File

@@ -363,9 +363,9 @@ func TestSchedule_deleteAlertRule(t *testing.T) {
ruleFactory := ruleFactoryFromScheduler(sch) ruleFactory := ruleFactoryFromScheduler(sch)
rule := models.AlertRuleGen()() rule := models.AlertRuleGen()()
key := rule.GetKey() key := rule.GetKey()
info, _ := sch.registry.getOrCreateInfo(context.Background(), key, ruleFactory) info, _ := sch.registry.getOrCreate(context.Background(), key, ruleFactory)
sch.deleteAlertRule(key) sch.deleteAlertRule(key)
require.ErrorIs(t, info.ctx.Err(), errRuleDeleted) require.ErrorIs(t, info.(*alertRule).ctx.Err(), errRuleDeleted)
require.False(t, sch.registry.exists(key)) require.False(t, sch.registry.exists(key))
}) })
}) })