From 59694fb2be23d9f784606c1191d21b9631fd1a9e Mon Sep 17 00:00:00 2001
From: gotjosh
Date: Mon, 25 Sep 2023 10:27:30 +0100
Subject: [PATCH] Alerting: Don't use a separate collection system for metrics
 (#75296)

* Alerting: Don't use a separate collection system for metrics

The state package had a metric collection system that ran every 15s to
update the values of the metrics. There is a common pattern for this in
the Prometheus ecosystem called "collectors".

I have removed the time-based interval that "set" the metrics in favour
of a set of value functions that are called at scrape time.
---
 pkg/services/ngalert/metrics/state.go      | 14 ++--
 pkg/services/ngalert/ngalert.go            |  4 --
 pkg/services/ngalert/state/cache.go        | 67 +++++++++++--------
 pkg/services/ngalert/state/manager.go      | 30 ++++-----
 .../ngalert/state/manager_private_test.go  |  3 +-
 pkg/services/ngalert/state/manager_test.go | 28 ++++----
 6 files changed, 72 insertions(+), 74 deletions(-)

diff --git a/pkg/services/ngalert/metrics/state.go b/pkg/services/ngalert/metrics/state.go
index 1aae3d1e683..83b87f72d05 100644
--- a/pkg/services/ngalert/metrics/state.go
+++ b/pkg/services/ngalert/metrics/state.go
@@ -6,18 +6,18 @@ import (
 )
 
 type State struct {
-	AlertState          *prometheus.GaugeVec
 	StateUpdateDuration prometheus.Histogram
+	r                   prometheus.Registerer
+}
+
+// Registerer exposes the Prometheus registerer directly. The state package needs it because it uses a collector to fetch the current alerts by state in the system.
+func (s State) Registerer() prometheus.Registerer {
+	return s.r
 }
 
 func NewStateMetrics(r prometheus.Registerer) *State {
 	return &State{
-		AlertState: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
-			Namespace: Namespace,
-			Subsystem: Subsystem,
-			Name:      "alerts",
-			Help:      "How many alerts by state.",
-		}, []string{"state"}),
+		r: r,
 		StateUpdateDuration: promauto.With(r).NewHistogram(
 			prometheus.HistogramOpts{
 				Namespace: Namespace,
diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go
index e8ec127948f..521106afd3a 100644
--- a/pkg/services/ngalert/ngalert.go
+++ b/pkg/services/ngalert/ngalert.go
@@ -323,10 +323,6 @@ func (ng *AlertNG) Run(ctx context.Context) error {
 
 	children, subCtx := errgroup.WithContext(ctx)
 
-	children.Go(func() error {
-		return ng.stateManager.Run(subCtx)
-	})
-
 	children.Go(func() error {
 		return ng.MultiOrgAlertmanager.Run(subCtx)
 	})
diff --git a/pkg/services/ngalert/state/cache.go b/pkg/services/ngalert/state/cache.go
index bb4147b2ea3..008eb889278 100644
--- a/pkg/services/ngalert/state/cache.go
+++ b/pkg/services/ngalert/state/cache.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/grafana/grafana-plugin-sdk-go/data"
+	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/grafana/grafana/pkg/infra/log"
 	"github.com/grafana/grafana/pkg/services/ngalert/eval"
@@ -33,6 +34,44 @@ func newCache() *cache {
 	}
 }
 
+// RegisterMetrics registers a set of Gauges, in the form of collectors, for the alerts in the cache.
+func (c *cache) RegisterMetrics(r prometheus.Registerer) {
+	newAlertCountByState := func(state eval.State) prometheus.GaugeFunc {
+		return prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+			Namespace:   metrics.Namespace,
+			Subsystem:   metrics.Subsystem,
+			Name:        "alerts",
+			Help:        "How many alerts by state.",
+			ConstLabels: prometheus.Labels{"state": strings.ToLower(state.String())},
+		}, func() float64 {
+			return c.countAlertsBy(state)
+		})
+	}
+
+	r.MustRegister(newAlertCountByState(eval.Normal))
+	r.MustRegister(newAlertCountByState(eval.Alerting))
+	r.MustRegister(newAlertCountByState(eval.Pending))
+	r.MustRegister(newAlertCountByState(eval.Error))
+	r.MustRegister(newAlertCountByState(eval.NoData))
+}
+
+func (c *cache) countAlertsBy(state eval.State) float64 {
+	c.mtxStates.RLock()
+	defer c.mtxStates.RUnlock()
+	var count float64
+	for _, orgMap := range c.states {
+		for _, rule := range orgMap {
+			for _, st := range rule.states {
+				if st.State == state {
+					count++
+				}
+			}
+		}
+	}
+
+	return count
+}
+
 func (c *cache) getOrCreate(ctx context.Context, log log.Logger, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels, externalURL *url.URL) *State {
 	// Calculation of state ID involves label and annotation expansion, which may be resource intensive operations, and doing it in the context guarded by mtxStates may create a lot of contention.
 	// Instead of just calculating ID we create an entire state - a candidate. If rule states already hold a state with this ID, this candidate will be discarded and the existing one will be returned.
@@ -290,34 +329,6 @@ func (c *cache) removeByRuleUID(orgID int64, uid string) []*State {
 	return states
 }
 
-func (c *cache) recordMetrics(metrics *metrics.State) {
-	c.mtxStates.RLock()
-	defer c.mtxStates.RUnlock()
-
-	// Set default values to zero such that gauges are reset
-	// after all values from a single state disappear.
-	ct := map[eval.State]int{
-		eval.Normal:   0,
-		eval.Alerting: 0,
-		eval.Pending:  0,
-		eval.NoData:   0,
-		eval.Error:    0,
-	}
-
-	for _, orgMap := range c.states {
-		for _, rule := range orgMap {
-			for _, state := range rule.states {
-				n := ct[state.State]
-				ct[state.State] = n + 1
-			}
-		}
-	}
-
-	for k, n := range ct {
-		metrics.AlertState.WithLabelValues(strings.ToLower(k.String())).Set(float64(n))
-	}
-}
-
 // if duplicate labels exist, keep the value from the first set
 func mergeLabels(a, b data.Labels) data.Labels {
 	newLbs := make(data.Labels, len(a)+len(b))
diff --git a/pkg/services/ngalert/state/manager.go b/pkg/services/ngalert/state/manager.go
index 865bacf014a..df722d3d10b 100644
--- a/pkg/services/ngalert/state/manager.go
+++ b/pkg/services/ngalert/state/manager.go
@@ -69,8 +69,14 @@ type ManagerCfg struct {
 }
 
 func NewManager(cfg ManagerCfg) *Manager {
-	return &Manager{
-		cache:       newCache(),
+	// Metrics for the cache use a collector, so they need access to the registerer directly.
+	c := newCache()
+	if cfg.Metrics != nil {
+		c.RegisterMetrics(cfg.Metrics.Registerer())
+	}
+
+	m := &Manager{
+		cache:       c,
 		ResendDelay: ResendDelay, // TODO: make this configurable
 		log:         cfg.Log,
 		metrics:     cfg.Metrics,
@@ -84,24 +90,12 @@ func NewManager(cfg ManagerCfg) *Manager {
 		applyNoDataAndErrorToAllStates: cfg.ApplyNoDataAndErrorToAllStates,
 		tracer:                         cfg.Tracer,
 	}
-}
 
-func (st *Manager) Run(ctx context.Context) error {
-	if st.applyNoDataAndErrorToAllStates {
-		st.log.Info("Running in alternative execution of Error/NoData mode")
-	}
-	ticker := st.clock.Ticker(MetricsScrapeInterval)
-	for {
-		select {
-		case <-ticker.C:
-			st.log.Debug("Recording state cache metrics", "now", st.clock.Now())
-			st.cache.recordMetrics(st.metrics)
-		case <-ctx.Done():
-			st.log.Debug("Stopping")
-			ticker.Stop()
-			return ctx.Err()
-		}
+	if m.applyNoDataAndErrorToAllStates {
+		m.log.Info("Running in alternative execution of Error/NoData mode")
 	}
+
+	return m
 }
 
 func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader) {
diff --git a/pkg/services/ngalert/state/manager_private_test.go b/pkg/services/ngalert/state/manager_private_test.go
index 867e223d6e6..9f7bba3bc1a 100644
--- a/pkg/services/ngalert/state/manager_private_test.go
+++ b/pkg/services/ngalert/state/manager_private_test.go
@@ -28,8 +28,6 @@ import (
 	"github.com/grafana/grafana/pkg/util"
 )
 
-var testMetrics = metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics()
-
 // Not for parallel tests.
 type CountingImageService struct {
 	Called int
@@ -311,6 +309,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
 	executeTest := func(t *testing.T, alertRule *ngmodels.AlertRule, resultsAtTime map[time.Time]eval.Results, expectedTransitionsAtTime map[time.Time][]StateTransition, applyNoDataErrorToAllStates bool) {
 		clk := clock.NewMock()
+		testMetrics := metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics()
 		cfg := ManagerCfg{
 			Metrics: testMetrics,
 			Tracer:  tracing.InitializeTracerForTest(),
diff --git a/pkg/services/ngalert/state/manager_test.go b/pkg/services/ngalert/state/manager_test.go
index b461ecc7946..682af1bcb53 100644
--- a/pkg/services/ngalert/state/manager_test.go
+++ b/pkg/services/ngalert/state/manager_test.go
@@ -36,8 +36,6 @@ import (
 	"github.com/grafana/grafana/pkg/util"
 )
 
-var testMetrics = metrics.NewNGAlert(prometheus.NewPedanticRegistry())
-
 func TestWarmStateCache(t *testing.T) {
 	evaluationTime, err := time.Parse("2006-01-02", "2021-03-25")
 	require.NoError(t, err)
@@ -195,7 +193,7 @@ func TestWarmStateCache(t *testing.T) {
 	}
 
 	cfg := state.ManagerCfg{
-		Metrics:       testMetrics.GetStateMetrics(),
+		Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
 		ExternalURL:   nil,
 		InstanceStore: dbstore,
 		Images:        &state.NoopImageService{},
@@ -229,11 +227,11 @@ func TestDashboardAnnotations(t *testing.T) {
 	_, dbstore := tests.SetupTestEnv(t, 1)
 
 	fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo()
-	metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry())
-	store := historian.NewAnnotationStore(fakeAnnoRepo, &dashboards.FakeDashboardService{}, metrics)
-	hist := historian.NewAnnotationBackend(store, nil, metrics)
+	historianMetrics := metrics.NewHistorianMetrics(prometheus.NewRegistry())
+	store := historian.NewAnnotationStore(fakeAnnoRepo, &dashboards.FakeDashboardService{}, historianMetrics)
+	hist := historian.NewAnnotationBackend(store, nil, historianMetrics)
 	cfg := state.ManagerCfg{
-		Metrics:       testMetrics.GetStateMetrics(),
+		Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
 		ExternalURL:   nil,
 		InstanceStore: dbstore,
 		Images:        &state.NoopImageService{},
@@ -1208,9 +1206,9 @@ func TestProcessEvalResults(t *testing.T) {
 		fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo()
 		reg := prometheus.NewPedanticRegistry()
 		stateMetrics := metrics.NewStateMetrics(reg)
-		metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry())
-		store := historian.NewAnnotationStore(fakeAnnoRepo, &dashboards.FakeDashboardService{}, metrics)
-		hist := historian.NewAnnotationBackend(store, nil, metrics)
+		m := metrics.NewHistorianMetrics(prometheus.NewRegistry())
+		store := historian.NewAnnotationStore(fakeAnnoRepo, &dashboards.FakeDashboardService{}, m)
+		hist := historian.NewAnnotationBackend(store, nil, m)
 		clk := clock.NewMock()
 		cfg := state.ManagerCfg{
 			Metrics: stateMetrics,
@@ -1314,7 +1312,7 @@ func TestProcessEvalResults(t *testing.T) {
 	instanceStore := &state.FakeInstanceStore{}
 	clk := clock.New()
 	cfg := state.ManagerCfg{
-		Metrics:       testMetrics.GetStateMetrics(),
+		Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
 		ExternalURL:   nil,
 		InstanceStore: instanceStore,
 		Images:        &state.NotAvailableImageService{},
@@ -1466,7 +1464,7 @@ func TestStaleResultsHandler(t *testing.T) {
 	for _, tc := range testCases {
 		ctx := context.Background()
 		cfg := state.ManagerCfg{
-			Metrics:       testMetrics.GetStateMetrics(),
+			Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
 			ExternalURL:   nil,
 			InstanceStore: dbstore,
 			Images:        &state.NoopImageService{},
@@ -1549,7 +1547,7 @@ func TestStaleResults(t *testing.T) {
 	store := &state.FakeInstanceStore{}
 
 	cfg := state.ManagerCfg{
-		Metrics:       testMetrics.GetStateMetrics(),
+		Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
 		ExternalURL:   nil,
 		InstanceStore: store,
 		Images:        &state.NoopImageService{},
@@ -1723,7 +1721,7 @@ func TestDeleteStateByRuleUID(t *testing.T) {
 	clk := clock.NewMock()
 	clk.Set(time.Now())
 	cfg := state.ManagerCfg{
-		Metrics:       testMetrics.GetStateMetrics(),
+		Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
 		ExternalURL:   nil,
 		InstanceStore: dbstore,
 		Images:        &state.NoopImageService{},
@@ -1865,7 +1863,7 @@ func TestResetStateByRuleUID(t *testing.T) {
 	clk := clock.NewMock()
 	clk.Set(time.Now())
 	cfg := state.ManagerCfg{
-		Metrics:       testMetrics.GetStateMetrics(),
+		Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
 		ExternalURL:   nil,
 		InstanceStore: dbstore,
 		Images:        &state.NoopImageService{},
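The heart of the change is prometheus.NewGaugeFunc: the gauge's value comes from a callback evaluated at scrape time, so no background ticker is needed. Below is a minimal, self-contained sketch of that pattern; it is not part of the patch, and names such as example_alerts and alertCount are illustrative stand-ins for the state cache.

// Minimal sketch of the GaugeFunc pattern (illustrative, not part of the
// patch): the callback runs on every scrape, so the metric always reflects
// the current value without a collection loop.
package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	var alertCount atomic.Int64 // stands in for the state cache
	alertCount.Store(3)

	g := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name:        "example_alerts",
		Help:        "How many alerts by state.",
		ConstLabels: prometheus.Labels{"state": "alerting"},
	}, func() float64 {
		// Evaluated at scrape time.
		return float64(alertCount.Load())
	})

	reg := prometheus.NewPedanticRegistry()
	reg.MustRegister(g)

	fmt.Println(testutil.ToFloat64(g)) // 3
	alertCount.Store(5)
	fmt.Println(testutil.ToFloat64(g)) // 5, with no Set() call in between
}

In the patch itself, countAlertsBy plays the role of the callback's data source: it walks the cache under a read lock, so each scrape observes a consistent count per state.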