Alerting: Integration testing for recording rules including writes (#90390)

* Add success case and tests for writer using metrics

* Use testable version of clock

* Assert a specific series was written

* Fix linter

* Fix manually constructed writer

Author: Alexander Weaver, 2024-07-18 17:14:49 -05:00 (committed by GitHub)
Commit 418b077c59, parent f446096eb1
5 changed files with 214 additions and 12 deletions


@@ -344,7 +344,7 @@ func (ng *AlertNG) init() error {
evalFactory := eval.NewEvaluatorFactory(ng.Cfg.UnifiedAlerting, ng.DataSourceCache, ng.ExpressionService, ng.pluginsStore)
-recordingWriter, err := createRecordingWriter(ng.FeatureToggles, ng.Cfg.UnifiedAlerting.RecordingRules, ng.httpClientProvider, ng.tracer, ng.Metrics.GetRemoteWriterMetrics())
+recordingWriter, err := createRecordingWriter(ng.FeatureToggles, ng.Cfg.UnifiedAlerting.RecordingRules, ng.httpClientProvider, clk, ng.tracer, ng.Metrics.GetRemoteWriterMetrics())
if err != nil {
return fmt.Errorf("failed to initialize recording writer: %w", err)
}
@@ -644,11 +644,11 @@ func createRemoteAlertmanager(cfg remote.AlertmanagerConfig, kvstore kvstore.KVS
return remote.NewAlertmanager(cfg, notifier.NewFileStore(cfg.OrgID, kvstore), decryptFn, autogenFn, m, tracer)
}
-func createRecordingWriter(featureToggles featuremgmt.FeatureToggles, settings setting.RecordingRuleSettings, httpClientProvider httpclient.Provider, tracer tracing.Tracer, m *metrics.RemoteWriter) (schedule.RecordingWriter, error) {
+func createRecordingWriter(featureToggles featuremgmt.FeatureToggles, settings setting.RecordingRuleSettings, httpClientProvider httpclient.Provider, clock clock.Clock, tracer tracing.Tracer, m *metrics.RemoteWriter) (schedule.RecordingWriter, error) {
logger := log.New("ngalert.writer")
if featureToggles.IsEnabledGlobally(featuremgmt.FlagGrafanaManagedRecordingRules) {
-return writer.NewPrometheusWriter(settings, httpClientProvider, tracer, logger, m)
+return writer.NewPrometheusWriter(settings, httpClientProvider, clock, tracer, logger, m)
}
return writer.NoopWriter{}, nil
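
The change above is plain constructor injection: an existing clock value (clk) is threaded through createRecordingWriter into the Prometheus writer. A minimal, self-contained sketch of the same pattern (illustrative names, not the Grafana types) shows why the extra parameter is useful: production wiring passes the real clock, while tests substitute a mock that only advances when told to.

```go
package main

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
)

// timedWriter is a stand-in for the real writer; it only measures how long a call takes.
type timedWriter struct {
	clk clock.Clock
}

func newTimedWriter(clk clock.Clock) *timedWriter {
	return &timedWriter{clk: clk}
}

// write runs fn and reports its duration as seen by the injected clock.
func (w *timedWriter) write(fn func()) time.Duration {
	start := w.clk.Now()
	fn()
	return w.clk.Now().Sub(start)
}

func main() {
	prod := newTimedWriter(clock.New())     // real wall clock, as in production wiring
	test := newTimedWriter(clock.NewMock()) // frozen clock, as in the tests below

	fmt.Println(prod.write(func() { time.Sleep(time.Millisecond) }) > 0) // true
	fmt.Println(test.write(func() { time.Sleep(time.Millisecond) }))     // 0s: the mock never moved
}
```

The same substitution is what lets the integration test further down pin exact values for the write-duration histogram.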


@@ -11,6 +11,7 @@ import (
"testing"
"time"
"github.com/benbjohnson/clock"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
@@ -158,7 +159,7 @@ func blankRecordingRuleForTests(ctx context.Context) *recordingRule {
}
func TestRecordingRule_Integration(t *testing.T) {
-gen := models.RuleGen.With(models.RuleGen.WithAllRecordingRules())
+gen := models.RuleGen.With(models.RuleGen.WithAllRecordingRules(), models.RuleGen.WithOrgID(123))
ruleStore := newFakeRulesStore()
reg := prometheus.NewPedanticRegistry()
sch := setupScheduler(t, ruleStore, nil, reg, nil, nil)
@@ -167,8 +168,9 @@ func TestRecordingRule_Integration(t *testing.T) {
writerReg := prometheus.NewPedanticRegistry()
sch.recordingWriter = setupWriter(t, writeTarget, writerReg)
-t.Run("rule that errors", func(t *testing.T) {
-rule := gen.With(withQueryForHealth("error")).GenerateRef()
+t.Run("rule that succeeds", func(t *testing.T) {
+writeTarget.Reset()
+rule := gen.With(withQueryForHealth("ok")).GenerateRef()
ruleStore.PutRule(context.Background(), rule)
folderTitle := ruleStore.getNamespaceTitle(rule.NamespaceUID)
ruleFactory := ruleFactoryFromScheduler(sch)
@@ -239,6 +241,145 @@ func TestRecordingRule_Integration(t *testing.T) {
require.NoError(t, err)
})
t.Run("reports success evaluation metrics", func(t *testing.T) {
expectedMetric := fmt.Sprintf(
`
# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures.
# TYPE grafana_alerting_rule_evaluation_failures_total counter
grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 0
# HELP grafana_alerting_rule_evaluation_attempt_failures_total The total number of rule evaluation attempt failures.
# TYPE grafana_alerting_rule_evaluation_attempt_failures_total counter
grafana_alerting_rule_evaluation_attempt_failures_total{org="%[1]d"} 0
`,
rule.OrgID,
)
err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric),
"grafana_alerting_rule_evaluation_failures_total",
"grafana_alerting_rule_evaluation_attempt_failures_total",
)
require.NoError(t, err)
})
t.Run("reports remote write metrics", func(t *testing.T) {
expectedMetric := fmt.Sprintf(
`
# HELP grafana_alerting_remote_writer_write_duration_seconds Histogram of remote write durations.
# TYPE grafana_alerting_remote_writer_write_duration_seconds histogram
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.005"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.01"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.025"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.05"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.1"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.25"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.5"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="1"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="2.5"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="5"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="10"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="+Inf"} 1
grafana_alerting_remote_writer_write_duration_seconds_sum{backend="prometheus",org="%[1]d"} 0
grafana_alerting_remote_writer_write_duration_seconds_count{backend="prometheus",org="%[1]d"} 1
# HELP grafana_alerting_remote_writer_writes_total The total number of remote writes attempted.
# TYPE grafana_alerting_remote_writer_writes_total counter
grafana_alerting_remote_writer_writes_total{backend="prometheus", org="%[1]d", status_code="200"} 1
`,
rule.OrgID,
)
err := testutil.GatherAndCompare(writerReg, bytes.NewBufferString(expectedMetric),
"grafana_alerting_remote_writer_writes_total",
"grafana_alerting_remote_writer_write_duration_seconds",
)
require.NoError(t, err)
})
t.Run("status shows evaluation", func(t *testing.T) {
status := process.(*recordingRule).Status()
require.Equal(t, "ok", status.Health)
require.Nil(t, status.LastError)
})
t.Run("write was performed", func(t *testing.T) {
require.NotZero(t, writeTarget.RequestsCount)
require.Contains(t, writeTarget.LastRequestBody, "some_metric")
})
})
t.Run("rule that errors", func(t *testing.T) {
writeTarget.Reset()
rule := gen.With(withQueryForHealth("error")).GenerateRef()
ruleStore.PutRule(context.Background(), rule)
folderTitle := ruleStore.getNamespaceTitle(rule.NamespaceUID)
ruleFactory := ruleFactoryFromScheduler(sch)
process := ruleFactory.new(context.Background(), rule)
evalDoneChan := make(chan time.Time)
process.(*recordingRule).evalAppliedHook = func(_ models.AlertRuleKey, t time.Time) {
evalDoneChan <- t
}
now := time.Now()
go func() {
_ = process.Run()
}()
t.Run("status shows no evaluations", func(t *testing.T) {
status := process.(*recordingRule).Status()
require.Equal(t, "unknown", status.Health)
require.Nil(t, status.LastError)
require.Zero(t, status.EvaluationTimestamp)
require.Zero(t, status.EvaluationDuration)
})
process.Eval(&Evaluation{
scheduledAt: now,
rule: rule,
folderTitle: folderTitle,
})
_ = waitForTimeChannel(t, evalDoneChan)
t.Run("reports basic evaluation metrics", func(t *testing.T) {
expectedMetric := fmt.Sprintf(
`
# HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule.
# TYPE grafana_alerting_rule_evaluation_duration_seconds histogram
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 2
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 2
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 2
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 2
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 2
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 2
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 2
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 2
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 2
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 2
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 2
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 2
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 2
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 2
grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 2
# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
# TYPE grafana_alerting_rule_evaluations_total counter
grafana_alerting_rule_evaluations_total{org="%[1]d"} 2
# HELP grafana_alerting_rule_evaluation_attempts_total The total number of rule evaluation attempts.
# TYPE grafana_alerting_rule_evaluation_attempts_total counter
grafana_alerting_rule_evaluation_attempts_total{org="%[1]d"} 2
`,
rule.OrgID,
)
err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric),
"grafana_alerting_rule_evaluation_duration_seconds",
"grafana_alerting_rule_evaluations_total",
"grafana_alerting_rule_evaluation_attempts_total",
)
require.NoError(t, err)
})
t.Run("reports failure evaluation metrics", func(t *testing.T) { t.Run("reports failure evaluation metrics", func(t *testing.T) {
expectedMetric := fmt.Sprintf( expectedMetric := fmt.Sprintf(
` `
@ -259,6 +400,39 @@ func TestRecordingRule_Integration(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
}) })
t.Run("reports remote write metrics", func(t *testing.T) {
expectedMetric := fmt.Sprintf(
`
# HELP grafana_alerting_remote_writer_write_duration_seconds Histogram of remote write durations.
# TYPE grafana_alerting_remote_writer_write_duration_seconds histogram
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.005"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.01"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.025"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.05"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.1"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.25"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="0.5"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="1"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="2.5"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="5"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="10"} 1
grafana_alerting_remote_writer_write_duration_seconds_bucket{backend="prometheus",org="%[1]d",le="+Inf"} 1
grafana_alerting_remote_writer_write_duration_seconds_sum{backend="prometheus",org="%[1]d"} 0
grafana_alerting_remote_writer_write_duration_seconds_count{backend="prometheus",org="%[1]d"} 1
# HELP grafana_alerting_remote_writer_writes_total The total number of remote writes attempted.
# TYPE grafana_alerting_remote_writer_writes_total counter
grafana_alerting_remote_writer_writes_total{backend="prometheus", org="%[1]d", status_code="200"} 1
`,
rule.OrgID,
)
err := testutil.GatherAndCompare(writerReg, bytes.NewBufferString(expectedMetric),
"grafana_alerting_remote_writer_writes_total",
"grafana_alerting_remote_writer_write_duration_seconds",
)
require.NoError(t, err)
})
t.Run("status shows evaluation", func(t *testing.T) { t.Run("status shows evaluation", func(t *testing.T) {
status := process.(*recordingRule).Status() status := process.(*recordingRule).Status()
@ -358,7 +532,7 @@ func withQueryForHealth(health string) models.AlertRuleMutator {
func setupWriter(t *testing.T, target *writer.TestRemoteWriteTarget, reg prometheus.Registerer) *writer.PrometheusWriter { func setupWriter(t *testing.T, target *writer.TestRemoteWriteTarget, reg prometheus.Registerer) *writer.PrometheusWriter {
provider := testClientProvider{} provider := testClientProvider{}
m := metrics.NewNGAlert(reg) m := metrics.NewNGAlert(reg)
wr, err := writer.NewPrometheusWriter(target.ClientSettings(), provider, tracing.InitializeTracerForTest(), log.NewNopLogger(), m.GetRemoteWriterMetrics()) wr, err := writer.NewPrometheusWriter(target.ClientSettings(), provider, clock.NewMock(), tracing.InitializeTracerForTest(), log.NewNopLogger(), m.GetRemoteWriterMetrics())
require.NoError(t, err) require.NoError(t, err)
return wr return wr
} }
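
setupWriter now hands the writer clock.NewMock(), which is what makes the remote-writer histogram expectations above safe to hard-code: a mock clock never advances on its own, so the measured write duration is exactly zero, every bucket counts the single sample, and the _sum stays at 0. A small standalone sketch of that behaviour, assuming only the github.com/benbjohnson/clock package:

```go
package main

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
)

func main() {
	clk := clock.NewMock()

	start := clk.Now()
	// ... the remote write happens here; wall-clock time passes, the mock does not ...
	elapsed := clk.Now().Sub(start)
	fmt.Println(elapsed.Seconds()) // 0: falls into every `le` bucket and adds nothing to _sum

	// A test that wants a deterministic non-zero duration advances the mock explicitly.
	clk.Add(250 * time.Millisecond)
	fmt.Println(clk.Now().Sub(start)) // 250ms
}
```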


@@ -9,6 +9,7 @@ import (
"strings"
"time"
"github.com/benbjohnson/clock"
"github.com/grafana/dataplane/sdata/numeric"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
@@ -98,6 +99,7 @@ type HttpClientProvider interface {
type PrometheusWriter struct {
client promremote.Client
clock clock.Clock
logger log.Logger
metrics *metrics.RemoteWriter
}
@@ -105,6 +107,7 @@ type PrometheusWriter struct {
func NewPrometheusWriter(
settings setting.RecordingRuleSettings,
httpClientProvider HttpClientProvider,
clock clock.Clock,
tracer tracing.Tracer,
l log.Logger,
metrics *metrics.RemoteWriter,
@@ -145,6 +148,7 @@ func NewPrometheusWriter(
return &PrometheusWriter{
client: client,
clock: clock,
logger: l,
metrics: metrics,
}, nil
@@ -205,9 +209,9 @@ func (w PrometheusWriter) Write(ctx context.Context, name string, t time.Time, f
}
l.Debug("Writing metric", "name", name)
-writeStart := time.Now()
+writeStart := w.clock.Now()
res, writeErr := w.client.WriteTimeSeries(ctx, series, promremote.WriteOptions{})
-w.metrics.WriteDuration.WithLabelValues(lvs...).Observe(time.Since(writeStart).Seconds())
+w.metrics.WriteDuration.WithLabelValues(lvs...).Observe(w.clock.Now().Sub(writeStart).Seconds())
lvs = append(lvs, fmt.Sprint(res.StatusCode))
w.metrics.WritesTotal.WithLabelValues(lvs...).Inc()
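
Replacing time.Now()/time.Since with the injected clock is what the "Use testable version of clock" note in the commit message refers to: time.Since always reads the real clock, while w.clock.Now() goes through whatever the constructor received. A hedged sketch of the resulting test seam (illustrative names, not the Grafana writer or its client interface): a fake client advances a mock clock, so the measured duration is exact and repeatable.

```go
package main

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
)

// remoteClient stands in for the real remote-write client in this sketch.
type remoteClient interface {
	WriteTimeSeries() error
}

// fakeClient pretends every write takes exactly 300ms by moving the mock clock.
type fakeClient struct{ clk *clock.Mock }

func (f fakeClient) WriteTimeSeries() error {
	f.clk.Add(300 * time.Millisecond)
	return nil
}

type sketchWriter struct {
	client remoteClient
	clock  clock.Clock
}

// write mirrors the measurement pattern above: clock.Now before and after the call.
func (w sketchWriter) write() time.Duration {
	writeStart := w.clock.Now()
	_ = w.client.WriteTimeSeries()
	return w.clock.Now().Sub(writeStart) // the value that would be Observe()d
}

func main() {
	clk := clock.NewMock()
	w := sketchWriter{client: fakeClient{clk: clk}, clock: clk}
	fmt.Println(w.write()) // 300ms on every run, regardless of real elapsed time
}
```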


@@ -10,6 +10,7 @@ import (
"testing"
"time"
"github.com/benbjohnson/clock"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/setting"
@@ -140,6 +141,7 @@ func TestPrometheusWriter_Write(t *testing.T) {
client := &testClient{}
writer := &PrometheusWriter{
client: client,
clock: clock.New(),
logger: log.New("test"),
metrics: metrics.NewRemoteWriterMetrics(prometheus.NewRegistry()),
}


@@ -1,8 +1,10 @@
package writer
import (
"io"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
@@ -15,14 +17,17 @@ const RemoteWriteEndpoint = "/api/v1/write"
type TestRemoteWriteTarget struct {
srv *httptest.Server
-RequestsCount int
+mtx sync.Mutex
+RequestsCount int
+LastRequestBody string
}
func NewTestRemoteWriteTarget(t *testing.T) *TestRemoteWriteTarget {
t.Helper()
target := &TestRemoteWriteTarget{
RequestsCount: 0,
LastRequestBody: "",
}
handler := func(w http.ResponseWriter, r *http.Request) {
@@ -30,9 +35,18 @@ func NewTestRemoteWriteTarget(t *testing.T) *TestRemoteWriteTarget {
require.Fail(t, "Received unexpected request for endpoint %s", r.URL.Path)
}
target.mtx.Lock()
defer target.mtx.Unlock()
target.RequestsCount += 1
bd, err := io.ReadAll(r.Body)
defer func() {
_ = r.Body.Close()
}()
require.NoError(t, err)
target.LastRequestBody = string(bd)
w.WriteHeader(http.StatusOK)
-_, err := w.Write([]byte(`{}`))
+_, err = w.Write([]byte(`{}`))
require.NoError(t, err)
}
server := httptest.NewServer(http.HandlerFunc(handler))
@@ -53,3 +67,11 @@ func (s *TestRemoteWriteTarget) ClientSettings() setting.RecordingRuleSettings {
BasicAuthPassword: "",
}
}
// Reset resets all tracked requests and counters.
func (s *TestRemoteWriteTarget) Reset() {
s.mtx.Lock()
defer s.mtx.Unlock()
s.RequestsCount = 0
s.LastRequestBody = ""
}
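
The test target now records the last request body behind a mutex, so tests can both count writes and assert that a specific series name made it into the payload (the "Assert a specific series was written" note above). A standalone sketch of the same capture pattern using only the standard library; the names are illustrative, not the Grafana helpers.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// captureTarget counts requests and keeps the body of the most recent one.
type captureTarget struct {
	mtx             sync.Mutex
	RequestsCount   int
	LastRequestBody string
}

func (c *captureTarget) handler(w http.ResponseWriter, r *http.Request) {
	body, _ := io.ReadAll(r.Body)
	defer r.Body.Close()

	c.mtx.Lock()
	c.RequestsCount++
	c.LastRequestBody = string(body)
	c.mtx.Unlock()

	w.WriteHeader(http.StatusOK)
}

// Reset clears all tracked state between subtests.
func (c *captureTarget) Reset() {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.RequestsCount = 0
	c.LastRequestBody = ""
}

func main() {
	target := &captureTarget{}
	srv := httptest.NewServer(http.HandlerFunc(target.handler))
	defer srv.Close()

	_, _ = http.Post(srv.URL, "application/x-protobuf", strings.NewReader("payload mentioning some_metric"))

	fmt.Println(target.RequestsCount)                                    // 1
	fmt.Println(strings.Contains(target.LastRequestBody, "some_metric")) // true
	target.Reset()
	fmt.Println(target.RequestsCount) // 0
}
```

In the real target the body is a remote-write payload (typically snappy-compressed protobuf), and the integration test's Contains assertion relies on the series name still appearing as a substring of those raw bytes.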