From 418b077c59f0fd16208f1bb63f132ff6061b89ab Mon Sep 17 00:00:00 2001
From: Alexander Weaver
Date: Thu, 18 Jul 2024 17:14:49 -0500
Subject: [PATCH] Alerting: Integration testing for recording rules including writes (#90390)

* Add success case and tests for writer using metrics

* Use testable version of clock

* Assert a specific series was written

* Fix linter

* Fix manually constructed writer
---
 pkg/services/ngalert/ngalert.go               |   6 +-
 .../ngalert/schedule/recording_rule_test.go   | 182 +++++++++++++++++-
 pkg/services/ngalert/writer/prom.go           |   8 +-
 pkg/services/ngalert/writer/prom_test.go      |   2 +
 pkg/services/ngalert/writer/testing.go        |  28 ++-
 5 files changed, 214 insertions(+), 12 deletions(-)

diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go
index c3b4f4c6253..3fe6fc36942 100644
--- a/pkg/services/ngalert/ngalert.go
+++ b/pkg/services/ngalert/ngalert.go
@@ -344,7 +344,7 @@ func (ng *AlertNG) init() error {
 
 	evalFactory := eval.NewEvaluatorFactory(ng.Cfg.UnifiedAlerting, ng.DataSourceCache, ng.ExpressionService, ng.pluginsStore)
 
-	recordingWriter, err := createRecordingWriter(ng.FeatureToggles, ng.Cfg.UnifiedAlerting.RecordingRules, ng.httpClientProvider, ng.tracer, ng.Metrics.GetRemoteWriterMetrics())
+	recordingWriter, err := createRecordingWriter(ng.FeatureToggles, ng.Cfg.UnifiedAlerting.RecordingRules, ng.httpClientProvider, clk, ng.tracer, ng.Metrics.GetRemoteWriterMetrics())
 	if err != nil {
 		return fmt.Errorf("failed to initialize recording writer: %w", err)
 	}
@@ -644,11 +644,11 @@ func createRemoteAlertmanager(cfg remote.AlertmanagerConfig, kvstore kvstore.KVS
 	return remote.NewAlertmanager(cfg, notifier.NewFileStore(cfg.OrgID, kvstore), decryptFn, autogenFn, m, tracer)
 }
 
-func createRecordingWriter(featureToggles featuremgmt.FeatureToggles, settings setting.RecordingRuleSettings, httpClientProvider httpclient.Provider, tracer tracing.Tracer, m *metrics.RemoteWriter) (schedule.RecordingWriter, error) {
+func createRecordingWriter(featureToggles featuremgmt.FeatureToggles, settings setting.RecordingRuleSettings, httpClientProvider httpclient.Provider, clock clock.Clock, tracer tracing.Tracer, m *metrics.RemoteWriter) (schedule.RecordingWriter, error) {
 	logger := log.New("ngalert.writer")
 
 	if featureToggles.IsEnabledGlobally(featuremgmt.FlagGrafanaManagedRecordingRules) {
-		return writer.NewPrometheusWriter(settings, httpClientProvider, tracer, logger, m)
+		return writer.NewPrometheusWriter(settings, httpClientProvider, clock, tracer, logger, m)
 	}
 
 	return writer.NoopWriter{}, nil
diff --git a/pkg/services/ngalert/schedule/recording_rule_test.go b/pkg/services/ngalert/schedule/recording_rule_test.go
index bf1a38ecd55..416489f5ccc 100644
--- a/pkg/services/ngalert/schedule/recording_rule_test.go
+++ b/pkg/services/ngalert/schedule/recording_rule_test.go
@@ -11,6 +11,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/benbjohnson/clock"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/stretchr/testify/require"
@@ -158,7 +159,7 @@ func blankRecordingRuleForTests(ctx context.Context) *recordingRule {
 }
 
 func TestRecordingRule_Integration(t *testing.T) {
-	gen := models.RuleGen.With(models.RuleGen.WithAllRecordingRules())
+	gen := models.RuleGen.With(models.RuleGen.WithAllRecordingRules(), models.RuleGen.WithOrgID(123))
 	ruleStore := newFakeRulesStore()
 	reg := prometheus.NewPedanticRegistry()
 	sch := setupScheduler(t, ruleStore, nil, reg, nil, nil)
@@ -167,8 +168,9 @@ func TestRecordingRule_Integration(t *testing.T) {
 	writerReg := prometheus.NewPedanticRegistry()
 	sch.recordingWriter = setupWriter(t, writeTarget, writerReg)
 
-	t.Run("rule that errors", func(t *testing.T) {
-		rule := gen.With(withQueryForHealth("error")).GenerateRef()
+	t.Run("rule that succeeds", func(t *testing.T) {
+		writeTarget.Reset()
+		rule := gen.With(withQueryForHealth("ok")).GenerateRef()
 		ruleStore.PutRule(context.Background(), rule)
 		folderTitle := ruleStore.getNamespaceTitle(rule.NamespaceUID)
 		ruleFactory := ruleFactoryFromScheduler(sch)
@@ -239,6 +241,145 @@ func TestRecordingRule_Integration(t *testing.T) {
 			require.NoError(t, err)
 		})
 
+		t.Run("reports success evaluation metrics", func(t *testing.T) {
+			expectedMetric := fmt.Sprintf(
+				`
+				# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures.
+				# TYPE grafana_alerting_rule_evaluation_failures_total counter
+				grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 0
+				# HELP grafana_alerting_rule_evaluation_attempt_failures_total The total number of rule evaluation attempt failures.
+				# TYPE grafana_alerting_rule_evaluation_attempt_failures_total counter
+				grafana_alerting_rule_evaluation_attempt_failures_total{org="%[1]d"} 0
+				`,
+				rule.OrgID,
+			)
+
+			err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric),
+				"grafana_alerting_rule_evaluation_failures_total",
+				"grafana_alerting_rule_evaluation_attempt_failures_total",
+			)
+			require.NoError(t, err)
+		})
+
+		t.Run("reports remote write metrics", func(t *testing.T) {
+			expectedMetric := fmt.Sprintf(
+				`
+				# HELP grafana_alerting_remote_writer_write_duration_seconds Histogram of remote write durations.
+				# TYPE grafana_alerting_remote_writer_write_duration_seconds histogram
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.005"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.01"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.025"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.05"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.1"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.25"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.5"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="1"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="2.5"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="5"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="10"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="+Inf"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_sum{backend="prometheus",org="%[1]d"} 0
+				grafana_alerting_remote_writer_write_duration_seconds_count{backend="prometheus",org="%[1]d"} 1
+				# HELP grafana_alerting_remote_writer_writes_total The total number of remote writes attempted.
+				# TYPE grafana_alerting_remote_writer_writes_total counter
+				grafana_alerting_remote_writer_writes_total{backend="prometheus", org="%[1]d", status_code="200"} 1
+				`,
+				rule.OrgID,
+			)
+
+			err := testutil.GatherAndCompare(writerReg, bytes.NewBufferString(expectedMetric),
+				"grafana_alerting_remote_writer_writes_total",
+				"grafana_alerting_remote_writer_write_duration_seconds",
+			)
+			require.NoError(t, err)
+		})
+
+		t.Run("status shows evaluation", func(t *testing.T) {
+			status := process.(*recordingRule).Status()
+
+			require.Equal(t, "ok", status.Health)
+			require.Nil(t, status.LastError)
+		})
+
+		t.Run("write was performed", func(t *testing.T) {
+			require.NotZero(t, writeTarget.RequestsCount)
+			require.Contains(t, writeTarget.LastRequestBody, "some_metric")
+		})
+	})
+
+	t.Run("rule that errors", func(t *testing.T) {
+		writeTarget.Reset()
+		rule := gen.With(withQueryForHealth("error")).GenerateRef()
+		ruleStore.PutRule(context.Background(), rule)
+		folderTitle := ruleStore.getNamespaceTitle(rule.NamespaceUID)
+		ruleFactory := ruleFactoryFromScheduler(sch)
+
+		process := ruleFactory.new(context.Background(), rule)
+		evalDoneChan := make(chan time.Time)
+		process.(*recordingRule).evalAppliedHook = func(_ models.AlertRuleKey, t time.Time) {
+			evalDoneChan <- t
+		}
+		now := time.Now()
+
+		go func() {
+			_ = process.Run()
+		}()
+
+		t.Run("status shows no evaluations", func(t *testing.T) {
+			status := process.(*recordingRule).Status()
+
+			require.Equal(t, "unknown", status.Health)
+			require.Nil(t, status.LastError)
+			require.Zero(t, status.EvaluationTimestamp)
+			require.Zero(t, status.EvaluationDuration)
+		})
+
+		process.Eval(&Evaluation{
+			scheduledAt: now,
+			rule:        rule,
+			folderTitle: folderTitle,
+		})
+		_ = waitForTimeChannel(t, evalDoneChan)
+
+		t.Run("reports basic evaluation metrics", func(t *testing.T) {
+			expectedMetric := fmt.Sprintf(
+				`
+				# HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule.
+				# TYPE grafana_alerting_rule_evaluation_duration_seconds histogram
+				grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 2
+				grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 2
+				grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 2
+				grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 2
+				grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 2
+				grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 2
+				grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 2
+				grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 2
+				grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 2
+				grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 2
+				grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 2
+				grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 2
+				grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 2
+				grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 2
+				grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
+				grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 2
+				# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
+				# TYPE grafana_alerting_rule_evaluations_total counter
+				grafana_alerting_rule_evaluations_total{org="%[1]d"} 2
+				# HELP grafana_alerting_rule_evaluation_attempts_total The total number of rule evaluation attempts.
+				# TYPE grafana_alerting_rule_evaluation_attempts_total counter
+				grafana_alerting_rule_evaluation_attempts_total{org="%[1]d"} 2
+				`,
+				rule.OrgID,
+			)
+
+			err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric),
+				"grafana_alerting_rule_evaluation_duration_seconds",
+				"grafana_alerting_rule_evaluations_total",
+				"grafana_alerting_rule_evaluation_attempts_total",
+			)
+			require.NoError(t, err)
+		})
+
 		t.Run("reports failure evaluation metrics", func(t *testing.T) {
 			expectedMetric := fmt.Sprintf(
 				`
@@ -259,6 +400,39 @@ func TestRecordingRule_Integration(t *testing.T) {
 			require.NoError(t, err)
 		})
 
+		t.Run("reports remote write metrics", func(t *testing.T) {
+			expectedMetric := fmt.Sprintf(
+				`
+				# HELP grafana_alerting_remote_writer_write_duration_seconds Histogram of remote write durations.
+				# TYPE grafana_alerting_remote_writer_write_duration_seconds histogram
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.005"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.01"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.025"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.05"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.1"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.25"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.5"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="1"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="2.5"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="5"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="10"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="+Inf"} 1
+				grafana_alerting_remote_writer_write_duration_seconds_sum{backend="prometheus",org="%[1]d"} 0
+				grafana_alerting_remote_writer_write_duration_seconds_count{backend="prometheus",org="%[1]d"} 1
+				# HELP grafana_alerting_remote_writer_writes_total The total number of remote writes attempted.
+				# TYPE grafana_alerting_remote_writer_writes_total counter
+				grafana_alerting_remote_writer_writes_total{backend="prometheus", org="%[1]d", status_code="200"} 1
+				`,
+				rule.OrgID,
+			)
+
+			err := testutil.GatherAndCompare(writerReg, bytes.NewBufferString(expectedMetric),
+				"grafana_alerting_remote_writer_writes_total",
+				"grafana_alerting_remote_writer_write_duration_seconds",
+			)
+			require.NoError(t, err)
+		})
+
 		t.Run("status shows evaluation", func(t *testing.T) {
 			status := process.(*recordingRule).Status()
 
@@ -358,7 +532,7 @@ func withQueryForHealth(health string) models.AlertRuleMutator {
 func setupWriter(t *testing.T, target *writer.TestRemoteWriteTarget, reg prometheus.Registerer) *writer.PrometheusWriter {
 	provider := testClientProvider{}
 	m := metrics.NewNGAlert(reg)
-	wr, err := writer.NewPrometheusWriter(target.ClientSettings(), provider, tracing.InitializeTracerForTest(), log.NewNopLogger(), m.GetRemoteWriterMetrics())
+	wr, err := writer.NewPrometheusWriter(target.ClientSettings(), provider, clock.NewMock(), tracing.InitializeTracerForTest(), log.NewNopLogger(), m.GetRemoteWriterMetrics())
 	require.NoError(t, err)
 	return wr
 }
diff --git a/pkg/services/ngalert/writer/prom.go b/pkg/services/ngalert/writer/prom.go
index a260729a357..2a0637c9d73 100644
--- a/pkg/services/ngalert/writer/prom.go
+++ b/pkg/services/ngalert/writer/prom.go
@@ -9,6 +9,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/benbjohnson/clock"
 	"github.com/grafana/dataplane/sdata/numeric"
 	"github.com/grafana/grafana/pkg/infra/log"
 	"github.com/grafana/grafana/pkg/infra/tracing"
@@ -98,6 +99,7 @@ type HttpClientProvider interface {
 
 type PrometheusWriter struct {
 	client  promremote.Client
+	clock   clock.Clock
 	logger  log.Logger
 	metrics *metrics.RemoteWriter
 }
@@ -105,6 +107,7 @@ type PrometheusWriter struct {
 func NewPrometheusWriter(
 	settings setting.RecordingRuleSettings,
 	httpClientProvider HttpClientProvider,
+	clock clock.Clock,
 	tracer tracing.Tracer,
 	l log.Logger,
 	metrics *metrics.RemoteWriter,
@@ -145,6 +148,7 @@ func NewPrometheusWriter(
 
 	return &PrometheusWriter{
 		client:  client,
+		clock:   clock,
 		logger:  l,
 		metrics: metrics,
 	}, nil
@@ -205,9 +209,9 @@ func (w PrometheusWriter) Write(ctx context.Context, name string, t time.Time, f
 	}
 
 	l.Debug("Writing metric", "name", name)
-	writeStart := time.Now()
+	writeStart := w.clock.Now()
 	res, writeErr := w.client.WriteTimeSeries(ctx, series, promremote.WriteOptions{})
-	w.metrics.WriteDuration.WithLabelValues(lvs...).Observe(time.Since(writeStart).Seconds())
+	w.metrics.WriteDuration.WithLabelValues(lvs...).Observe(w.clock.Now().Sub(writeStart).Seconds())
 
 	lvs = append(lvs, fmt.Sprint(res.StatusCode))
 	w.metrics.WritesTotal.WithLabelValues(lvs...).Inc()
diff --git a/pkg/services/ngalert/writer/prom_test.go b/pkg/services/ngalert/writer/prom_test.go
index 43b41a7aa27..020512b2e97 100644
--- a/pkg/services/ngalert/writer/prom_test.go
+++ b/pkg/services/ngalert/writer/prom_test.go
@@ -10,6 +10,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/benbjohnson/clock"
 	"github.com/grafana/grafana-plugin-sdk-go/data"
 	"github.com/grafana/grafana/pkg/infra/log"
 	"github.com/grafana/grafana/pkg/setting"
@@ -140,6 +141,7 @@ func TestPrometheusWriter_Write(t *testing.T) {
 	client := &testClient{}
 	writer := &PrometheusWriter{
 		client:  client,
+		clock:   clock.New(),
 		logger:  log.New("test"),
 		metrics: metrics.NewRemoteWriterMetrics(prometheus.NewRegistry()),
 	}
diff --git a/pkg/services/ngalert/writer/testing.go b/pkg/services/ngalert/writer/testing.go
index 3919f12410b..79bed8c1dbe 100644
--- a/pkg/services/ngalert/writer/testing.go
+++ b/pkg/services/ngalert/writer/testing.go
@@ -1,8 +1,10 @@
 package writer
 
 import (
+	"io"
 	"net/http"
 	"net/http/httptest"
+	"sync"
 	"testing"
 	"time"
 
@@ -15,14 +17,17 @@ const RemoteWriteEndpoint = "/api/v1/write"
 
 type TestRemoteWriteTarget struct {
 	srv *httptest.Server
 
-	RequestsCount int
+	mtx             sync.Mutex
+	RequestsCount   int
+	LastRequestBody string
 }
 
 func NewTestRemoteWriteTarget(t *testing.T) *TestRemoteWriteTarget {
 	t.Helper()
 	target := &TestRemoteWriteTarget{
-		RequestsCount: 0,
+		RequestsCount:   0,
+		LastRequestBody: "",
 	}
 
 	handler := func(w http.ResponseWriter, r *http.Request) {
@@ -30,9 +35,18 @@ func NewTestRemoteWriteTarget(t *testing.T) *TestRemoteWriteTarget {
 			require.Fail(t, "Received unexpected request for endpoint %s", r.URL.Path)
 		}
+		target.mtx.Lock()
+		defer target.mtx.Unlock()
 		target.RequestsCount += 1
+		bd, err := io.ReadAll(r.Body)
+		defer func() {
+			_ = r.Body.Close()
+		}()
+		require.NoError(t, err)
+		target.LastRequestBody = string(bd)
+
 		w.WriteHeader(http.StatusOK)
-		_, err := w.Write([]byte(`{}`))
+		_, err = w.Write([]byte(`{}`))
 		require.NoError(t, err)
 	}
 
 	server := httptest.NewServer(http.HandlerFunc(handler))
@@ -53,3 +67,11 @@ func (s *TestRemoteWriteTarget) ClientSettings() setting.RecordingRuleSettings {
 		BasicAuthPassword: "",
 	}
 }
+
+// Reset resets all tracked requests and counters.
+func (s *TestRemoteWriteTarget) Reset() {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+	s.RequestsCount = 0
+	s.LastRequestBody = ""
+}
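The testability pattern the patch relies on is injecting a clock.Clock into PrometheusWriter instead of calling time.Now directly, so a test can drive the write-duration measurement with clock.NewMock(). Below is a minimal standalone sketch of that pattern, assuming only the github.com/benbjohnson/clock API the diff already imports; timedWriter and doWrite are illustrative names, not part of the Grafana code.

    package main

    import (
        "fmt"
        "time"

        "github.com/benbjohnson/clock"
    )

    // timedWriter mirrors the pattern used by PrometheusWriter: durations are
    // measured through an injected clock.Clock rather than the real wall clock.
    type timedWriter struct {
        clock clock.Clock
    }

    // doWrite runs op and returns how long it took according to the injected clock.
    func (w timedWriter) doWrite(op func()) time.Duration {
        start := w.clock.Now()
        op()
        return w.clock.Now().Sub(start)
    }

    func main() {
        // Production code would pass clock.New(); tests pass clock.NewMock()
        // and advance it explicitly, so measured durations are deterministic.
        mock := clock.NewMock()
        w := timedWriter{clock: mock}

        elapsed := w.doWrite(func() {
            mock.Add(150 * time.Millisecond) // simulate the write taking 150ms
        })
        fmt.Println(elapsed) // prints 150ms
    }

Because both endpoints of the measurement go through the injected clock (w.clock.Now().Sub(writeStart) in the patch), a mock that is never advanced yields a zero observed duration, which is consistent with the _sum 0 values the integration test asserts in the write-duration histogram.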