Alerting: Refactor scheduler's rule evaluator to store rule key (#89925)

Yuri Tseretyan, 2024-07-01 16:43:23 -04:00 (committed by GitHub)
parent e4eebd6379
commit c3b5cabb14
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
5 changed files with 92 additions and 71 deletions


@@ -30,7 +30,7 @@ import (
// Rule represents a single piece of work that is executed periodically by the ruler.
type Rule interface {
// Run creates the resources that will perform the rule's work, and starts it. It blocks indefinitely, until Stop is called or another signal is sent.
Run(key ngmodels.AlertRuleKey) error
Run() error
// Stop shuts down the rule's execution with an optional reason. It has no effect if the rule has not yet been Run.
Stop(reason error)
// Eval sends a signal to execute the work represented by the rule, exactly one time.
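A minimal sketch of the interface change above, using simplified stand-in types rather than the actual Grafana code: the rule's key is now fixed at construction, so Run no longer takes it as an argument.

package main

import "fmt"

// AlertRuleKey is a simplified stand-in for ngmodels.AlertRuleKey.
type AlertRuleKey struct {
	OrgID int64
	UID   string
}

// Rule mirrors the refactored interface: Run carries no key parameter.
type Rule interface {
	Run() error
	Stop(reason error)
}

type demoRule struct {
	key AlertRuleKey // stored once at construction, used everywhere else
}

func newDemoRule(key AlertRuleKey) *demoRule { return &demoRule{key: key} }

func (r *demoRule) Run() error {
	fmt.Printf("rule %s (org %d) started\n", r.key.UID, r.key.OrgID)
	return nil
}

func (r *demoRule) Stop(reason error) {}

func main() {
	r := newDemoRule(AlertRuleKey{OrgID: 1, UID: "demo-uid"})
	_ = r.Run() // previously: r.Run(key)
}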
@@ -67,6 +67,7 @@ func newRuleFactory(
if rule.Type() == ngmodels.RuleTypeRecording {
return newRecordingRule(
ctx,
rule.GetKey(),
maxAttempts,
clock,
evalFactory,
@@ -79,6 +80,7 @@
}
return newAlertRule(
ctx,
rule.GetKey(),
appURL,
disableGrafanaFolder,
maxAttempts,
@@ -104,6 +106,8 @@ type ruleProvider interface {
}
type alertRule struct {
key ngmodels.AlertRuleKey
evalCh chan *Evaluation
updateCh chan RuleVersionAndPauseStatus
ctx context.Context
@@ -130,6 +134,7 @@ type alertRule struct {
func newAlertRule(
parent context.Context,
key ngmodels.AlertRuleKey,
appURL *url.URL,
disableGrafanaFolder bool,
maxAttempts int64,
@@ -144,8 +149,9 @@ func newAlertRule(
evalAppliedHook func(ngmodels.AlertRuleKey, time.Time),
stopAppliedHook func(ngmodels.AlertRuleKey),
) *alertRule {
ctx, stop := util.WithCancelCause(parent)
ctx, stop := util.WithCancelCause(ngmodels.WithRuleKey(parent, key))
return &alertRule{
key: key,
evalCh: make(chan *Evaluation),
updateCh: make(chan RuleVersionAndPauseStatus),
ctx: ctx,
@@ -161,7 +167,7 @@ func newAlertRule(
evalAppliedHook: evalAppliedHook,
stopAppliedHook: stopAppliedHook,
metrics: met,
logger: logger,
logger: logger.FromContext(ctx),
tracer: tracer,
}
}
@@ -174,6 +180,11 @@ func newAlertRule(
//
// the second element contains a dropped message that was sent by a concurrent sender.
func (a *alertRule) Eval(eval *Evaluation) (bool, *Evaluation) {
if a.key != eval.rule.GetKey() {
// Make sure that rule has the same key. This should not happen
a.logger.Error("Invalid rule sent for evaluating. Skipping", "ruleKeyToEvaluate", eval.rule.GetKey().String())
return false, eval
}
// read the channel in unblocking manner to make sure that there is no concurrent send operation.
var droppedMsg *Evaluation
select {
@@ -214,13 +225,13 @@ func (a *alertRule) Stop(reason error) {
}
}
func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
grafanaCtx := ngmodels.WithRuleKey(a.ctx, key)
logger := a.logger.FromContext(grafanaCtx)
func (a *alertRule) Run() error {
grafanaCtx := a.ctx
logger := a.logger
logger.Debug("Alert rule routine started")
var currentFingerprint fingerprint
defer a.stopApplied(key)
defer a.stopApplied()
for {
select {
// used by external services (API) to notify that rule is updated.
@@ -232,7 +243,7 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
logger.Info("Clearing the state of the rule because it was updated", "isPaused", ctx.IsPaused, "fingerprint", ctx.Fingerprint)
// clear the state. So the next evaluation will start from the scratch.
a.resetState(grafanaCtx, key, ctx.IsPaused)
a.resetState(grafanaCtx, ctx.IsPaused)
currentFingerprint = ctx.Fingerprint
// evalCh - used by the scheduler to signal that evaluation is needed.
case ctx, ok := <-a.evalCh:
@@ -242,14 +253,14 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
}
func() {
orgID := fmt.Sprint(key.OrgID)
orgID := fmt.Sprint(a.key.OrgID)
evalDuration := a.metrics.EvalDuration.WithLabelValues(orgID)
evalTotal := a.metrics.EvalTotal.WithLabelValues(orgID)
evalStart := a.clock.Now()
defer func() {
evalDuration.Observe(a.clock.Now().Sub(evalStart).Seconds())
a.evalApplied(key, ctx.scheduledAt)
a.evalApplied(ctx.scheduledAt)
}()
for attempt := int64(1); attempt <= a.maxAttempts; attempt++ {
@@ -266,7 +277,7 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
// lingers in DB and won't be cleaned up until next alert rule update.
needReset = needReset || (currentFingerprint == 0 && isPaused)
if needReset {
a.resetState(grafanaCtx, key, isPaused)
a.resetState(grafanaCtx, isPaused)
}
currentFingerprint = f
if isPaused {
@@ -298,7 +309,7 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
}
retry := attempt < a.maxAttempts
err := a.evaluate(tracingCtx, key, f, attempt, ctx, span, retry)
err := a.evaluate(tracingCtx, f, attempt, ctx, span, retry)
// This is extremely confusing - when we exhaust all retry attempts, or we have no retryable errors
// we return nil - so technically, this is meaningless to know whether the evaluation has errors or not.
span.End()
@@ -325,8 +336,8 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
// cases.
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
defer cancelFunc()
states := a.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, key), key, ngmodels.StateReasonRuleDeleted)
a.expireAndSend(grafanaCtx, key, states)
states := a.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, a.key), a.key, ngmodels.StateReasonRuleDeleted)
a.expireAndSend(grafanaCtx, states)
}
logger.Debug("Stopping alert rule routine")
return nil
@@ -334,8 +345,8 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {
}
}
func (a *alertRule) evaluate(ctx context.Context, key ngmodels.AlertRuleKey, f fingerprint, attempt int64, e *Evaluation, span trace.Span, retry bool) error {
orgID := fmt.Sprint(key.OrgID)
func (a *alertRule) evaluate(ctx context.Context, f fingerprint, attempt int64, e *Evaluation, span trace.Span, retry bool) error {
orgID := fmt.Sprint(a.key.OrgID)
evalAttemptTotal := a.metrics.EvalAttemptTotal.WithLabelValues(orgID)
evalAttemptFailures := a.metrics.EvalAttemptFailures.WithLabelValues(orgID)
evalTotalFailures := a.metrics.EvalFailures.WithLabelValues(orgID)
@@ -419,7 +430,7 @@ func (a *alertRule) evaluate(ctx context.Context, key ngmodels.AlertRuleKey, f f
state.GetRuleExtraLabels(logger, e.rule, e.folderTitle, !a.disableGrafanaFolder),
func(ctx context.Context, statesToSend state.StateTransitions) {
start := a.clock.Now()
alerts := a.send(ctx, key, statesToSend)
alerts := a.send(ctx, statesToSend)
span.AddEvent("results sent", trace.WithAttributes(
attribute.Int64("alerts_sent", int64(len(alerts.PostableAlerts))),
))
@@ -432,52 +443,52 @@ func (a *alertRule) evaluate(ctx context.Context, key ngmodels.AlertRuleKey, f f
}
// send sends alerts for the given state transitions.
func (a *alertRule) send(ctx context.Context, key ngmodels.AlertRuleKey, states state.StateTransitions) definitions.PostableAlerts {
func (a *alertRule) send(ctx context.Context, states state.StateTransitions) definitions.PostableAlerts {
alerts := definitions.PostableAlerts{PostableAlerts: make([]models.PostableAlert, 0, len(states))}
for _, alertState := range states {
alerts.PostableAlerts = append(alerts.PostableAlerts, *state.StateToPostableAlert(alertState, a.appURL))
}
if len(alerts.PostableAlerts) > 0 {
a.sender.Send(ctx, key, alerts)
a.sender.Send(ctx, a.key, alerts)
}
return alerts
}
// sendExpire sends alerts to expire all previously firing alerts in the provided state transitions.
func (a *alertRule) expireAndSend(ctx context.Context, key ngmodels.AlertRuleKey, states []state.StateTransition) {
func (a *alertRule) expireAndSend(ctx context.Context, states []state.StateTransition) {
expiredAlerts := state.FromAlertsStateToStoppedAlert(states, a.appURL, a.clock)
if len(expiredAlerts.PostableAlerts) > 0 {
a.sender.Send(ctx, key, expiredAlerts)
a.sender.Send(ctx, a.key, expiredAlerts)
}
}
func (a *alertRule) resetState(ctx context.Context, key ngmodels.AlertRuleKey, isPaused bool) {
rule := a.ruleProvider.get(key)
func (a *alertRule) resetState(ctx context.Context, isPaused bool) {
rule := a.ruleProvider.get(a.key)
reason := ngmodels.StateReasonUpdated
if isPaused {
reason = ngmodels.StateReasonPaused
}
states := a.stateManager.ResetStateByRuleUID(ctx, rule, reason)
a.expireAndSend(ctx, key, states)
a.expireAndSend(ctx, states)
}
// evalApplied is only used on tests.
func (a *alertRule) evalApplied(alertDefKey ngmodels.AlertRuleKey, now time.Time) {
func (a *alertRule) evalApplied(now time.Time) {
if a.evalAppliedHook == nil {
return
}
a.evalAppliedHook(alertDefKey, now)
a.evalAppliedHook(a.key, now)
}
// stopApplied is only used on tests.
func (a *alertRule) stopApplied(alertDefKey ngmodels.AlertRuleKey) {
func (a *alertRule) stopApplied() {
if a.stopAppliedHook == nil {
return
}
a.stopAppliedHook(alertDefKey)
a.stopAppliedHook(a.key)
}
func SchedulerUserFor(orgID int64) *user.SignedInUser {
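The constructor changes in this file can be summarized with a small, self-contained sketch (simplified stand-ins for ngmodels.WithRuleKey and logger.FromContext, not the real APIs): the key is attached to the rule's context once in newAlertRule and the logger is derived from that context, which is why Run, evaluate, send, expireAndSend, and resetState no longer need a key parameter.

package main

import (
	"context"
	"fmt"
)

type ruleKey struct {
	OrgID int64
	UID   string
}

type ruleKeyCtxKey struct{}

// withRuleKey plays the role of ngmodels.WithRuleKey: it stores the key in the context.
func withRuleKey(ctx context.Context, k ruleKey) context.Context {
	return context.WithValue(ctx, ruleKeyCtxKey{}, k)
}

// loggerFromContext plays the role of logger.FromContext: every line it prints
// carries the rule key found in the context.
func loggerFromContext(ctx context.Context) func(msg string) {
	k, _ := ctx.Value(ruleKeyCtxKey{}).(ruleKey)
	return func(msg string) { fmt.Printf("%s org=%d uid=%s\n", msg, k.OrgID, k.UID) }
}

type sketchRule struct {
	key    ruleKey
	ctx    context.Context
	cancel context.CancelFunc
	log    func(msg string)
}

func newSketchRule(parent context.Context, key ruleKey) *sketchRule {
	ctx, cancel := context.WithCancel(withRuleKey(parent, key)) // key attached once
	return &sketchRule{key: key, ctx: ctx, cancel: cancel, log: loggerFromContext(ctx)}
}

func (r *sketchRule) Run() error {
	r.log("Alert rule routine started") // no key threading needed
	return nil
}

func main() {
	r := newSketchRule(context.Background(), ruleKey{OrgID: 1, UID: "demo-uid"})
	defer r.cancel()
	_ = r.Run()
}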


@@ -39,7 +39,7 @@ func TestAlertRule(t *testing.T) {
t.Run("when rule evaluation is not stopped", func(t *testing.T) {
t.Run("update should send to updateCh", func(t *testing.T) {
r := blankRuleForTests(context.Background())
r := blankRuleForTests(context.Background(), models.GenerateRuleKey(1))
resultCh := make(chan bool)
go func() {
resultCh <- r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})
@@ -52,7 +52,7 @@ func TestAlertRule(t *testing.T) {
}
})
t.Run("update should drop any concurrent sending to updateCh", func(t *testing.T) {
r := blankRuleForTests(context.Background())
r := blankRuleForTests(context.Background(), models.GenerateRuleKey(1))
version1 := RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}
version2 := RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}
@@ -78,12 +78,13 @@ func TestAlertRule(t *testing.T) {
}
})
t.Run("eval should send to evalCh", func(t *testing.T) {
r := blankRuleForTests(context.Background())
ruleSpec := gen.GenerateRef()
r := blankRuleForTests(context.Background(), ruleSpec.GetKey())
expected := time.Now()
resultCh := make(chan evalResponse)
data := &Evaluation{
scheduledAt: expected,
rule: gen.GenerateRef(),
rule: ruleSpec,
folderTitle: util.GenerateShortUID(),
}
go func() {
@@ -101,14 +102,15 @@ func TestAlertRule(t *testing.T) {
}
})
t.Run("eval should drop any concurrent sending to evalCh", func(t *testing.T) {
r := blankRuleForTests(context.Background())
ruleSpec := gen.GenerateRef()
r := blankRuleForTests(context.Background(), ruleSpec.GetKey())
time1 := time.UnixMilli(rand.Int63n(math.MaxInt64))
time2 := time.UnixMilli(rand.Int63n(math.MaxInt64))
resultCh1 := make(chan evalResponse)
resultCh2 := make(chan evalResponse)
data := &Evaluation{
scheduledAt: time1,
rule: gen.GenerateRef(),
rule: ruleSpec,
folderTitle: util.GenerateShortUID(),
}
data2 := &Evaluation{
@@ -147,11 +149,12 @@ func TestAlertRule(t *testing.T) {
}
})
t.Run("eval should exit when context is cancelled", func(t *testing.T) {
r := blankRuleForTests(context.Background())
ruleSpec := gen.GenerateRef()
r := blankRuleForTests(context.Background(), ruleSpec.GetKey())
resultCh := make(chan evalResponse)
data := &Evaluation{
scheduledAt: time.Now(),
rule: gen.GenerateRef(),
rule: ruleSpec,
folderTitle: util.GenerateShortUID(),
}
go func() {
@@ -171,17 +174,18 @@ func TestAlertRule(t *testing.T) {
})
t.Run("when rule evaluation is stopped", func(t *testing.T) {
t.Run("Update should do nothing", func(t *testing.T) {
r := blankRuleForTests(context.Background())
r := blankRuleForTests(context.Background(), models.GenerateRuleKey(1))
r.Stop(errRuleDeleted)
require.ErrorIs(t, r.ctx.Err(), errRuleDeleted)
require.False(t, r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}))
})
t.Run("eval should do nothing", func(t *testing.T) {
r := blankRuleForTests(context.Background())
ruleSpec := gen.GenerateRef()
r := blankRuleForTests(context.Background(), ruleSpec.GetKey())
r.Stop(nil)
data := &Evaluation{
scheduledAt: time.Now(),
rule: gen.GenerateRef(),
rule: ruleSpec,
folderTitle: util.GenerateShortUID(),
}
success, dropped := r.Eval(data)
@@ -189,19 +193,19 @@ func TestAlertRule(t *testing.T) {
require.Nilf(t, dropped, "expected no dropped evaluations but got one")
})
t.Run("calling stop multiple times should not panic", func(t *testing.T) {
r := blankRuleForTests(context.Background())
r := blankRuleForTests(context.Background(), models.GenerateRuleKey(1))
r.Stop(nil)
r.Stop(nil)
})
t.Run("stop should not panic if parent context stopped", func(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background())
r := blankRuleForTests(ctx)
r := blankRuleForTests(ctx, models.GenerateRuleKey(1))
cancelFn()
r.Stop(nil)
})
})
t.Run("should be thread-safe", func(t *testing.T) {
r := blankRuleForTests(context.Background())
r := blankRuleForTests(context.Background(), models.GenerateRuleKey(1))
wg := sync.WaitGroup{}
go func() {
for {
@@ -245,10 +249,10 @@ func TestAlertRule(t *testing.T) {
})
t.Run("Run should exit if idle when Stop is called", func(t *testing.T) {
rule := blankRuleForTests(context.Background())
rule := blankRuleForTests(context.Background(), models.GenerateRuleKey(1))
runResult := make(chan error)
go func() {
runResult <- rule.Run(models.AlertRuleKey{})
runResult <- rule.Run()
}()
rule.Stop(nil)
@@ -262,8 +266,8 @@ func TestAlertRule(t *testing.T) {
})
}
func blankRuleForTests(ctx context.Context) *alertRule {
return newAlertRule(context.Background(), nil, false, 0, nil, nil, nil, nil, nil, nil, log.NewNopLogger(), nil, nil, nil)
func blankRuleForTests(ctx context.Context, key models.AlertRuleKey) *alertRule {
return newAlertRule(ctx, key, nil, false, 0, nil, nil, nil, nil, nil, nil, log.NewNopLogger(), nil, nil, nil)
}
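Because Eval now rejects evaluations whose rule key differs from the key stored in the struct, the updated tests above generate the rule first and hand its key to the helper. A tiny self-contained sketch of that guard, with simplified types and hypothetical names:

package main

import "fmt"

type ruleKey struct {
	OrgID int64
	UID   string
}

type evaluation struct{ key ruleKey }

type keyedRule struct {
	key    ruleKey
	evalCh chan *evaluation
}

// eval mirrors the shape of alertRule.Eval: evaluations for a different rule key
// are refused instead of being sent to the evaluation loop.
func (r *keyedRule) eval(e *evaluation) bool {
	if r.key != e.key {
		fmt.Println("invalid rule sent for evaluating, skipping", e.key.UID)
		return false
	}
	select {
	case r.evalCh <- e:
		return true
	default: // nobody is consuming; drop instead of blocking (simplified)
		return false
	}
}

func main() {
	r := &keyedRule{key: ruleKey{OrgID: 1, UID: "a"}, evalCh: make(chan *evaluation, 1)}
	fmt.Println(r.eval(&evaluation{key: ruleKey{OrgID: 1, UID: "a"}})) // true
	fmt.Println(r.eval(&evaluation{key: ruleKey{OrgID: 1, UID: "b"}})) // false: key mismatch
}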
func TestRuleRoutine(t *testing.T) {
@@ -301,7 +305,7 @@ func TestRuleRoutine(t *testing.T) {
t.Cleanup(cancel)
ruleInfo := factory.new(ctx, rule)
go func() {
_ = ruleInfo.Run(rule.GetKey())
_ = ruleInfo.Run()
}()
expectedTime := time.UnixMicro(rand.Int63())
@@ -464,7 +468,7 @@ func TestRuleRoutine(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ruleInfo := factory.new(ctx, rule)
go func() {
err := ruleInfo.Run(models.AlertRuleKey{})
err := ruleInfo.Run()
stoppedChan <- err
}()
@@ -484,7 +488,7 @@ func TestRuleRoutine(t *testing.T) {
factory := ruleFactoryFromScheduler(sch)
ruleInfo := factory.new(context.Background(), rule)
go func() {
err := ruleInfo.Run(rule.GetKey())
err := ruleInfo.Run()
stoppedChan <- err
}()
@@ -515,7 +519,7 @@ func TestRuleRoutine(t *testing.T) {
ruleInfo := factory.new(ctx, rule)
go func() {
_ = ruleInfo.Run(rule.GetKey())
_ = ruleInfo.Run()
}()
// init evaluation loop so it got the rule version
@@ -597,7 +601,7 @@ func TestRuleRoutine(t *testing.T) {
ruleInfo := factory.new(ctx, rule)
go func() {
_ = ruleInfo.Run(rule.GetKey())
_ = ruleInfo.Run()
}()
ruleInfo.Eval(&Evaluation{
@@ -716,7 +720,7 @@ func TestRuleRoutine(t *testing.T) {
ruleInfo := factory.new(ctx, rule)
go func() {
_ = ruleInfo.Run(rule.GetKey())
_ = ruleInfo.Run()
}()
ruleInfo.Eval(&Evaluation{
@@ -750,7 +754,7 @@ func TestRuleRoutine(t *testing.T) {
ruleInfo := factory.new(ctx, rule)
go func() {
_ = ruleInfo.Run(rule.GetKey())
_ = ruleInfo.Run()
}()
ruleInfo.Eval(&Evaluation{
@@ -787,7 +791,7 @@ func TestRuleRoutine(t *testing.T) {
ruleInfo := factory.new(ctx, rule)
go func() {
_ = ruleInfo.Run(rule.GetKey())
_ = ruleInfo.Run()
}()
// Evaluate 10 times:


@@ -8,6 +8,11 @@ import (
"github.com/benbjohnson/clock"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana/pkg/cmd/grafana-cli/logger"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
@@ -15,12 +20,11 @@ import (
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/util"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
type recordingRule struct {
key ngmodels.AlertRuleKey
ctx context.Context
evalCh chan *Evaluation
stopFn util.CancelCauseFunc
@@ -41,9 +45,10 @@ type recordingRule struct {
writer RecordingWriter
}
func newRecordingRule(parent context.Context, maxAttempts int64, clock clock.Clock, evalFactory eval.EvaluatorFactory, ft featuremgmt.FeatureToggles, logger log.Logger, metrics *metrics.Scheduler, tracer tracing.Tracer, writer RecordingWriter) *recordingRule {
ctx, stop := util.WithCancelCause(parent)
func newRecordingRule(parent context.Context, key ngmodels.AlertRuleKey, maxAttempts int64, clock clock.Clock, evalFactory eval.EvaluatorFactory, ft featuremgmt.FeatureToggles, logger log.Logger, metrics *metrics.Scheduler, tracer tracing.Tracer, writer RecordingWriter) *recordingRule {
ctx, stop := util.WithCancelCause(ngmodels.WithRuleKey(parent, key))
return &recordingRule{
key: key,
ctx: ctx,
evalCh: make(chan *Evaluation),
stopFn: stop,
@@ -51,7 +56,7 @@ func newRecordingRule(parent context.Context, maxAttempts int64, clock clock.Clo
evalFactory: evalFactory,
featureToggles: ft,
maxAttempts: maxAttempts,
logger: logger,
logger: logger.FromContext(ctx),
metrics: metrics,
tracer: tracer,
writer: writer,
@@ -84,9 +89,8 @@ func (r *recordingRule) Stop(reason error) {
}
}
func (r *recordingRule) Run(key ngmodels.AlertRuleKey) error {
ctx := ngmodels.WithRuleKey(r.ctx, key)
logger := r.logger.FromContext(ctx)
func (r *recordingRule) Run() error {
ctx := r.ctx
logger.Debug("Recording rule routine started")
for {
@@ -251,7 +255,7 @@ func (r *recordingRule) evaluationDoneTestHook(ev *Evaluation) {
return
}
r.evalAppliedHook(ev.rule.GetKey(), ev.scheduledAt)
r.evalAppliedHook(r.key, ev.scheduledAt)
}
func (r *recordingRule) frameRef(refID string, resp *backend.QueryDataResponse) (data.Frames, error) {


@@ -9,14 +9,15 @@ import (
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/featuremgmt"
models "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/writer"
"github.com/grafana/grafana/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
)
func TestRecordingRule(t *testing.T) {
@@ -131,7 +132,7 @@ func TestRecordingRule(t *testing.T) {
rule := blankRecordingRuleForTests(context.Background())
runResult := make(chan error)
go func() {
runResult <- rule.Run(models.AlertRuleKey{})
runResult <- rule.Run()
}()
rule.Stop(nil)
@@ -147,7 +148,7 @@ func TestRecordingRule(t *testing.T) {
func blankRecordingRuleForTests(ctx context.Context) *recordingRule {
ft := featuremgmt.WithFeatures(featuremgmt.FlagGrafanaManagedRecordingRules)
return newRecordingRule(context.Background(), 0, nil, nil, ft, log.NewNopLogger(), nil, nil, writer.FakeWriter{})
return newRecordingRule(context.Background(), models.AlertRuleKey{}, 0, nil, nil, ft, log.NewNopLogger(), nil, nil, writer.FakeWriter{})
}
func TestRecordingRule_Integration(t *testing.T) {
@@ -168,7 +169,7 @@ func TestRecordingRule_Integration(t *testing.T) {
now := time.Now()
go func() {
_ = process.Run(rule.GetKey())
_ = process.Run()
}()
process.Eval(&Evaluation{
scheduledAt: now,


@@ -12,6 +12,7 @@ import (
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
@@ -282,7 +283,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
if newRoutine && !invalidInterval {
dispatcherGroup.Go(func() error {
return ruleRoutine.Run(key)
return ruleRoutine.Run()
})
}
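For context, a minimal sketch of the scheduler-side effect (simplified types; the errgroup usage matches the hunk above, the rest is illustrative): because each rule now remembers its own key, processTick can hand Run to the dispatcher group without capturing the key in the closure.

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

type ruleKey struct {
	OrgID int64
	UID   string
}

type rule struct{ key ruleKey }

func (r *rule) Run() error {
	fmt.Println("evaluating rule", r.key.UID)
	return nil
}

func main() {
	var group errgroup.Group
	rules := []*rule{{key: ruleKey{1, "uid-a"}}, {key: ruleKey{1, "uid-b"}}}
	for _, r := range rules {
		group.Go(r.Run) // previously: group.Go(func() error { return r.Run(key) })
	}
	_ = group.Wait()
}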