Mirror of https://github.com/grafana/grafana.git (synced 2024-11-25 10:20:29 -06:00)
Alerting: Scheduled recording rules execute their queries (#88309)
* Basic eval flow
* Wiring-up
* fix
* Extend todo
* Start with tests
* Include some relevant tests, skip ones that seem to have timing-based race conditions
* Some tests, touch up linter and todo
* Solve TODO
* Add tracing
* Tests to make sure an eval went through
* Wire up feature toggles
* Update pkg/services/ngalert/schedule/recording_rule.go
* Update pkg/services/ngalert/schedule/recording_rule_test.go (three review suggestions)

Co-authored-by: Steve Simpson <steve.simpson@grafana.com>
parent 910553df20
commit b926b6336d
@@ -298,6 +298,7 @@ func (ng *AlertNG) init() error {
 		AppURL:           appUrl,
 		EvaluatorFactory: evalFactory,
 		RuleStore:        ng.store,
+		FeatureToggles:   ng.FeatureToggles,
 		Metrics:          ng.Metrics.GetSchedulerMetrics(),
 		AlertSender:      alertsRouter,
 		Tracer:           ng.tracer,
@@ -11,6 +11,7 @@ import (
 	"github.com/grafana/grafana/pkg/infra/log"
 	"github.com/grafana/grafana/pkg/infra/tracing"
 	"github.com/grafana/grafana/pkg/services/datasources"
+	"github.com/grafana/grafana/pkg/services/featuremgmt"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
 	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
 	ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
@@ -51,6 +52,7 @@ func newRuleFactory(
 	evalFactory eval.EvaluatorFactory,
 	ruleProvider ruleProvider,
 	clock clock.Clock,
+	featureToggles featuremgmt.FeatureToggles,
 	met *metrics.Scheduler,
 	logger log.Logger,
 	tracer tracing.Tracer,
@@ -59,7 +61,16 @@ func newRuleFactory(
 ) ruleFactoryFunc {
 	return func(ctx context.Context, rule *ngmodels.AlertRule) Rule {
 		if rule.IsRecordingRule() {
-			return newRecordingRule(ctx, logger)
+			return newRecordingRule(
+				ctx,
+				maxAttempts,
+				clock,
+				evalFactory,
+				featureToggles,
+				logger,
+				met,
+				tracer,
+			)
 		}
 		return newAlertRule(
 			ctx,
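The factory chooses the concrete implementation once, at construction time, so the scheduler's run loop can hold recording rules and alerting rules behind the same Rule interface. A stripped-down sketch of that dispatch shape (the one-method Rule interface and both structs are illustrative stand-ins, not Grafana's definitions):

package main

import (
	"context"
	"fmt"
)

// Rule is a simplified stand-in for the scheduler's rule interface;
// the real one also has Eval, Update, and Stop.
type Rule interface{ Run() error }

type recording struct{}

func (recording) Run() error { return nil }

type alerting struct{}

func (alerting) Run() error { return nil }

// factory mirrors the shape of ruleFactoryFunc: one entry point, with
// the concrete implementation picked per rule at construction time.
func factory(ctx context.Context, isRecording bool) Rule {
	if isRecording {
		return recording{}
	}
	return alerting{}
}

func main() {
	fmt.Printf("%T\n", factory(context.Background(), true)) // main.recording
}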
@@ -244,7 +255,7 @@ func (a *alertRule) Run(key ngmodels.AlertRuleKey) error {

 	for attempt := int64(1); attempt <= a.maxAttempts; attempt++ {
 		isPaused := ctx.rule.IsPaused
-		f := ruleWithFolder{ctx.rule, ctx.folderTitle}.Fingerprint()
+		f := ctx.Fingerprint()
 		// Do not clean up state if the eval loop has just started.
 		var needReset bool
 		if currentFingerprint != 0 && currentFingerprint != f {
@@ -20,6 +20,7 @@ import (
 	mock "github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"

+	"github.com/grafana/grafana/pkg/infra/log"
 	definitions "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
 	models "github.com/grafana/grafana/pkg/services/ngalert/models"
@@ -185,12 +186,12 @@ func TestAlertRule(t *testing.T) {
 		require.False(t, success)
 		require.Nilf(t, dropped, "expected no dropped evaluations but got one")
 	})
-	t.Run("stop should do nothing", func(t *testing.T) {
+	t.Run("calling stop multiple times should not panic", func(t *testing.T) {
 		r := blankRuleForTests(context.Background())
 		r.Stop(nil)
 		r.Stop(nil)
 	})
-	t.Run("stop should do nothing if parent context stopped", func(t *testing.T) {
+	t.Run("stop should not panic if parent context stopped", func(t *testing.T) {
 		ctx, cancelFn := context.WithCancel(context.Background())
 		r := blankRuleForTests(ctx)
 		cancelFn()
@@ -240,10 +241,27 @@ func TestAlertRule(t *testing.T) {

 		wg.Wait()
 	})
+
+	t.Run("Run should exit if idle when Stop is called", func(t *testing.T) {
+		rule := blankRuleForTests(context.Background())
+		runResult := make(chan error)
+		go func() {
+			runResult <- rule.Run(models.AlertRuleKey{})
+		}()
+
+		rule.Stop(nil)
+
+		select {
+		case err := <-runResult:
+			require.NoError(t, err)
+		case <-time.After(5 * time.Second):
+			t.Fatal("Run() never exited")
+		}
+	})
 }

 func blankRuleForTests(ctx context.Context) *alertRule {
-	return newAlertRule(context.Background(), nil, false, 0, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
+	return newAlertRule(context.Background(), nil, false, 0, nil, nil, nil, nil, nil, nil, log.NewNopLogger(), nil, nil, nil)
 }

 func TestRuleRoutine(t *testing.T) {
@@ -747,5 +765,5 @@ func TestRuleRoutine(t *testing.T) {
 }

 func ruleFactoryFromScheduler(sch *schedule) ruleFactory {
-	return newRuleFactory(sch.appURL, sch.disableGrafanaFolder, sch.maxAttempts, sch.alertsSender, sch.stateManager, sch.evaluatorFactory, &sch.schedulableAlertRules, sch.clock, sch.metrics, sch.log, sch.tracer, sch.evalAppliedFunc, sch.stopAppliedFunc)
+	return newRuleFactory(sch.appURL, sch.disableGrafanaFolder, sch.maxAttempts, sch.alertsSender, sch.stateManager, sch.evaluatorFactory, &sch.schedulableAlertRules, sch.clock, sch.featureToggles, sch.metrics, sch.log, sch.tracer, sch.evalAppliedFunc, sch.stopAppliedFunc)
 }
@@ -2,30 +2,72 @@ package schedule

 import (
 	context "context"
+	"fmt"
+	"time"

+	"github.com/benbjohnson/clock"
+	"github.com/grafana/grafana-plugin-sdk-go/backend"
 	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/infra/tracing"
+	"github.com/grafana/grafana/pkg/services/featuremgmt"
+	"github.com/grafana/grafana/pkg/services/ngalert/eval"
+	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
 	ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
 	"github.com/grafana/grafana/pkg/util"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/trace"
 )
 type recordingRule struct {
 	ctx    context.Context
 	evalCh chan *Evaluation
 	stopFn util.CancelCauseFunc

-	logger log.Logger
+	maxAttempts int64
+
+	clock          clock.Clock
+	evalFactory    eval.EvaluatorFactory
+	featureToggles featuremgmt.FeatureToggles
+
+	// Event hooks that are only used in tests.
+	evalAppliedHook evalAppliedFunc
+
+	logger  log.Logger
+	metrics *metrics.Scheduler
+	tracer  tracing.Tracer
 }
-func newRecordingRule(parent context.Context, logger log.Logger) *recordingRule {
+func newRecordingRule(parent context.Context, maxAttempts int64, clock clock.Clock, evalFactory eval.EvaluatorFactory, ft featuremgmt.FeatureToggles, logger log.Logger, metrics *metrics.Scheduler, tracer tracing.Tracer) *recordingRule {
 	ctx, stop := util.WithCancelCause(parent)
 	return &recordingRule{
-		ctx:    ctx,
-		stopFn: stop,
-		logger: logger,
+		ctx:            ctx,
+		evalCh:         make(chan *Evaluation),
+		stopFn:         stop,
+		clock:          clock,
+		evalFactory:    evalFactory,
+		featureToggles: ft,
+		maxAttempts:    maxAttempts,
+		logger:         logger,
+		metrics:        metrics,
+		tracer:         tracer,
 	}
 }
 func (r *recordingRule) Eval(eval *Evaluation) (bool, *Evaluation) {
-	return true, nil
+	// Read the channel in a non-blocking manner to make sure that there is no concurrent send operation.
+	var droppedMsg *Evaluation
+	select {
+	case droppedMsg = <-r.evalCh:
+	default:
+	}
+
+	select {
+	case r.evalCh <- eval:
+		return true, droppedMsg
+	case <-r.ctx.Done():
+		return false, droppedMsg
+	}
 }
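Eval uses a two-step channel idiom: a non-blocking receive first drains any evaluation still sitting in the channel (handed back to the caller as "dropped"), then a blocking send races against rule shutdown. A standalone sketch of the same idiom outside the scheduler (the offer function and int payload are illustrative, not Grafana code):

package main

import (
	"context"
	"fmt"
)

// offer replaces any still-queued value with v. It reports whether the
// send succeeded and returns the value it displaced, if any.
func offer(ctx context.Context, ch chan int, v int) (bool, *int) {
	// Non-blocking receive: claim any unconsumed value so the new one
	// supersedes it instead of queueing behind it.
	var dropped *int
	select {
	case old := <-ch:
		dropped = &old
	default:
	}
	// Blocking send, abandoned if the context is cancelled first.
	select {
	case ch <- v:
		return true, dropped
	case <-ctx.Done():
		return false, dropped
	}
}

func main() {
	ch := make(chan int, 1) // buffered here so the demo needs no reader goroutine
	ok, dropped := offer(context.Background(), ch, 1)
	fmt.Println(ok, dropped) // true <nil>
	ok, dropped = offer(context.Background(), ch, 2)
	fmt.Println(ok, *dropped) // true 1: the unread first value was displaced
}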
 func (r *recordingRule) Update(lastVersion RuleVersionAndPauseStatus) bool {
@@ -43,12 +85,130 @@ func (r *recordingRule) Run(key ngmodels.AlertRuleKey) error {
 	logger := r.logger.FromContext(ctx)
 	logger.Debug("Recording rule routine started")

 	// nolint:gosimple
 	for {
 		select {
+		case eval, ok := <-r.evalCh:
+			if !ok {
+				logger.Debug("Evaluation channel has been closed. Exiting")
+				return nil
+			}
+			if !r.featureToggles.IsEnabled(ctx, featuremgmt.FlagGrafanaManagedRecordingRules) {
+				logger.Warn("Recording rule scheduled but toggle is not enabled. Skipping")
+				return nil
+			}
+			// TODO: Skipping the "evalRunning" guard that the alert rule routine does, because it seems to be dead code and impossible to hit.
+			// TODO: Either implement me or remove from alert rules once investigated.
+
+			r.doEvaluate(ctx, eval)
 		case <-ctx.Done():
 			logger.Debug("Stopping recording rule routine")
 			return nil
 		}
 	}
 }
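The routine re-checks the feature toggle on every received evaluation, so disabling FlagGrafanaManagedRecordingRules shuts the rule down without a scheduler restart. In tests the toggle is satisfied with an in-memory implementation, as the new blankRecordingRuleForTests helper does later in this commit; a minimal sketch using only calls that appear in this diff:

package main

import (
	"context"
	"fmt"

	"github.com/grafana/grafana/pkg/services/featuremgmt"
)

func main() {
	// WithFeatures builds an in-memory FeatureToggles with the listed flags enabled.
	ft := featuremgmt.WithFeatures(featuremgmt.FlagGrafanaManagedRecordingRules)
	fmt.Println(ft.IsEnabled(context.Background(), featuremgmt.FlagGrafanaManagedRecordingRules)) // true
}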
+func (r *recordingRule) doEvaluate(ctx context.Context, ev *Evaluation) {
+	logger := r.logger.FromContext(ctx).New("now", ev.scheduledAt, "fingerprint", ev.Fingerprint())
+	orgID := fmt.Sprint(ev.rule.OrgID)
+	evalDuration := r.metrics.EvalDuration.WithLabelValues(orgID)
+	evalTotal := r.metrics.EvalTotal.WithLabelValues(orgID)
+	evalStart := r.clock.Now()
+
+	defer func() {
+		evalTotal.Inc()
+		evalDuration.Observe(r.clock.Now().Sub(evalStart).Seconds())
+		r.evaluationDoneTestHook(ev)
+	}()
+
+	if ev.rule.IsPaused {
+		logger.Debug("Skip recording rule evaluation because it is paused")
+		return
+	}
+
+	ctx, span := r.tracer.Start(ctx, "recording rule execution", trace.WithAttributes(
+		attribute.String("rule_uid", ev.rule.UID),
+		attribute.Int64("org_id", ev.rule.OrgID),
+		attribute.Int64("rule_version", ev.rule.Version),
+		attribute.String("rule_fingerprint", ev.Fingerprint().String()),
+		attribute.String("tick", ev.scheduledAt.UTC().Format(time.RFC3339Nano)),
+	))
+	defer span.End()
+
+	for attempt := int64(1); attempt <= r.maxAttempts; attempt++ {
+		logger := logger.New("attempt", attempt)
+		if ctx.Err() != nil {
+			span.SetStatus(codes.Error, "rule evaluation cancelled")
+			logger.Error("Skipping recording rule evaluation because context has been cancelled")
+			return
+		}
+
+		err := r.tryEvaluation(ctx, ev, logger)
+		if err == nil {
+			return
+		}
+
+		logger.Error("Failed to evaluate rule", "attempt", attempt, "error", err)
+		select {
+		case <-ctx.Done():
+			logger.Error("Context has been cancelled while backing off", "attempt", attempt)
+			return
+		case <-time.After(retryDelay):
+			continue
+		}
+	}
+}
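The attempt loop above is a bounded retry with a fixed delay (the package's retryDelay constant), cut short whenever the context is cancelled. The same shape reduced to a runnable sketch (retry, its parameters, and the transient error are illustrative; the real loop also records spans and per-org metrics):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retry runs fn up to maxAttempts times, waiting delay between attempts
// and giving up early if ctx is cancelled.
func retry(ctx context.Context, maxAttempts int, delay time.Duration, fn func() error) error {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if err = fn(); err == nil {
			return nil
		}
		// Back off, but abandon the wait if the context is cancelled.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
	}
	return err // last failure after exhausting all attempts
}

func main() {
	calls := 0
	err := retry(context.Background(), 3, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient")
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}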
+func (r *recordingRule) tryEvaluation(ctx context.Context, ev *Evaluation, logger log.Logger) error {
+	orgID := fmt.Sprint(ev.rule.OrgID)
+	evalAttemptTotal := r.metrics.EvalAttemptTotal.WithLabelValues(orgID)
+	evalAttemptFailures := r.metrics.EvalAttemptFailures.WithLabelValues(orgID)
+	evalTotalFailures := r.metrics.EvalFailures.WithLabelValues(orgID)
+
+	start := r.clock.Now()
+	evalCtx := eval.NewContext(ctx, SchedulerUserFor(ev.rule.OrgID))
+	result, err := r.buildAndExecutePipeline(ctx, evalCtx, ev, logger)
+	dur := r.clock.Now().Sub(start)
+
+	evalAttemptTotal.Inc()
+	span := trace.SpanFromContext(ctx)
+
+	// TODO: In some cases, err can be nil but the dataframe itself contains embedded error frames. Parse these out like we do when evaluating alert rules.
+	// TODO: Maybe refactor something in the eval package so we can use shared code for this.
+	if err != nil {
+		evalAttemptFailures.Inc()
+		// TODO: Only errors embedded in the frame can be considered retryable.
+		// Since we are not handling these yet per the above TODO, we can blindly consider all errors to be non-retryable for now, and just exit.
+		evalTotalFailures.Inc()
+		span.SetStatus(codes.Error, "rule evaluation failed")
+		span.RecordError(err)
+		return fmt.Errorf("server side expressions pipeline returned an error: %w", err)
+	}
+
+	logger.Debug("Recording rule evaluated", "results", result, "duration", dur)
+	span.AddEvent("rule evaluated", trace.WithAttributes(
+		attribute.Int64("results", int64(len(result.Responses))),
+	))
+	return nil
+}
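Each attempt bumps per-org counters: every call increments the attempt counter, and a failed pipeline run increments both failure counters before returning. A minimal sketch of the same per-org labelling with the standard Prometheus client (the metric name here is made up for the demo):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	attempts := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "demo_rule_evaluation_attempts_total", // hypothetical name for the demo
		Help: "The total number of rule evaluation attempts.",
	}, []string{"org"})

	// WithLabelValues resolves the per-org child once; Inc is then cheap per attempt.
	attempts.WithLabelValues("1").Inc()
	fmt.Println(testutil.ToFloat64(attempts.WithLabelValues("1"))) // 1
}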
+func (r *recordingRule) buildAndExecutePipeline(ctx context.Context, evalCtx eval.EvaluationContext, ev *Evaluation, logger log.Logger) (*backend.QueryDataResponse, error) {
+	start := r.clock.Now()
+	evaluator, err := r.evalFactory.Create(evalCtx, ev.rule.GetEvalCondition())
+	if err != nil {
+		logger.Error("Failed to build rule evaluator", "error", err)
+		return nil, err
+	}
+	results, err := evaluator.EvaluateRaw(ctx, ev.scheduledAt)
+	if err != nil {
+		logger.Error("Failed to evaluate rule", "error", err, "duration", r.clock.Now().Sub(start))
+	}
+	return results, err
+}
+
+func (r *recordingRule) evaluationDoneTestHook(ev *Evaluation) {
+	if r.evalAppliedHook == nil {
+		return
+	}
+
+	r.evalAppliedHook(ev.rule.GetKey(), ev.scheduledAt)
+}
pkg/services/ngalert/schedule/recording_rule_test.go (new file, 217 lines)
@@ -0,0 +1,217 @@
+package schedule
+
+import (
+	"bytes"
+	context "context"
+	"fmt"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/grafana/grafana/pkg/infra/log"
+	"github.com/grafana/grafana/pkg/services/featuremgmt"
+	models "github.com/grafana/grafana/pkg/services/ngalert/models"
+	"github.com/grafana/grafana/pkg/util"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+func TestRecordingRule(t *testing.T) {
+	gen := models.RuleGen.With(models.RuleGen.WithAllRecordingRules())
+	// evalRetval carries the return value of Rule.Eval() calls.
+	type evalRetval struct {
+		success     bool
+		droppedEval *Evaluation
+	}
+
+	t.Run("when rule evaluation is not stopped", func(t *testing.T) {
+		t.Run("eval should send to evalCh", func(t *testing.T) {
+			r := blankRecordingRuleForTests(context.Background())
+			expected := time.Now()
+			resultCh := make(chan evalRetval)
+			data := &Evaluation{
+				scheduledAt: expected,
+				rule:        gen.GenerateRef(),
+				folderTitle: util.GenerateShortUID(),
+			}
+
+			go func() {
+				result, dropped := r.Eval(data)
+				resultCh <- evalRetval{result, dropped}
+			}()
+
+			select {
+			case ctx := <-r.evalCh:
+				require.Equal(t, data, ctx)
+				result := <-resultCh // blocks
+				require.True(t, result.success)
+				require.Nilf(t, result.droppedEval, "expected no dropped evaluations but got one")
+			case <-time.After(5 * time.Second):
+				t.Fatal("No message was received on eval channel")
+			}
+		})
+	})
+
+	t.Run("when rule evaluation is stopped", func(t *testing.T) {
+		t.Run("eval should do nothing", func(t *testing.T) {
+			r := blankRecordingRuleForTests(context.Background())
+			r.Stop(nil)
+			ev := &Evaluation{
+				scheduledAt: time.Now(),
+				rule:        gen.GenerateRef(),
+				folderTitle: util.GenerateShortUID(),
+			}
+
+			success, dropped := r.Eval(ev)
+
+			require.False(t, success)
+			require.Nilf(t, dropped, "expected no dropped evaluations but got one")
+		})
+
+		t.Run("calling stop multiple times should not panic", func(t *testing.T) {
+			r := blankRecordingRuleForTests(context.Background())
+			r.Stop(nil)
+			r.Stop(nil)
+		})
+
+		t.Run("stop should not panic if parent context stopped", func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			r := blankRecordingRuleForTests(ctx)
+			cancelFn()
+			r.Stop(nil)
+		})
+	})
+
+	t.Run("eval should be thread-safe", func(t *testing.T) {
+		r := blankRecordingRuleForTests(context.Background())
+		wg := sync.WaitGroup{}
+		go func() {
+			for {
+				select {
+				case <-r.evalCh:
+					time.Sleep(time.Microsecond)
+				case <-r.ctx.Done():
+					return
+				}
+			}
+		}()
+
+		for i := 0; i < 10; i++ {
+			wg.Add(1)
+			go func() {
+				for i := 0; i < 20; i++ {
+					max := 3
+					if i <= 10 {
+						max = 2
+					}
+					switch rand.Intn(max) + 1 {
+					case 1:
+						r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})
+					case 2:
+						r.Eval(&Evaluation{
+							scheduledAt: time.Now(),
+							rule:        gen.GenerateRef(),
+							folderTitle: util.GenerateShortUID(),
+						})
+					case 3:
+						r.Stop(nil)
+					}
+				}
+				wg.Done()
+			}()
+		}
+
+		wg.Wait()
+	})
+
+	t.Run("Run should exit if idle when Stop is called", func(t *testing.T) {
+		rule := blankRecordingRuleForTests(context.Background())
+		runResult := make(chan error)
+		go func() {
+			runResult <- rule.Run(models.AlertRuleKey{})
+		}()
+
+		rule.Stop(nil)
+
+		select {
+		case err := <-runResult:
+			require.NoError(t, err)
+		case <-time.After(5 * time.Second):
+			t.Fatal("Run() never exited")
+		}
+	})
+}
+
+func blankRecordingRuleForTests(ctx context.Context) *recordingRule {
+	ft := featuremgmt.WithFeatures(featuremgmt.FlagGrafanaManagedRecordingRules)
+	return newRecordingRule(context.Background(), 0, nil, nil, ft, log.NewNopLogger(), nil, nil)
+}
+
+func TestRecordingRule_Integration(t *testing.T) {
+	gen := models.RuleGen.With(models.RuleGen.WithAllRecordingRules())
+	ruleStore := newFakeRulesStore()
+	reg := prometheus.NewPedanticRegistry()
+	sch := setupScheduler(t, ruleStore, nil, reg, nil, nil)
+	rule := gen.GenerateRef()
+	ruleStore.PutRule(context.Background(), rule)
+	folderTitle := ruleStore.getNamespaceTitle(rule.NamespaceUID)
+	ruleFactory := ruleFactoryFromScheduler(sch)
+
+	process := ruleFactory.new(context.Background(), rule)
+	evalDoneChan := make(chan time.Time)
+	process.(*recordingRule).evalAppliedHook = func(_ models.AlertRuleKey, t time.Time) {
+		evalDoneChan <- t
+	}
+	now := time.Now()
+
+	go func() {
+		_ = process.Run(rule.GetKey())
+	}()
+	process.Eval(&Evaluation{
+		scheduledAt: now,
+		rule:        rule,
+		folderTitle: folderTitle,
+	})
+	_ = waitForTimeChannel(t, evalDoneChan)
+
+	t.Run("reports basic evaluation metrics", func(t *testing.T) {
+		expectedMetric := fmt.Sprintf(
+			`
+# HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule.
+# TYPE grafana_alerting_rule_evaluation_duration_seconds histogram
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 1
+grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
+grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
+grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 1
+# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
+# TYPE grafana_alerting_rule_evaluations_total counter
+grafana_alerting_rule_evaluations_total{org="%[1]d"} 1
+# HELP grafana_alerting_rule_evaluation_attempts_total The total number of rule evaluation attempts.
+# TYPE grafana_alerting_rule_evaluation_attempts_total counter
+grafana_alerting_rule_evaluation_attempts_total{org="%[1]d"} 1
+`,
+			rule.OrgID,
+		)
+
+		err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric),
+			"grafana_alerting_rule_evaluation_duration_seconds",
+			"grafana_alerting_rule_evaluations_total",
+			"grafana_alerting_rule_evaluation_attempts_total",
+		)
+		require.NoError(t, err)
+	})
+}
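The integration test asserts on metrics with testutil.GatherAndCompare, which diffs a registry's gathered output against an expected exposition-format string, restricted to the named metric families. A standalone sketch of the same technique with a made-up counter:

package main

import (
	"bytes"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewPedanticRegistry()
	c := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "demo_total", // hypothetical metric for the demo
		Help: "A demo counter.",
	})
	reg.MustRegister(c)
	c.Inc()

	expected := `
# HELP demo_total A demo counter.
# TYPE demo_total counter
demo_total 1
`
	// Only the listed metric families are compared; anything else in the
	// registry is ignored, which keeps the assertion focused.
	err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expected), "demo_total")
	fmt.Println(err) // <nil>
}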
@@ -87,6 +87,10 @@ type Evaluation struct {
 	folderTitle string
 }

+func (e *Evaluation) Fingerprint() fingerprint {
+	return ruleWithFolder{e.rule, e.folderTitle}.Fingerprint()
+}
+
 type alertRulesRegistry struct {
 	rules        map[models.AlertRuleKey]*models.AlertRule
 	folderTitles map[models.FolderKey]string
@@ -13,6 +13,7 @@ import (

 	"github.com/grafana/grafana/pkg/infra/log"
 	"github.com/grafana/grafana/pkg/infra/tracing"
+	"github.com/grafana/grafana/pkg/services/featuremgmt"
 	"github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
 	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
@@ -77,6 +78,7 @@ type schedule struct {
 	appURL               *url.URL
 	disableGrafanaFolder bool
 	jitterEvaluations    JitterStrategy
+	featureToggles       featuremgmt.FeatureToggles

 	metrics *metrics.Scheduler

@@ -99,6 +101,7 @@ type SchedulerCfg struct {
 	C                    clock.Clock
 	MinRuleInterval      time.Duration
 	DisableGrafanaFolder bool
+	FeatureToggles       featuremgmt.FeatureToggles
 	AppURL               *url.URL
 	JitterEvaluations    JitterStrategy
 	EvaluatorFactory     eval.EvaluatorFactory
@@ -129,6 +132,7 @@ func NewScheduler(cfg SchedulerCfg, stateManager *state.Manager) *schedule {
 		appURL:                cfg.AppURL,
 		disableGrafanaFolder:  cfg.DisableGrafanaFolder,
 		jitterEvaluations:     cfg.JitterEvaluations,
+		featureToggles:        cfg.FeatureToggles,
 		stateManager:          stateManager,
 		minRuleInterval:       cfg.MinRuleInterval,
 		schedulableAlertRules: alertRulesRegistry{rules: make(map[ngmodels.AlertRuleKey]*ngmodels.AlertRule)},
@@ -246,6 +250,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
 		sch.evaluatorFactory,
 		&sch.schedulableAlertRules,
 		sch.clock,
+		sch.featureToggles,
 		sch.metrics,
 		sch.log,
 		sch.tracer,
@@ -436,7 +436,7 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor

 	var evaluator = evalMock
 	if evalMock == nil {
-		evaluator = eval.NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, nil, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, featuremgmt.WithFeatures(), nil, tracing.InitializeTracerForTest()), &pluginstore.FakePluginStore{})
+		evaluator = eval.NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, &datasources.FakeCacheService{}, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, featuremgmt.WithFeatures(), nil, tracing.InitializeTracerForTest()), &pluginstore.FakePluginStore{})
 	}

 	if registry == nil {
@@ -466,6 +466,7 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor
 		AppURL:           appUrl,
 		EvaluatorFactory: evaluator,
 		RuleStore:        rs,
+		FeatureToggles:   featuremgmt.WithFeatures(featuremgmt.FlagGrafanaManagedRecordingRules),
 		Metrics:          m.GetSchedulerMetrics(),
 		AlertSender:      senderMock,
 		Tracer:           testTracer,