Alerting: Optional function to find the rule deletion reason (#99422)
parent 84da64462e
commit 12bda63871
@@ -345,8 +345,9 @@ func (a *alertRule) Run() error {
 			}()

 		case <-grafanaCtx.Done():
+			reason := grafanaCtx.Err()
 			// clean up the state only if the reason for stopping the evaluation loop is that the rule was deleted
-			if errors.Is(grafanaCtx.Err(), errRuleDeleted) {
+			if errors.Is(reason, errRuleDeleted) {
 				// We do not want a context to be unbounded which could potentially cause a go routine running
 				// indefinitely. 1 minute is an almost randomly chosen timeout, big enough to cover the majority of the
 				// cases.
@@ -355,7 +356,7 @@ func (a *alertRule) Run() error {
 				states := a.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, a.key.AlertRuleKey), a.key, ngmodels.StateReasonRuleDeleted)
 				a.expireAndSend(grafanaCtx, states)
 			}
-			a.logger.Debug("Stopping alert rule routine")
+			a.logger.Debug("Stopping alert rule routine", "reason", reason)
 			return nil
 		}
 	}
@@ -283,7 +283,7 @@ func TestRuleRoutine(t *testing.T) {
 	instanceStore := &state.FakeInstanceStore{}

 	registry := prometheus.NewPedanticRegistry()
-	sch := setupScheduler(t, ruleStore, instanceStore, registry, senderMock, nil)
+	sch := setupScheduler(t, ruleStore, instanceStore, registry, senderMock, nil, nil)
 	sch.evalAppliedFunc = func(key models.AlertRuleKey, t time.Time) {
 		evalAppliedChan <- t
 	}
@@ -163,7 +163,7 @@ func TestRecordingRule_Integration(t *testing.T) {
 	gen := models.RuleGen.With(models.RuleGen.WithAllRecordingRules(), models.RuleGen.WithOrgID(123))
 	ruleStore := newFakeRulesStore()
 	reg := prometheus.NewPedanticRegistry()
-	sch := setupScheduler(t, ruleStore, nil, reg, nil, nil)
+	sch := setupScheduler(t, ruleStore, nil, reg, nil, nil, nil)
 	writeTarget := writer.NewTestRemoteWriteTarget(t)
 	defer writeTarget.Close()
 	writerReg := prometheus.NewPedanticRegistry()
@@ -52,6 +52,15 @@ type RecordingWriter interface {
 	Write(ctx context.Context, name string, t time.Time, frames data.Frames, orgID int64, extraLabels map[string]string) error
 }

+// AlertRuleStopReasonProvider is an interface for determining the reason why an alert rule was stopped.
+type AlertRuleStopReasonProvider interface {
+	// FindReason returns two values:
+	// 1. The first value is the reason for stopping the alert rule (error type).
+	// 2. The second value is an error indicating any issues that occurred while determining the stop reason.
+	// If this is non-nil, the scheduler uses the default reason.
+	FindReason(ctx context.Context, logger log.Logger, key ngmodels.AlertRuleKeyWithGroup) (error, error)
+}
+
 type schedule struct {
 	// base tick rate (fastest possible configured check)
 	baseInterval time.Duration
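A usage note (not part of the diff): the interface above is the only contract a caller has to satisfy. The sketch below shows one hypothetical implementation; only the FindReason signature comes from this commit, while the deletionTracker type and its in-memory map are illustrative stand-ins for whatever source of truth a real provider would consult. Returning (nil, nil) lets the scheduler fall back to its default errRuleDeleted reason.

// Hypothetical example implementation of AlertRuleStopReasonProvider; only the
// FindReason signature is taken from this commit, everything else is illustrative.
package example

import (
	"context"

	"github.com/grafana/grafana/pkg/infra/log"
	ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
)

// deletionTracker is a hypothetical store that remembers why each rule was removed.
type deletionTracker struct {
	reasons map[ngmodels.AlertRuleKeyWithGroup]error
}

// FindReason returns a recorded deletion reason for the rule, or (nil, nil) so
// that the scheduler falls back to its default reason.
func (d *deletionTracker) FindReason(_ context.Context, logger log.Logger, key ngmodels.AlertRuleKeyWithGroup) (error, error) {
	reason, ok := d.reasons[key]
	if !ok {
		// No recorded reason: let the scheduler use its default.
		return nil, nil
	}
	logger.Debug("Found stop reason for rule", "ruleUID", key.UID, "reason", reason)
	return reason, nil
}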
@@ -73,6 +82,8 @@ type schedule struct {
 	// message from stopApplied is handled.
 	stopAppliedFunc func(ngmodels.AlertRuleKey)

+	ruleStopReasonProvider AlertRuleStopReasonProvider
+
 	log log.Logger

 	evaluatorFactory eval.EvaluatorFactory
@@ -119,6 +130,7 @@ type SchedulerCfg struct {
 	Tracer          tracing.Tracer
 	Log             log.Logger
 	RecordingWriter RecordingWriter
+	RuleStopReasonProvider AlertRuleStopReasonProvider
 }

 // NewScheduler returns a new scheduler.
@@ -148,6 +160,7 @@ func NewScheduler(cfg SchedulerCfg, stateManager *state.Manager) *schedule {
 		alertsSender:    cfg.AlertSender,
 		tracer:          cfg.Tracer,
 		recordingWriter: cfg.RecordingWriter,
+		ruleStopReasonProvider: cfg.RuleStopReasonProvider,
 	}

 	return &sch
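For completeness, a hedged sketch of how a caller could wire the new optional field through SchedulerCfg when constructing the scheduler. Only SchedulerCfg, RuleStopReasonProvider, and NewScheduler are taken from this diff; the other variables (tracer, recordingWriter, stateManager) and the deletionTracker provider from the earlier sketch are placeholders. Leaving the field nil keeps the previous behaviour.

// Hypothetical wiring sketch; surrounding values are placeholders.
cfg := SchedulerCfg{
	// ... other scheduler configuration ...
	Tracer:                 tracer,
	Log:                    log.New("ngalert.scheduler"),
	RecordingWriter:        recordingWriter,
	RuleStopReasonProvider: &deletionTracker{reasons: map[ngmodels.AlertRuleKeyWithGroup]error{}}, // optional; nil keeps errRuleDeleted
}
sched := NewScheduler(cfg, stateManager)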
@@ -180,12 +193,13 @@ func (sch *schedule) Status(key ngmodels.AlertRuleKey) (ngmodels.RuleStatus, boo
 }

 // deleteAlertRule stops evaluation of the rule, deletes it from active rules, and cleans up state cache.
-func (sch *schedule) deleteAlertRule(keys ...ngmodels.AlertRuleKey) {
+func (sch *schedule) deleteAlertRule(ctx context.Context, keys ...ngmodels.AlertRuleKey) {
 	for _, key := range keys {
 		// It can happen that the scheduler has deleted the alert rule before the
 		// Ruler API has called DeleteAlertRule. This can happen as requests to
 		// the Ruler API do not hold an exclusive lock over all scheduler operations.
-		if _, ok := sch.schedulableAlertRules.del(key); !ok {
+		rule, ok := sch.schedulableAlertRules.del(key)
+		if !ok {
 			sch.log.Info("Alert rule cannot be removed from the scheduler as it is not scheduled", key.LogContext()...)
 		}
 		// Delete the rule routine
@@ -194,14 +208,35 @@ func (sch *schedule) deleteAlertRule(keys ...ngmodels.AlertRuleKey) {
 			sch.log.Info("Alert rule cannot be stopped as it is not running", key.LogContext()...)
 			continue
 		}

 		// stop rule evaluation
-		ruleRoutine.Stop(errRuleDeleted)
+		reason := sch.getRuleStopReason(ctx, key, rule)
+		ruleRoutine.Stop(reason)
 	}
 	// Our best bet at this point is that we update the metrics with what we hope to schedule in the next tick.
 	alertRules, _ := sch.schedulableAlertRules.all()
 	sch.updateRulesMetrics(alertRules)
 }

+func (sch *schedule) getRuleStopReason(ctx context.Context, key ngmodels.AlertRuleKey, rule *ngmodels.AlertRule) error {
+	// If the ruleStopReasonProvider is defined, we will use it to get the reason why the
+	// alert rule was stopped. If it returns an error, we will use the default reason.
+	if sch.ruleStopReasonProvider == nil || rule == nil {
+		return errRuleDeleted
+	}
+
+	stopReason, err := sch.ruleStopReasonProvider.FindReason(ctx, sch.log, rule.GetKeyWithGroup())
+	if err != nil {
+		sch.log.New(key.LogContext()...).Error("Failed to get stop reason", "error", err)
+		return errRuleDeleted
+	}
+	if stopReason == nil {
+		return errRuleDeleted
+	}
+
+	return stopReason
+}
+
 func (sch *schedule) schedulePeriodic(ctx context.Context, t *ticker.T) error {
 	dispatcherGroup, ctx := errgroup.WithContext(ctx)
 	for {
@@ -392,6 +427,6 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
 	for key := range registeredDefinitions {
 		toDelete = append(toDelete, key)
 	}
-	sch.deleteAlertRule(toDelete...)
+	sch.deleteAlertRule(ctx, toDelete...)
 	return readyToRun, registeredDefinitions, updatedRules
 }
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"math/rand"
 	"net/url"
@@ -558,7 +559,7 @@ func TestProcessTicks(t *testing.T) {
 func TestSchedule_updateRulesMetrics(t *testing.T) {
 	ruleStore := newFakeRulesStore()
 	reg := prometheus.NewPedanticRegistry()
-	sch := setupScheduler(t, ruleStore, nil, reg, nil, nil)
+	sch := setupScheduler(t, ruleStore, nil, reg, nil, nil, nil)
 	ctx := context.Background()
 	const firstOrgID int64 = 1

@@ -916,29 +917,114 @@ func TestSchedule_updateRulesMetrics(t *testing.T) {
 	})
 }

+type mockAlertRuleStopReasonProvider struct {
+	mock.Mock
+}
+
+func (m *mockAlertRuleStopReasonProvider) FindReason(ctx context.Context, logger log.Logger, key models.AlertRuleKeyWithGroup) (error, error) {
+	args := m.Called(ctx, logger, key)
+	return args.Error(0), args.Error(1)
+}
+
 func TestSchedule_deleteAlertRule(t *testing.T) {
+	ctx := context.Background()
 	t.Run("when rule exists", func(t *testing.T) {
 		t.Run("it should stop evaluation loop and remove the controller from registry", func(t *testing.T) {
-			sch := setupScheduler(t, nil, nil, nil, nil, nil)
+			ruleStore := newFakeRulesStore()
+			sch := setupScheduler(t, ruleStore, nil, nil, nil, nil, nil)
+			ruleFactory := ruleFactoryFromScheduler(sch)
+			rule := models.RuleGen.GenerateRef()
+			ruleStore.PutRule(ctx, rule)
+			key := rule.GetKey()
+			info, _ := sch.registry.getOrCreate(ctx, rule, ruleFactory)
+
+			sch.deleteAlertRule(ctx, key)
+
+			require.ErrorIs(t, info.(*alertRule).ctx.Err(), errRuleDeleted)
+			require.False(t, sch.registry.exists(key))
+		})
+
+		t.Run("it should call ruleStopReasonProvider if it is defined", func(t *testing.T) {
+			mockReasonProvider := new(mockAlertRuleStopReasonProvider)
+			expectedReason := errors.New("some rule deletion reason")
+			mockReasonProvider.On("FindReason", mock.Anything, mock.Anything, mock.Anything).Return(expectedReason, nil)
+
+			ruleStore := newFakeRulesStore()
+			sch := setupScheduler(t, ruleStore, nil, nil, nil, nil, mockReasonProvider)
+			ruleFactory := ruleFactoryFromScheduler(sch)
+			rule := models.RuleGen.GenerateRef()
+			ruleStore.PutRule(ctx, rule)
+			key := rule.GetKey()
+			info, _ := sch.registry.getOrCreate(ctx, rule, ruleFactory)
+
+			_, err := sch.updateSchedulableAlertRules(ctx)
+			require.NoError(t, err)
+
+			sch.deleteAlertRule(ctx, key)
+
+			mockReasonProvider.AssertCalled(t, "FindReason", mock.Anything, mock.Anything, rule.GetKeyWithGroup())
+
+			require.ErrorIs(t, info.(*alertRule).ctx.Err(), expectedReason)
+			require.False(t, sch.registry.exists(key))
+		})
+
+		t.Run("it should use the default reason if ruleStopReasonProvider does not return anything", func(t *testing.T) {
+			mockReasonProvider := new(mockAlertRuleStopReasonProvider)
+			mockReasonProvider.On("FindReason", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
+
+			ruleStore := newFakeRulesStore()
+			sch := setupScheduler(t, ruleStore, nil, nil, nil, nil, mockReasonProvider)
+			ruleFactory := ruleFactoryFromScheduler(sch)
+			rule := models.RuleGen.GenerateRef()
+			ruleStore.PutRule(ctx, rule)
+			key := rule.GetKey()
+			info, _ := sch.registry.getOrCreate(ctx, rule, ruleFactory)
+
+			_, err := sch.updateSchedulableAlertRules(ctx)
+			require.NoError(t, err)
+
+			sch.deleteAlertRule(ctx, key)
+
+			mockReasonProvider.AssertCalled(t, "FindReason", mock.Anything, mock.Anything, rule.GetKeyWithGroup())
+
+			require.ErrorIs(t, info.(*alertRule).ctx.Err(), errRuleDeleted)
+			require.False(t, sch.registry.exists(key))
+		})
+
+		t.Run("it should not call ruleStopReasonProvider if the rule is not found in the registry", func(t *testing.T) {
+			mockReasonProvider := new(mockAlertRuleStopReasonProvider)
+			expectedReason := errors.New("some rule deletion reason")
+			mockReasonProvider.On("FindReason", mock.Anything, mock.Anything, mock.Anything).Return(expectedReason, nil)
+
+			// Don't create a ruleStore so that the rule will not be found in deleteAlertRule
+			sch := setupScheduler(t, nil, nil, nil, nil, nil, mockReasonProvider)
 			ruleFactory := ruleFactoryFromScheduler(sch)
 			rule := models.RuleGen.GenerateRef()
 			key := rule.GetKey()
-			info, _ := sch.registry.getOrCreate(context.Background(), rule, ruleFactory)
-			sch.deleteAlertRule(key)
+			info, _ := sch.registry.getOrCreate(ctx, rule, ruleFactory)
+
+			_, err := sch.updateSchedulableAlertRules(ctx)
+			require.NoError(t, err)
+
+			sch.deleteAlertRule(ctx, key)
+
+			mockReasonProvider.AssertNotCalled(t, "FindReason")
+
 			require.ErrorIs(t, info.(*alertRule).ctx.Err(), errRuleDeleted)
 			require.False(t, sch.registry.exists(key))
 		})
 	})

 	t.Run("when rule does not exist", func(t *testing.T) {
 		t.Run("should exit", func(t *testing.T) {
-			sch := setupScheduler(t, nil, nil, nil, nil, nil)
+			sch := setupScheduler(t, nil, nil, nil, nil, nil, nil)
 			key := models.GenerateRuleKey(rand.Int63())
-			sch.deleteAlertRule(key)
+			sch.deleteAlertRule(ctx, key)
 		})
 	})
 }

-func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStore, registry *prometheus.Registry, senderMock *SyncAlertsSenderMock, evalMock eval.EvaluatorFactory) *schedule {
+func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStore, registry *prometheus.Registry, senderMock *SyncAlertsSenderMock, evalMock eval.EvaluatorFactory, ruleStopReasonProvider AlertRuleStopReasonProvider) *schedule {
 	t.Helper()
 	testTracer := tracing.InitializeTracerForTest()

@@ -995,6 +1081,7 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor
 		Tracer:          testTracer,
 		Log:             log.New("ngalert.scheduler"),
 		RecordingWriter: fakeRecordingWriter,
+		RuleStopReasonProvider: ruleStopReasonProvider,
 	}
 	managerCfg := state.ManagerCfg{
 		Metrics: m.GetStateMetrics(),