Alerting: Don't use a separate collection system for metrics (#75296)
* Alerting: Don't use a separate collection system for metrics

The state package had a metric collection system that ran every 15s, updating the values of the metrics. There is a common pattern for this in the Prometheus ecosystem called "collectors". I have removed the behaviour of using a time-based interval to "set" the metrics in favour of a set of functions, used as the metric "value", that get called at scrape time.
This commit is contained in:
parent 1600f75d7e
commit 59694fb2be
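For readers unfamiliar with the pattern the commit message refers to: instead of a background ticker that periodically calls Set() on a gauge, you register a GaugeFunc and the Prometheus client invokes its callback at scrape time, so the reported value is always computed on demand. Below is a minimal, self-contained sketch of the idea, assuming nothing from this commit (the itemStore type and the example_items metric are illustrative only):

package main

import (
    "net/http"
    "sync"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// itemStore stands in for the alert state cache: a mutex-guarded
// collection whose size we want to expose as a gauge.
type itemStore struct {
    mtx   sync.RWMutex
    items map[string]struct{}
}

// count is the "value" function handed to the gauge; it runs on every scrape.
func (s *itemStore) count() float64 {
    s.mtx.RLock()
    defer s.mtx.RUnlock()
    return float64(len(s.items))
}

func main() {
    store := &itemStore{items: map[string]struct{}{"a": {}, "b": {}}}

    reg := prometheus.NewRegistry()
    // No ticker goroutine: the callback is invoked whenever /metrics is scraped.
    reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
        Name: "example_items",
        Help: "How many items are currently in the store.",
    }, store.count))

    http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
    _ = http.ListenAndServe(":8080", nil)
}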
@@ -6,18 +6,18 @@ import (
 )
 
 type State struct {
-    AlertState          *prometheus.GaugeVec
     StateUpdateDuration prometheus.Histogram
+    r                   prometheus.Registerer
 }
 
+// Registerer exposes the Prometheus register directly. The state package needs this, as it uses a collector to fetch the current alerts by state in the system.
+func (s State) Registerer() prometheus.Registerer {
+    return s.r
+}
+
 func NewStateMetrics(r prometheus.Registerer) *State {
     return &State{
-        AlertState: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{
-            Namespace: Namespace,
-            Subsystem: Subsystem,
-            Name:      "alerts",
-            Help:      "How many alerts by state.",
-        }, []string{"state"}),
+        r: r,
         StateUpdateDuration: promauto.With(r).NewHistogram(
             prometheus.HistogramOpts{
                 Namespace: Namespace,
@@ -323,10 +323,6 @@ func (ng *AlertNG) Run(ctx context.Context) error {
 
     children, subCtx := errgroup.WithContext(ctx)
 
-    children.Go(func() error {
-        return ng.stateManager.Run(subCtx)
-    })
-
     children.Go(func() error {
         return ng.MultiOrgAlertmanager.Run(subCtx)
     })
@@ -10,6 +10,7 @@ import (
     "time"
 
     "github.com/grafana/grafana-plugin-sdk-go/data"
+    "github.com/prometheus/client_golang/prometheus"
 
     "github.com/grafana/grafana/pkg/infra/log"
     "github.com/grafana/grafana/pkg/services/ngalert/eval"
@@ -33,6 +34,44 @@ func newCache() *cache {
     }
 }
 
+// RegisterMetrics registers a set of Gauges in the form of collectors for the alerts in the cache.
+func (c *cache) RegisterMetrics(r prometheus.Registerer) {
+    newAlertCountByState := func(state eval.State) prometheus.GaugeFunc {
+        return prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+            Namespace:   metrics.Namespace,
+            Subsystem:   metrics.Subsystem,
+            Name:        "alerts",
+            Help:        "How many alerts by state.",
+            ConstLabels: prometheus.Labels{"state": strings.ToLower(state.String())},
+        }, func() float64 {
+            return c.countAlertsBy(state)
+        })
+    }
+
+    r.MustRegister(newAlertCountByState(eval.Normal))
+    r.MustRegister(newAlertCountByState(eval.Alerting))
+    r.MustRegister(newAlertCountByState(eval.Pending))
+    r.MustRegister(newAlertCountByState(eval.Error))
+    r.MustRegister(newAlertCountByState(eval.NoData))
+}
+
+func (c *cache) countAlertsBy(state eval.State) float64 {
+    c.mtxStates.RLock()
+    defer c.mtxStates.RUnlock()
+    var count float64
+    for _, orgMap := range c.states {
+        for _, rule := range orgMap {
+            for _, st := range rule.states {
+                if st.State == state {
+                    count++
+                }
+            }
+        }
+    }
+
+    return count
+}
+
 func (c *cache) getOrCreate(ctx context.Context, log log.Logger, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels, externalURL *url.URL) *State {
     // Calculation of state ID involves label and annotation expansion, which may be resource intensive operations, and doing it in the context guarded by mtxStates may create a lot of contention.
     // Instead of just calculating ID we create an entire state - a candidate. If rule states already hold a state with this ID, this candidate will be discarded and the existing one will be returned.
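A side benefit of the scrape-time callbacks registered in the hunk above: tests no longer need to advance a clock to make gauge values appear, because gathering a registry runs each callback directly. A small hedged sketch of that behaviour using the client library's testutil package (the example_alerts metric and this test are illustrative, not part of the commit):

package example

import (
    "strings"
    "testing"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

// TestGaugeFuncIsReadAtGatherTime shows that a callback-backed gauge needs no
// ticker: changing the underlying value is immediately visible to the next gather.
func TestGaugeFuncIsReadAtGatherTime(t *testing.T) {
    count := 0.0
    reg := prometheus.NewPedanticRegistry()
    reg.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
        Name:        "example_alerts",
        Help:        "How many alerts by state.",
        ConstLabels: prometheus.Labels{"state": "alerting"},
    }, func() float64 { return count }))

    count = 3 // no Set() call anywhere; the closure picks this up at gather time

    expected := strings.NewReader(`# HELP example_alerts How many alerts by state.
# TYPE example_alerts gauge
example_alerts{state="alerting"} 3
`)
    if err := testutil.GatherAndCompare(reg, expected, "example_alerts"); err != nil {
        t.Fatal(err)
    }
}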
@@ -290,34 +329,6 @@ func (c *cache) removeByRuleUID(orgID int64, uid string) []*State {
     return states
 }
 
-func (c *cache) recordMetrics(metrics *metrics.State) {
-    c.mtxStates.RLock()
-    defer c.mtxStates.RUnlock()
-
-    // Set default values to zero such that gauges are reset
-    // after all values from a single state disappear.
-    ct := map[eval.State]int{
-        eval.Normal:   0,
-        eval.Alerting: 0,
-        eval.Pending:  0,
-        eval.NoData:   0,
-        eval.Error:    0,
-    }
-
-    for _, orgMap := range c.states {
-        for _, rule := range orgMap {
-            for _, state := range rule.states {
-                n := ct[state.State]
-                ct[state.State] = n + 1
-            }
-        }
-    }
-
-    for k, n := range ct {
-        metrics.AlertState.WithLabelValues(strings.ToLower(k.String())).Set(float64(n))
-    }
-}
-
 // if duplicate labels exist, keep the value from the first set
 func mergeLabels(a, b data.Labels) data.Labels {
     newLbs := make(data.Labels, len(a)+len(b))
@@ -69,8 +69,14 @@ type ManagerCfg struct {
 }
 
 func NewManager(cfg ManagerCfg) *Manager {
-    return &Manager{
-        cache:       newCache(),
+    // Metrics for the cache use a collector, so they need access to the register directly.
+    c := newCache()
+    if cfg.Metrics != nil {
+        c.RegisterMetrics(cfg.Metrics.Registerer())
+    }
+
+    m := &Manager{
+        cache:       c,
         ResendDelay: ResendDelay, // TODO: make this configurable
         log:         cfg.Log,
         metrics:     cfg.Metrics,
@@ -84,24 +90,12 @@ func NewManager(cfg ManagerCfg) *Manager {
         applyNoDataAndErrorToAllStates: cfg.ApplyNoDataAndErrorToAllStates,
         tracer:                         cfg.Tracer,
     }
-}
-
-func (st *Manager) Run(ctx context.Context) error {
-    if st.applyNoDataAndErrorToAllStates {
-        st.log.Info("Running in alternative execution of Error/NoData mode")
-    }
-    ticker := st.clock.Ticker(MetricsScrapeInterval)
-    for {
-        select {
-        case <-ticker.C:
-            st.log.Debug("Recording state cache metrics", "now", st.clock.Now())
-            st.cache.recordMetrics(st.metrics)
-        case <-ctx.Done():
-            st.log.Debug("Stopping")
-            ticker.Stop()
-            return ctx.Err()
-        }
-    }
+
+    if m.applyNoDataAndErrorToAllStates {
+        m.log.Info("Running in alternative execution of Error/NoData mode")
+    }
+
+    return m
 }
 
 func (st *Manager) Warm(ctx context.Context, rulesReader RuleReader) {
@@ -28,8 +28,6 @@ import (
     "github.com/grafana/grafana/pkg/util"
 )
 
-var testMetrics = metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics()
-
 // Not for parallel tests.
 type CountingImageService struct {
     Called int
@@ -311,6 +309,7 @@ func TestProcessEvalResults_StateTransitions(t *testing.T) {
     executeTest := func(t *testing.T, alertRule *ngmodels.AlertRule, resultsAtTime map[time.Time]eval.Results, expectedTransitionsAtTime map[time.Time][]StateTransition, applyNoDataErrorToAllStates bool) {
         clk := clock.NewMock()
 
+        testMetrics := metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics()
         cfg := ManagerCfg{
             Metrics: testMetrics,
             Tracer:  tracing.InitializeTracerForTest(),
@@ -36,8 +36,6 @@ import (
     "github.com/grafana/grafana/pkg/util"
 )
 
-var testMetrics = metrics.NewNGAlert(prometheus.NewPedanticRegistry())
-
 func TestWarmStateCache(t *testing.T) {
     evaluationTime, err := time.Parse("2006-01-02", "2021-03-25")
     require.NoError(t, err)
@@ -195,7 +193,7 @@ func TestWarmStateCache(t *testing.T) {
     }
 
     cfg := state.ManagerCfg{
-        Metrics:       testMetrics.GetStateMetrics(),
+        Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
         ExternalURL:   nil,
         InstanceStore: dbstore,
         Images:        &state.NoopImageService{},
@@ -229,11 +227,11 @@ func TestDashboardAnnotations(t *testing.T) {
     _, dbstore := tests.SetupTestEnv(t, 1)
 
     fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo()
-    metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry())
-    store := historian.NewAnnotationStore(fakeAnnoRepo, &dashboards.FakeDashboardService{}, metrics)
-    hist := historian.NewAnnotationBackend(store, nil, metrics)
+    historianMetrics := metrics.NewHistorianMetrics(prometheus.NewRegistry())
+    store := historian.NewAnnotationStore(fakeAnnoRepo, &dashboards.FakeDashboardService{}, historianMetrics)
+    hist := historian.NewAnnotationBackend(store, nil, historianMetrics)
     cfg := state.ManagerCfg{
-        Metrics:       testMetrics.GetStateMetrics(),
+        Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
         ExternalURL:   nil,
         InstanceStore: dbstore,
         Images:        &state.NoopImageService{},
@@ -1208,9 +1206,9 @@ func TestProcessEvalResults(t *testing.T) {
     fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo()
     reg := prometheus.NewPedanticRegistry()
     stateMetrics := metrics.NewStateMetrics(reg)
-    metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry())
-    store := historian.NewAnnotationStore(fakeAnnoRepo, &dashboards.FakeDashboardService{}, metrics)
-    hist := historian.NewAnnotationBackend(store, nil, metrics)
+    m := metrics.NewHistorianMetrics(prometheus.NewRegistry())
+    store := historian.NewAnnotationStore(fakeAnnoRepo, &dashboards.FakeDashboardService{}, m)
+    hist := historian.NewAnnotationBackend(store, nil, m)
     clk := clock.NewMock()
     cfg := state.ManagerCfg{
         Metrics: stateMetrics,
@@ -1314,7 +1312,7 @@ func TestProcessEvalResults(t *testing.T) {
     instanceStore := &state.FakeInstanceStore{}
     clk := clock.New()
     cfg := state.ManagerCfg{
-        Metrics:       testMetrics.GetStateMetrics(),
+        Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
         ExternalURL:   nil,
         InstanceStore: instanceStore,
         Images:        &state.NotAvailableImageService{},
@@ -1466,7 +1464,7 @@ func TestStaleResultsHandler(t *testing.T) {
     for _, tc := range testCases {
         ctx := context.Background()
         cfg := state.ManagerCfg{
-            Metrics:       testMetrics.GetStateMetrics(),
+            Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
             ExternalURL:   nil,
             InstanceStore: dbstore,
             Images:        &state.NoopImageService{},
@@ -1549,7 +1547,7 @@ func TestStaleResults(t *testing.T) {
     store := &state.FakeInstanceStore{}
 
     cfg := state.ManagerCfg{
-        Metrics:       testMetrics.GetStateMetrics(),
+        Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
         ExternalURL:   nil,
         InstanceStore: store,
         Images:        &state.NoopImageService{},
@@ -1723,7 +1721,7 @@ func TestDeleteStateByRuleUID(t *testing.T) {
     clk := clock.NewMock()
     clk.Set(time.Now())
     cfg := state.ManagerCfg{
-        Metrics:       testMetrics.GetStateMetrics(),
+        Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
         ExternalURL:   nil,
         InstanceStore: dbstore,
         Images:        &state.NoopImageService{},
@@ -1865,7 +1863,7 @@ func TestResetStateByRuleUID(t *testing.T) {
     clk := clock.NewMock()
     clk.Set(time.Now())
     cfg := state.ManagerCfg{
-        Metrics:       testMetrics.GetStateMetrics(),
+        Metrics:       metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(),
         ExternalURL:   nil,
         InstanceStore: dbstore,
         Images:        &state.NoopImageService{},