Alerting: Add new metrics and tracing to state manager and scheduler (#71398)

* add metrics and tracing to state manager

* propagate tracer to state manager

* add scheduler metrics

* fix backtesting

* add test for state metrics

* remove StateUpdateCount

* update docs

* metrics can be null

* add tracer to new tests
Author: Yuri Tseretyan
Date: 2023-08-16 03:04:18 -04:00
Committed by: GitHub
Parent: 90e3f516ff
Commit: 938e26b59f
14 changed files with 264 additions and 64 deletions
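
At a glance, state.ManagerCfg gains a Tracer field next to the already optional Metrics. Below is a minimal sketch of the new wiring, assembled from the test setup in this commit; the helper name newTestManager is illustrative only and not part of the change.

package state_test

import (
	"github.com/benbjohnson/clock"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/grafana/pkg/infra/tracing"
	"github.com/grafana/grafana/pkg/services/ngalert/metrics"
	"github.com/grafana/grafana/pkg/services/ngalert/state"
)

// newTestManager is an illustrative helper showing the new wiring: ManagerCfg now
// carries a Tracer, and Metrics may be a real *metrics.State or left nil entirely.
func newTestManager() *state.Manager {
	cfg := state.ManagerCfg{
		Metrics:                 metrics.NewStateMetrics(prometheus.NewRegistry()),
		InstanceStore:           &state.FakeInstanceStore{},
		Images:                  &state.NotAvailableImageService{},
		Clock:                   clock.NewMock(),
		Historian:               &state.FakeHistorian{},
		MaxStateSaveConcurrency: 1,
		Tracer:                  tracing.InitializeTracerForTest(),
	}
	return state.NewManager(cfg)
}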

View File

@@ -8,8 +8,10 @@ import (
"github.com/benbjohnson/clock"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/grafana-plugin-sdk-go/data"
"go.opentelemetry.io/otel/attribute"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
@@ -30,6 +32,7 @@ type AlertInstanceManager interface {
type Manager struct {
log log.Logger
metrics *metrics.State
tracer tracing.Tracer
clock clock.Clock
cache *cache
@@ -60,6 +63,8 @@ type ManagerCfg struct {
// ApplyNoDataAndErrorToAllStates makes the state manager apply exceptional results (NoData and Error)
// to all states when the corresponding execution setting in the rule definition is set to either `Alerting` or `OK`.
ApplyNoDataAndErrorToAllStates bool
Tracer tracing.Tracer
}
func NewManager(cfg ManagerCfg) *Manager {
@@ -76,6 +81,7 @@ func NewManager(cfg ManagerCfg) *Manager {
doNotSaveNormalState: cfg.DoNotSaveNormalState,
maxStateSaveConcurrency: cfg.MaxStateSaveConcurrency,
applyNoDataAndErrorToAllStates: cfg.ApplyNoDataAndErrorToAllStates,
tracer: cfg.Tracer,
}
}
@@ -251,18 +257,46 @@ func (st *Manager) ResetStateByRuleUID(ctx context.Context, rule *ngModels.Alert
// ProcessEvalResults updates the current states that belong to a rule with the evaluation results.
// if extraLabels is not empty, those labels will be added to every state. The extraLabels take precedence over rule labels and result labels
func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time, alertRule *ngModels.AlertRule, results eval.Results, extraLabels data.Labels) []StateTransition {
logger := st.log.FromContext(ctx)
tracingCtx, span := st.tracer.Start(ctx, "alert rule state calculation")
defer span.End()
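// SetAttributes on this span takes both a plain key/value pair and an otel
// attribute.KeyValue, which is why each key is passed twice in the calls below.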
span.SetAttributes("rule_uid", alertRule.UID, attribute.String("rule_uid", alertRule.UID))
span.SetAttributes("org_id", alertRule.OrgID, attribute.Int64("org_id", alertRule.OrgID))
span.SetAttributes("rule_version", alertRule.Version, attribute.Int64("rule_version", alertRule.Version))
utcTick := evaluatedAt.UTC().Format(time.RFC3339Nano)
span.SetAttributes("tick", utcTick, attribute.String("tick", utcTick))
span.SetAttributes("results", len(results), attribute.Int("tick", len(results)))
logger := st.log.FromContext(tracingCtx)
logger.Debug("State manager processing evaluation results", "resultCount", len(results))
states := st.setNextStateForRule(ctx, alertRule, results, extraLabels, logger)
states := st.setNextStateForRule(tracingCtx, alertRule, results, extraLabels, logger)
span.AddEvents([]string{"message", "state_transitions"},
[]tracing.EventValue{
{Str: "results processed"},
{Num: int64(len(states))},
})
staleStates := st.deleteStaleStatesFromCache(ctx, logger, evaluatedAt, alertRule)
st.deleteAlertStates(ctx, logger, staleStates)
st.deleteAlertStates(tracingCtx, logger, staleStates)
st.saveAlertStates(ctx, logger, states...)
if len(staleStates) > 0 {
span.AddEvents([]string{"message", "state_transitions"},
[]tracing.EventValue{
{Str: "deleted stale states"},
{Num: int64(len(staleStates))},
})
}
st.saveAlertStates(tracingCtx, logger, states...)
span.AddEvents([]string{"message"},
[]tracing.EventValue{
{Str: "updated database"},
})
allChanges := append(states, staleStates...)
if st.historian != nil {
st.historian.Record(ctx, history_model.NewRuleMeta(alertRule, logger), allChanges)
st.historian.Record(tracingCtx, history_model.NewRuleMeta(alertRule, logger), allChanges)
}
return allChanges
}
@@ -303,6 +337,7 @@ func (st *Manager) setNextStateForAll(ctx context.Context, alertRule *ngModels.A
// Set the current state based on evaluation results
func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRule, currentState *State, result eval.Result, logger log.Logger) StateTransition {
start := st.clock.Now()
currentState.LastEvaluationTime = result.EvaluatedAt
currentState.EvaluationDuration = result.EvaluationDuration
currentState.Results = append(currentState.Results, Evaluation{
@@ -390,6 +425,10 @@ func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRu
PreviousStateReason: oldReason,
}
if st.metrics != nil {
st.metrics.StateUpdateDuration.Observe(st.clock.Now().Sub(start).Seconds())
}
return nextState
}
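
The StateUpdateDuration histogram observed in setNextState above is defined in the ngalert metrics package (github.com/grafana/grafana/pkg/services/ngalert/metrics). The snippet below is only a sketch of what its registration plausibly looks like, reconstructed from the metric name, help text, and buckets asserted in the tests further down; the authoritative definition is metrics.NewStateMetrics.

package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// State holds the state-manager metrics; only the histogram used above is shown here.
type State struct {
	StateUpdateDuration prometheus.Histogram
}

// NewStateMetrics registers the histogram that backs
// grafana_alerting_state_calculation_duration_seconds.
func NewStateMetrics(r prometheus.Registerer) *State {
	return &State{
		StateUpdateDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
			Namespace: "grafana",
			Subsystem: "alerting",
			Name:      "state_calculation_duration_seconds",
			Help:      "The duration of calculation of a single state.",
			Buckets:   []float64{0.01, 0.1, 1, 2, 5, 10},
		}),
	}
}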

View File

@@ -6,14 +6,16 @@ import (
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/mock"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/annotations"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/state/historian"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/mock"
)
func BenchmarkProcessEvalResults(b *testing.B) {
@@ -25,6 +27,7 @@ func BenchmarkProcessEvalResults(b *testing.B) {
cfg := state.ManagerCfg{
Historian: hist,
MaxStateSaveConcurrency: 1,
Tracer: tracing.InitializeTracerForTest(),
}
sut := state.NewManager(cfg)
now := time.Now().UTC()

View File

@@ -19,6 +19,7 @@ import (
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/infra/log/logtest"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
@@ -311,6 +312,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
cfg := ManagerCfg{
Metrics: testMetrics,
Tracer: tracing.InitializeTracerForTest(),
ExternalURL: nil,
InstanceStore: &FakeInstanceStore{},
Images: &NotAvailableImageService{},

View File

@@ -1,6 +1,7 @@
package state_test
import (
"bytes"
"context"
"errors"
"fmt"
@@ -15,11 +16,13 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/annotations"
"github.com/grafana/grafana/pkg/services/annotations/annotationstest"
"github.com/grafana/grafana/pkg/services/dashboards"
@@ -198,6 +201,7 @@ func TestWarmStateCache(t *testing.T) {
Clock: clock.NewMock(),
Historian: &state.FakeHistorian{},
MaxStateSaveConcurrency: 1,
Tracer: tracing.InitializeTracerForTest(),
}
st := state.NewManager(cfg)
st.Warm(ctx, dbstore)
@@ -234,6 +238,7 @@ func TestDashboardAnnotations(t *testing.T) {
Clock: clock.New(),
Historian: hist,
MaxStateSaveConcurrency: 1,
Tracer: tracing.InitializeTracerForTest(),
}
st := state.NewManager(cfg)
@@ -1198,18 +1203,21 @@ func TestProcessEvalResults(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo()
m := metrics.NewHistorianMetrics(prometheus.NewRegistry())
store := historian.NewAnnotationStore(fakeAnnoRepo, &dashboards.FakeDashboardService{}, m)
hist := historian.NewAnnotationBackend(store, nil, m)
reg := prometheus.NewPedanticRegistry()
stateMetrics := metrics.NewStateMetrics(reg)
metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry())
store := historian.NewAnnotationStore(fakeAnnoRepo, &dashboards.FakeDashboardService{}, metrics)
hist := historian.NewAnnotationBackend(store, nil, metrics)
clk := clock.NewMock()
cfg := state.ManagerCfg{
Metrics: testMetrics.GetStateMetrics(),
Metrics: stateMetrics,
ExternalURL: nil,
InstanceStore: &state.FakeInstanceStore{},
Images: &state.NotAvailableImageService{},
Clock: clk,
Historian: hist,
MaxStateSaveConcurrency: 1,
Tracer: tracing.InitializeTracerForTest(),
}
st := state.NewManager(cfg)
@@ -1220,7 +1228,7 @@ func TestProcessEvalResults(t *testing.T) {
slices.SortFunc(evals, func(a, b time.Time) bool {
return a.Before(b)
})
results := 0
for _, evalTime := range evals {
res := tc.evalResults[evalTime]
for i := 0; i < len(res); i++ {
@@ -1228,6 +1236,7 @@ func TestProcessEvalResults(t *testing.T) {
}
clk.Set(evalTime)
_ = st.ProcessEvalResults(context.Background(), evalTime, tc.alertRule, res, systemLabels)
results += len(res)
}
states := st.GetStatesForRuleUID(tc.alertRule.OrgID, tc.alertRule.UID)
@@ -1278,6 +1287,22 @@ func TestProcessEvalResults(t *testing.T) {
require.Eventuallyf(t, func() bool {
return tc.expectedAnnotations == fakeAnnoRepo.Len()
}, time.Second, 100*time.Millisecond, "%d annotations are present, expected %d. We have %+v", fakeAnnoRepo.Len(), tc.expectedAnnotations, printAllAnnotations(fakeAnnoRepo.Items()))
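// With the mocked clock, time does not advance inside ProcessEvalResults, so every
// observed duration is zero: each bucket and the _count equal the number of
// processed results, while the _sum stays at 0.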
expectedMetric := fmt.Sprintf(
`# HELP grafana_alerting_state_calculation_duration_seconds The duration of calculation of a single state.
# TYPE grafana_alerting_state_calculation_duration_seconds histogram
grafana_alerting_state_calculation_duration_seconds_bucket{le="0.01"} %[1]d
grafana_alerting_state_calculation_duration_seconds_bucket{le="0.1"} %[1]d
grafana_alerting_state_calculation_duration_seconds_bucket{le="1"} %[1]d
grafana_alerting_state_calculation_duration_seconds_bucket{le="2"} %[1]d
grafana_alerting_state_calculation_duration_seconds_bucket{le="5"} %[1]d
grafana_alerting_state_calculation_duration_seconds_bucket{le="10"} %[1]d
grafana_alerting_state_calculation_duration_seconds_bucket{le="+Inf"} %[1]d
grafana_alerting_state_calculation_duration_seconds_sum 0
grafana_alerting_state_calculation_duration_seconds_count %[1]d
`, results)
err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric), "grafana_alerting_state_calculation_duration_seconds", "grafana_alerting_state_calculation_total")
require.NoError(t, err)
})
}
@@ -1292,6 +1317,7 @@ func TestProcessEvalResults(t *testing.T) {
Clock: clk,
Historian: &state.FakeHistorian{},
MaxStateSaveConcurrency: 1,
Tracer: tracing.InitializeTracerForTest(),
}
st := state.NewManager(cfg)
rule := models.AlertRuleGen()()
@@ -1442,6 +1468,7 @@ func TestStaleResultsHandler(t *testing.T) {
Clock: clock.New(),
Historian: &state.FakeHistorian{},
MaxStateSaveConcurrency: 1,
Tracer: tracing.InitializeTracerForTest(),
}
st := state.NewManager(cfg)
st.Warm(ctx, dbstore)
@@ -1523,6 +1550,7 @@ func TestStaleResults(t *testing.T) {
Clock: clk,
Historian: &state.FakeHistorian{},
MaxStateSaveConcurrency: 1,
Tracer: tracing.InitializeTracerForTest(),
}
st := state.NewManager(cfg)
@@ -1695,6 +1723,7 @@ func TestDeleteStateByRuleUID(t *testing.T) {
Clock: clk,
Historian: &state.FakeHistorian{},
MaxStateSaveConcurrency: 1,
Tracer: tracing.InitializeTracerForTest(),
}
st := state.NewManager(cfg)
st.Warm(ctx, dbstore)
@@ -1835,6 +1864,7 @@ func TestResetStateByRuleUID(t *testing.T) {
Clock: clk,
Historian: fakeHistorian,
MaxStateSaveConcurrency: 1,
Tracer: tracing.InitializeTracerForTest(),
}
st := state.NewManager(cfg)
st.Warm(ctx, dbstore)