Alerting: Rule evaluation loop refactoring (#40238)
* move the evaluation function out of the loop
* extract an updateRule function
* isolate the alert rule change: updateRule returns the new rule and the evaluation accepts the rule as an argument
* extract the retry loop into a function
* add a function-wide log context
* refactor metrics, add tests, and replace timeNow with schedule.clock
commit 8a88caaa71 (parent 3da4e1cdcd)
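The diff below pulls the per-attempt work out of the evaluation loop into named closures (updateRule, evaluate) and reduces the loop itself to a small retryIfError helper. As a standalone illustration of that pattern, here is a minimal runnable sketch; maxAttempts is taken as a parameter and evalRule is a hypothetical stand-in for the scheduler's real fetch-and-evaluate work, so this shows the shape of the technique rather than the scheduler's actual code.

package main

import (
    "errors"
    "fmt"
)

// retryIfError mirrors the shape of the helper introduced in the diff below:
// it calls f up to maxAttempts times, passing the attempt number, and returns
// nil on the first success or the last error otherwise.
func retryIfError(maxAttempts int64, f func(attempt int64) error) error {
    var err error
    for attempt := int64(0); attempt < maxAttempts; attempt++ {
        if err = f(attempt); err == nil {
            return nil
        }
    }
    return err
}

func main() {
    calls := 0
    // evalRule is a hypothetical stand-in for "fetch the latest rule version,
    // then evaluate it"; it fails on the first attempt only.
    evalRule := func(attempt int64) error {
        calls++
        if attempt == 0 {
            return errors.New("transient evaluation failure")
        }
        return nil
    }

    err := retryIfError(3, evalRule)
    fmt.Println("calls:", calls, "err:", err) // calls: 2 err: <nil>
}

Keeping the retry policy in one helper means the fetch-and-evaluate logic never needs to know how many attempts it is given.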
@@ -2,6 +2,7 @@ package schedule

import (
    "context"
    "errors"
    "fmt"
    "net/url"
    "sync"
@@ -22,9 +23,6 @@ import (
    "golang.org/x/sync/errgroup"
)

// timeNow makes it possible to test usage of time
var timeNow = time.Now

// ScheduleService handles scheduling
type ScheduleService interface {
    Run(context.Context) error
@@ -424,11 +422,100 @@ func (sch *schedule) ruleEvaluationLoop(ctx context.Context) error {
}

func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRuleKey, evalCh <-chan *evalContext, stopCh <-chan struct{}) error {
    sch.log.Debug("alert rule routine started", "key", key)
    logger := sch.log.New("uid", key.UID, "org", key.OrgID)
    logger.Debug("alert rule routine started")

    orgID := fmt.Sprint(key.OrgID)
    evalTotal := sch.metrics.EvalTotal.WithLabelValues(orgID)
    evalDuration := sch.metrics.EvalDuration.WithLabelValues(orgID)
    evalTotalFailures := sch.metrics.EvalFailures.WithLabelValues(orgID)

    updateRule := func() (*models.AlertRule, error) {
        q := models.GetAlertRuleByUIDQuery{OrgID: key.OrgID, UID: key.UID}
        err := sch.ruleStore.GetAlertRuleByUID(&q)
        if err != nil {
            logger.Error("failed to fetch alert rule", "err", err)
            return nil, err
        }
        return q.Result, nil
    }

    evaluate := func(alertRule *models.AlertRule, attempt int64, ctx *evalContext) error {
        logger := logger.New("version", alertRule.Version, "attempt", attempt, "now", ctx.now)
        start := sch.clock.Now()

        condition := models.Condition{
            Condition: alertRule.Condition,
            OrgID: alertRule.OrgID,
            Data: alertRule.Data,
        }
        results, err := sch.evaluator.ConditionEval(&condition, ctx.now, sch.dataService)
        dur := sch.clock.Now().Sub(start)
        evalTotal.Inc()
        evalDuration.Observe(dur.Seconds())
        if err != nil {
            evalTotalFailures.Inc()
            // consider saving alert instance on error
            logger.Error("failed to evaluate alert rule", "duration", dur, "err", err)
            return err
        }
        logger.Debug("alert rule evaluated", "results", results, "duration", dur)

        processedStates := sch.stateManager.ProcessEvalResults(context.Background(), alertRule, results)
        sch.saveAlertStates(processedStates)
        alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL)

        if len(alerts.PostableAlerts) == 0 {
            logger.Debug("no alerts to put in the notifier")
            return nil
        }

        var localNotifierExist, externalNotifierExist bool
        logger.Debug("sending alerts to notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts)
        n, err := sch.multiOrgNotifier.AlertmanagerFor(alertRule.OrgID)
        if err == nil {
            localNotifierExist = true
            if err := n.PutAlerts(alerts); err != nil {
                logger.Error("failed to put alerts in the local notifier", "count", len(alerts.PostableAlerts), "err", err)
            }
        } else {
            if errors.Is(err, notifier.ErrNoAlertmanagerForOrg) {
                logger.Debug("local notifier was not found")
            } else {
                logger.Error("local notifier is not available", "err", err)
            }
        }

        // Send alerts to external Alertmanager(s) if we have a sender for this organization.
        sch.sendersMtx.RLock()
        defer sch.sendersMtx.RUnlock()
        s, ok := sch.senders[alertRule.OrgID]
        if ok {
            logger.Debug("sending alerts to external notifier", "count", len(alerts.PostableAlerts))
            s.SendAlerts(alerts)
            externalNotifierExist = true
        }

        if !localNotifierExist && !externalNotifierExist {
            logger.Error("no external or internal notifier - alerts not delivered!", "count", len(alerts.PostableAlerts))
        }
        return nil
    }

    retryIfError := func(f func(attempt int64) error) error {
        var attempt int64
        var err error
        for attempt = 0; attempt < sch.maxAttempts; attempt++ {
            err = f(attempt)
            if err == nil {
                return nil
            }
        }
        return err
    }

    evalRunning := false
    var attempt int64
    var alertRule *models.AlertRule
    var currentRule *models.AlertRule
    for {
        select {
        case ctx := <-evalCh:
@@ -436,73 +523,6 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
                continue
            }

            evaluate := func(attempt int64) error {
                start := timeNow()

                // fetch latest alert rule version
                if alertRule == nil || alertRule.Version < ctx.version {
                    q := models.GetAlertRuleByUIDQuery{OrgID: key.OrgID, UID: key.UID}
                    err := sch.ruleStore.GetAlertRuleByUID(&q)
                    if err != nil {
                        sch.log.Error("failed to fetch alert rule", "key", key)
                        return err
                    }
                    alertRule = q.Result
                    sch.log.Debug("new alert rule version fetched", "title", alertRule.Title, "key", key, "version", alertRule.Version)
                }

                condition := models.Condition{
                    Condition: alertRule.Condition,
                    OrgID: alertRule.OrgID,
                    Data: alertRule.Data,
                }
                results, err := sch.evaluator.ConditionEval(&condition, ctx.now, sch.dataService)
                var (
                    end = timeNow()
                    tenant = fmt.Sprint(alertRule.OrgID)
                    dur = end.Sub(start).Seconds()
                )

                sch.metrics.EvalTotal.WithLabelValues(tenant).Inc()
                sch.metrics.EvalDuration.WithLabelValues(tenant).Observe(dur)
                if err != nil {
                    sch.metrics.EvalFailures.WithLabelValues(tenant).Inc()
                    // consider saving alert instance on error
                    sch.log.Error("failed to evaluate alert rule", "title", alertRule.Title,
                        "key", key, "attempt", attempt, "now", ctx.now, "duration", end.Sub(start), "error", err)
                    return err
                }

                processedStates := sch.stateManager.ProcessEvalResults(context.Background(), alertRule, results)
                sch.saveAlertStates(processedStates)
                alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL)

                if len(alerts.PostableAlerts) == 0 {
                    sch.log.Debug("no alerts to put in the notifier", "org", alertRule.OrgID)
                    return nil
                }

                sch.log.Debug("sending alerts to notifier", "count", len(alerts.PostableAlerts), "alerts", alerts.PostableAlerts, "org", alertRule.OrgID)
                n, err := sch.multiOrgNotifier.AlertmanagerFor(alertRule.OrgID)
                if err == nil {
                    if err := n.PutAlerts(alerts); err != nil {
                        sch.log.Error("failed to put alerts in the notifier", "count", len(alerts.PostableAlerts), "err", err)
                    }
                } else {
                    sch.log.Error("unable to lookup local notifier for this org - alerts not delivered", "org", alertRule.OrgID, "count", len(alerts.PostableAlerts), "err", err)
                }

                // Send alerts to external Alertmanager(s) if we have a sender for this organization.
                sch.sendersMtx.RLock()
                defer sch.sendersMtx.RUnlock()
                s, ok := sch.senders[alertRule.OrgID]
                if ok {
                    s.SendAlerts(alerts)
                }

                return nil
            }

            func() {
                evalRunning = true
                defer func() {
@@ -510,16 +530,25 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key models.AlertRul
                    sch.evalApplied(key, ctx.now)
                }()

                for attempt = 0; attempt < sch.maxAttempts; attempt++ {
                    err := evaluate(attempt)
                    if err == nil {
                        break
                err := retryIfError(func(attempt int64) error {
                    // fetch latest alert rule version
                    if currentRule == nil || currentRule.Version < ctx.version {
                        newRule, err := updateRule()
                        if err != nil {
                            return err
                        }
                        currentRule = newRule
                        logger.Debug("new alert rule version fetched", "title", newRule.Title, "version", newRule.Version)
                    }
                    return evaluate(currentRule, attempt, ctx)
                })
                if err != nil {
                    log.Error("evaluation failed after all retries", "err", err)
                }
            }()
        case <-stopCh:
            sch.stopApplied(key)
            sch.log.Debug("stopping alert rule routine", "key", key)
            logger.Debug("stopping alert rule routine")
            // interrupt evaluation if it's running
            return nil
        case <-grafanaCtx.Done():
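The test-file changes below exercise the other half of the commit: durations are now measured through the scheduler's injected clock (sch.clock.Now()) instead of the package-level timeNow, so tests can drive time with clock.NewMock() from github.com/benbjohnson/clock. A minimal sketch of that injection pattern, assuming only that package's documented New/NewMock/Add API; the worker type and measure method are hypothetical names, not Grafana code.

package main

import (
    "fmt"
    "time"

    "github.com/benbjohnson/clock"
)

// worker measures elapsed time through an injected clock.Clock rather than
// calling time.Now directly, which is the idea behind dropping the package
// level timeNow variable in the diff above.
type worker struct {
    clock clock.Clock
}

func (w *worker) measure(f func()) time.Duration {
    start := w.clock.Now()
    f()
    return w.clock.Now().Sub(start)
}

func main() {
    // In production code the real clock is injected.
    _ = &worker{clock: clock.New()}

    // In tests a mock is injected, and time only moves when the test says so.
    mock := clock.NewMock()
    w := &worker{clock: mock}
    dur := w.measure(func() { mock.Add(150 * time.Millisecond) })
    fmt.Println(dur) // 150ms
}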
@@ -1,6 +1,7 @@
package schedule

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
@@ -12,6 +13,7 @@ import (
    "github.com/benbjohnson/clock"
    "github.com/grafana/grafana-plugin-sdk-go/data"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
    "github.com/prometheus/common/model"
    "github.com/stretchr/testify/require"

@@ -43,7 +45,7 @@ func TestSendingToExternalAlertmanager(t *testing.T) {
    cmd := store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig}
    require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))

    sched, mockedClock := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore)
    sched, mockedClock := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore, nil)

    // Make sure we sync the configuration at least once before the evaluation happens to guarantee the sender is running
    // when the first alert triggers.
@@ -105,7 +107,7 @@ func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
    cmd := store.UpdateAdminConfigurationCmd{AdminConfiguration: adminConfig}
    require.NoError(t, fakeAdminConfigStore.UpdateAdminConfiguration(cmd))

    sched, mockedClock := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore)
    sched, mockedClock := setupScheduler(t, fakeRuleStore, fakeInstanceStore, fakeAdminConfigStore, nil)

    // Make sure we sync the configuration at least once before the evaluation happens to guarantee the sender is running
    // when the first alert triggers.
@@ -154,14 +156,14 @@ func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
    // However, sometimes this does not happen.

    // Create two alert rules with one second interval.
    //alertRuleOrgOne := CreateTestAlertRule(t, fakeRuleStore, 1, 1)
    //alertRuleOrgTwo := CreateTestAlertRule(t, fakeRuleStore, 1, 2)
    // alertRuleOrgOne := CreateTestAlertRule(t, fakeRuleStore, 1, 1)
    // alertRuleOrgTwo := CreateTestAlertRule(t, fakeRuleStore, 1, 2)
    // Eventually, our Alertmanager should have received at least two alerts.
    //var count int
    //require.Eventuallyf(t, func() bool {
    // var count int
    // require.Eventuallyf(t, func() bool {
    //	count := fakeAM.AlertsCount()
    //	return count == 2 && fakeAM.AlertNamesCompare([]string{alertRuleOrgOne.Title, alertRuleOrgTwo.Title})
    //}, 20*time.Second, 200*time.Millisecond, "Alertmanager never received an '%s' from org 1 or '%s' from org 2, the alert count was: %d", alertRuleOrgOne.Title, alertRuleOrgTwo.Title, count)
    // }, 20*time.Second, 200*time.Millisecond, "Alertmanager never received an '%s' from org 1 or '%s' from org 2, the alert count was: %d", alertRuleOrgOne.Title, alertRuleOrgTwo.Title, count)

    // 2. Next, let's modify the configuration of an organization by adding an extra alertmanager.
    fakeAM2 := NewFakeExternalAlertmanager(t)
@@ -237,17 +239,17 @@ func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
func TestSchedule_ruleRoutine(t *testing.T) {
    createSchedule := func(
        evalAppliedChan chan time.Time,
    ) (*schedule, *fakeRuleStore, *fakeInstanceStore, *fakeAdminConfigStore) {
    ) (*schedule, *fakeRuleStore, *fakeInstanceStore, *fakeAdminConfigStore, prometheus.Gatherer) {
        ruleStore := newFakeRuleStore(t)
        instanceStore := &fakeInstanceStore{}
        adminConfigStore := newFakeAdminConfigStore(t)

        sch, _ := setupScheduler(t, ruleStore, instanceStore, adminConfigStore)

        registry := prometheus.NewPedanticRegistry()
        sch, _ := setupScheduler(t, ruleStore, instanceStore, adminConfigStore, registry)
        sch.evalAppliedFunc = func(key models.AlertRuleKey, t time.Time) {
            evalAppliedChan <- t
        }
        return sch, ruleStore, instanceStore, adminConfigStore
        return sch, ruleStore, instanceStore, adminConfigStore, registry
    }

    // normal states do not include NoData and Error because currently it is not possible to perform any sensible test
@@ -262,8 +264,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
    t.Run(fmt.Sprintf("when rule evaluation happens (evaluation state %s)", evalState), func(t *testing.T) {
        evalChan := make(chan *evalContext)
        evalAppliedChan := make(chan time.Time)

        sch, ruleStore, instanceStore, _ := createSchedule(evalAppliedChan)
        sch, ruleStore, instanceStore, _, reg := createSchedule(evalAppliedChan)

        rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), evalState)

@@ -339,8 +340,25 @@ func TestSchedule_ruleRoutine(t *testing.T) {
            require.Equal(t, s.Labels, data.Labels(cmd.Labels))
        })
        t.Run("it reports metrics", func(t *testing.T) {
            // TODO fix it when we update the way we use metrics
            t.Skip()
            // duration metric has 0 values because of mocked clock that do not advance
            expectedMetric := fmt.Sprintf(
                `# HELP grafana_alerting_rule_evaluation_duration_seconds The duration for a rule to execute.
# TYPE grafana_alerting_rule_evaluation_duration_seconds summary
grafana_alerting_rule_evaluation_duration_seconds{org="%[1]d",quantile="0.5"} 0
grafana_alerting_rule_evaluation_duration_seconds{org="%[1]d",quantile="0.9"} 0
grafana_alerting_rule_evaluation_duration_seconds{org="%[1]d",quantile="0.99"} 0
grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 1
# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures.
# TYPE grafana_alerting_rule_evaluation_failures_total counter
grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 0
# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
# TYPE grafana_alerting_rule_evaluations_total counter
grafana_alerting_rule_evaluations_total{org="%[1]d"} 1
`, rule.OrgID)

            err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric), "grafana_alerting_rule_evaluation_duration_seconds", "grafana_alerting_rule_evaluations_total", "grafana_alerting_rule_evaluation_failures_total")
            require.NoError(t, err)
        })
    })
    }
@@ -350,7 +368,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
        stopChan := make(chan struct{})
        stoppedChan := make(chan error)

        sch, _, _, _ := createSchedule(make(chan time.Time))
        sch, _, _, _, _ := createSchedule(make(chan time.Time))

        go func() {
            err := sch.ruleRoutine(context.Background(), models.AlertRuleKey{}, make(chan *evalContext), stopChan)
@@ -364,7 +382,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {

    t.Run("when context is cancelled", func(t *testing.T) {
        stoppedChan := make(chan error)
        sch, _, _, _ := createSchedule(make(chan time.Time))
        sch, _, _, _, _ := createSchedule(make(chan time.Time))

        ctx, cancel := context.WithCancel(context.Background())
        go func() {
@@ -382,7 +400,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
        evalChan := make(chan *evalContext)
        evalAppliedChan := make(chan time.Time)

        sch, ruleStore, _, _ := createSchedule(evalAppliedChan)
        sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)

        rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())

@@ -436,7 +454,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
        evalChan := make(chan *evalContext)
        evalAppliedChan := make(chan time.Time)

        sch, ruleStore, _, _ := createSchedule(evalAppliedChan)
        sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)

        rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())

@@ -520,7 +538,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
        evalChan := make(chan *evalContext)
        evalAppliedChan := make(chan time.Time)

        sch, ruleStore, _, _ := createSchedule(evalAppliedChan)
        sch, ruleStore, _, _, _ := createSchedule(evalAppliedChan)
        sch.senders[orgID] = s
        // eval.Alerting makes state manager to create notifications for alertmanagers
        rule := CreateTestAlertRule(t, ruleStore, 10, orgID, eval.Alerting)
@@ -553,12 +571,15 @@ func TestSchedule_ruleRoutine(t *testing.T) {
    })
}

func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, acs store.AdminConfigurationStore) (*schedule, *clock.Mock) {
func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, acs store.AdminConfigurationStore, registry *prometheus.Registry) (*schedule, *clock.Mock) {
    t.Helper()

    mockedClock := clock.NewMock()
    logger := log.New("ngalert schedule test")
    m := metrics.NewNGAlert(prometheus.NewPedanticRegistry())
    if registry == nil {
        registry = prometheus.NewPedanticRegistry()
    }
    m := metrics.NewNGAlert(registry)
    decryptFn := ossencryption.ProvideService().GetDecryptedValue
    moa, err := notifier.NewMultiOrgAlertmanager(&setting.Cfg{}, &notifier.FakeConfigStore{}, &notifier.FakeOrgStore{}, &notifier.FakeKVStore{}, decryptFn, nil, log.New("testlogger"))
    require.NoError(t, err)
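A closing note on the test plumbing above: setupScheduler now accepts an optional *prometheus.Registry so a test can gather from the same registry the scheduler's metrics are registered on, and the (currently skipped) "it reports metrics" test compares the gathered output with testutil.GatherAndCompare. A minimal standalone sketch of that assertion style follows; the hand-built counter is only a stand-in for the real metrics.NewNGAlert wiring.

package main

import (
    "fmt"
    "strings"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    // A pedantic registry handed to the code under test, mirroring the new
    // optional registry parameter of setupScheduler in the diff above.
    registry := prometheus.NewPedanticRegistry()

    // Stand-in for the scheduler's evaluation counter.
    evalTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
        Namespace: "grafana",
        Subsystem: "alerting",
        Name:      "rule_evaluations_total",
        Help:      "The total number of rule evaluations.",
    }, []string{"org"})
    registry.MustRegister(evalTotal)

    // Simulate one evaluation for org 1.
    evalTotal.WithLabelValues("1").Inc()

    expected := `# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
# TYPE grafana_alerting_rule_evaluations_total counter
grafana_alerting_rule_evaluations_total{org="1"} 1
`
    err := testutil.GatherAndCompare(registry, strings.NewReader(expected),
        "grafana_alerting_rule_evaluations_total")
    fmt.Println("mismatch:", err) // mismatch: <nil>
}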