Alerting: Create integration testing infra for recording rules (#90306)

* Create some integration testing infra for RRs

* whoops

* Require no error in responding

* fix linter

* Panic, no need to pass testing around

* Extend status test

Author: Alexander Weaver, 2024-07-11 14:59:52 -05:00 (committed by GitHub)
parent ab32183e18
commit 111ebd4fb2
2 changed files with 209 additions and 68 deletions
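
For orientation, the pieces added in this commit fit together roughly as follows. This is a condensed, illustrative sketch rather than code from the diff: `sch` and `setupScheduler` come from the existing schedule test suite, while `NewTestRemoteWriteTarget` and `setupWriter` are introduced below.

    // Stand up a fake remote-write endpoint and point the scheduler's recording writer at it.
    writeTarget := writer.NewTestRemoteWriteTarget(t) // httptest server accepting /api/v1/write
    defer writeTarget.Close()
    sch.recordingWriter = setupWriter(t, writeTarget, prometheus.NewPedanticRegistry())

    // ... evaluate a recording rule through the scheduler ...

    // Then assert on what reached the remote-write endpoint;
    // a rule whose query errors should write nothing.
    require.Zero(t, writeTarget.RequestsCount)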


@@ -3,8 +3,10 @@ package schedule
import (
"bytes"
context "context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"sync"
"testing"
"time"
@ -13,8 +15,12 @@ import (
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
models "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/writer"
"github.com/grafana/grafana/pkg/util"
@@ -156,82 +162,162 @@ func TestRecordingRule_Integration(t *testing.T) {
	ruleStore := newFakeRulesStore()
	reg := prometheus.NewPedanticRegistry()
	sch := setupScheduler(t, ruleStore, nil, reg, nil, nil)

	writeTarget := writer.NewTestRemoteWriteTarget(t)
	defer writeTarget.Close()
	writerReg := prometheus.NewPedanticRegistry()
	sch.recordingWriter = setupWriter(t, writeTarget, writerReg)

	t.Run("rule that errors", func(t *testing.T) {
		rule := gen.With(withQueryForHealth("error")).GenerateRef()
		ruleStore.PutRule(context.Background(), rule)
		folderTitle := ruleStore.getNamespaceTitle(rule.NamespaceUID)
		ruleFactory := ruleFactoryFromScheduler(sch)

		process := ruleFactory.new(context.Background(), rule)
		evalDoneChan := make(chan time.Time)
		process.(*recordingRule).evalAppliedHook = func(_ models.AlertRuleKey, t time.Time) {
			evalDoneChan <- t
		}
		now := time.Now()

		go func() {
			_ = process.Run()
		}()

		t.Run("status shows no evaluations", func(t *testing.T) {
			status := process.(*recordingRule).Status()

			require.Equal(t, "unknown", status.Health)
			require.Nil(t, status.LastError)
			require.Zero(t, status.EvaluationTimestamp)
			require.Zero(t, status.EvaluationDuration)
		})

		process.Eval(&Evaluation{
			scheduledAt: now,
			rule:        rule,
			folderTitle: folderTitle,
		})
		_ = waitForTimeChannel(t, evalDoneChan)

		t.Run("reports basic evaluation metrics", func(t *testing.T) {
			expectedMetric := fmt.Sprintf(
				`
# HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule.
# TYPE grafana_alerting_rule_evaluation_duration_seconds histogram
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.01"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.1"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="0.5"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="1"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="5"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="10"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="15"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="30"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="60"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="120"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="180"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="240"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="300"} 1
grafana_alerting_rule_evaluation_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
grafana_alerting_rule_evaluation_duration_seconds_sum{org="%[1]d"} 0
grafana_alerting_rule_evaluation_duration_seconds_count{org="%[1]d"} 1
# HELP grafana_alerting_rule_evaluations_total The total number of rule evaluations.
# TYPE grafana_alerting_rule_evaluations_total counter
grafana_alerting_rule_evaluations_total{org="%[1]d"} 1
# HELP grafana_alerting_rule_evaluation_attempts_total The total number of rule evaluation attempts.
# TYPE grafana_alerting_rule_evaluation_attempts_total counter
grafana_alerting_rule_evaluation_attempts_total{org="%[1]d"} 1
`,
				rule.OrgID,
			)

			err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric),
				"grafana_alerting_rule_evaluation_duration_seconds",
				"grafana_alerting_rule_evaluations_total",
				"grafana_alerting_rule_evaluation_attempts_total",
			)
			require.NoError(t, err)
		})

		t.Run("reports failure evaluation metrics", func(t *testing.T) {
			expectedMetric := fmt.Sprintf(
				`
# HELP grafana_alerting_rule_evaluation_failures_total The total number of rule evaluation failures.
# TYPE grafana_alerting_rule_evaluation_failures_total counter
grafana_alerting_rule_evaluation_failures_total{org="%[1]d"} 1
# HELP grafana_alerting_rule_evaluation_attempt_failures_total The total number of rule evaluation attempt failures.
# TYPE grafana_alerting_rule_evaluation_attempt_failures_total counter
grafana_alerting_rule_evaluation_attempt_failures_total{org="%[1]d"} 1
`,
				rule.OrgID,
			)

			err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric),
				"grafana_alerting_rule_evaluation_failures_total",
				"grafana_alerting_rule_evaluation_attempt_failures_total",
			)
			require.NoError(t, err)
		})

		t.Run("status shows evaluation", func(t *testing.T) {
			status := process.(*recordingRule).Status()

			require.Equal(t, "error", status.Health)
			require.NotNil(t, status.LastError)
			require.ErrorContains(t, status.LastError, "unable to find dependent node")
		})

		t.Run("no write was performed", func(t *testing.T) {
			require.Zero(t, writeTarget.RequestsCount)
		})
	})
}

// withQueryForHealth returns a rule mutator that swaps in a single math
// expression query: "ok" evaluates cleanly, while "error" references an
// undefined variable so evaluation fails.
func withQueryForHealth(health string) models.AlertRuleMutator {
	var expression string
	switch health {
	case "ok":
		expression = `{
			"datasourceUid": "__expr__",
			"type":"math",
			"expression":"2 + 1"
		}`
	case "error":
		expression = `{
			"datasourceUid": "__expr__",
			"type":"math",
			"expression":"$NOTEXIST"
		}`
	default:
		panic(fmt.Sprintf("Query generation for health %s is not supported yet", health))
	}

	return func(rule *models.AlertRule) {
		rule.Record.From = "A"
		rule.Data = []models.AlertQuery{
			{
				DatasourceUID: expr.DatasourceUID,
				Model:         json.RawMessage(expression),
				RelativeTimeRange: models.RelativeTimeRange{
					From: models.Duration(5 * time.Hour),
					To:   models.Duration(3 * time.Hour),
				},
				RefID: "A",
			},
		}
	}
}

// setupWriter builds a PrometheusWriter that sends to the given test target.
func setupWriter(t *testing.T, target *writer.TestRemoteWriteTarget, reg prometheus.Registerer) *writer.PrometheusWriter {
	provider := testClientProvider{}
	m := metrics.NewNGAlert(reg)
	wr, err := writer.NewPrometheusWriter(target.ClientSettings(), provider, tracing.InitializeTracerForTest(), log.NewNopLogger(), m.GetRemoteWriterMetrics())
	require.NoError(t, err)
	return wr
}

// testClientProvider satisfies the HTTP client provider expected by
// NewPrometheusWriter, handing back a plain http.Client.
type testClientProvider struct{}

func (t testClientProvider) New(options ...httpclient.Options) (*http.Client, error) {
	return &http.Client{}, nil
}


@@ -0,0 +1,55 @@
package writer

import (
	"net/http"
	"net/http/httptest"
	"testing"
	"time"

	"github.com/grafana/grafana/pkg/setting"
	"github.com/stretchr/testify/require"
)

const RemoteWriteEndpoint = "/api/v1/write"

// TestRemoteWriteTarget is a fake Prometheus remote-write endpoint backed by
// an httptest.Server. It accepts writes on RemoteWriteEndpoint and counts the
// requests it receives.
type TestRemoteWriteTarget struct {
	srv *httptest.Server

	RequestsCount int
}

func NewTestRemoteWriteTarget(t *testing.T) *TestRemoteWriteTarget {
	t.Helper()

	target := &TestRemoteWriteTarget{
		RequestsCount: 0,
	}

	handler := func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != RemoteWriteEndpoint {
			require.Fail(t, "Received unexpected request for endpoint %s", r.URL.Path)
		}
		target.RequestsCount += 1
		w.WriteHeader(http.StatusOK)
		_, err := w.Write([]byte(`{}`))
		require.NoError(t, err)
	}
	server := httptest.NewServer(http.HandlerFunc(handler))
	target.srv = server

	return target
}

// Close shuts down the underlying test server.
func (s *TestRemoteWriteTarget) Close() {
	s.srv.Close()
}

// ClientSettings returns recording rule settings pointed at this target.
func (s *TestRemoteWriteTarget) ClientSettings() setting.RecordingRuleSettings {
	return setting.RecordingRuleSettings{
		URL:               s.srv.URL + RemoteWriteEndpoint,
		Timeout:           1 * time.Second,
		BasicAuthUsername: "",
		BasicAuthPassword: "",
	}
}