mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: Support concurrent queries for saving alert instances (#70525)
This commit adds support for concurrent queries when saving alert instances to the database. This is an experimental feature in response to some customers experiencing delays between rule evaluation and sending alerts to Alertmanager, resulting in flapping. It is disabled by default.
This commit is contained in:
@@ -79,12 +79,13 @@ func (srv TestingApiSrv) RouteTestGrafanaRuleConfig(c *contextmodel.ReqContext,
|
||||
}
|
||||
|
||||
cfg := state.ManagerCfg{
|
||||
Metrics: nil,
|
||||
ExternalURL: srv.appUrl,
|
||||
InstanceStore: nil,
|
||||
Images: &backtesting.NoopImageService{},
|
||||
Clock: clock.New(),
|
||||
Historian: nil,
|
||||
Metrics: nil,
|
||||
ExternalURL: srv.appUrl,
|
||||
InstanceStore: nil,
|
||||
Images: &backtesting.NoopImageService{},
|
||||
Clock: clock.New(),
|
||||
Historian: nil,
|
||||
MaxStateSaveConcurrency: 1,
|
||||
}
|
||||
manager := state.NewManager(cfg)
|
||||
includeFolder := !srv.cfg.ReservedLabels.IsReservedLabelDisabled(models.FolderTitleLabel)
|
||||
|
||||
@@ -45,12 +45,13 @@ func NewEngine(appUrl *url.URL, evalFactory eval.EvaluatorFactory) *Engine {
|
||||
evalFactory: evalFactory,
|
||||
createStateManager: func() stateManager {
|
||||
cfg := state.ManagerCfg{
|
||||
Metrics: nil,
|
||||
ExternalURL: appUrl,
|
||||
InstanceStore: nil,
|
||||
Images: &NoopImageService{},
|
||||
Clock: clock.New(),
|
||||
Historian: nil,
|
||||
Metrics: nil,
|
||||
ExternalURL: appUrl,
|
||||
InstanceStore: nil,
|
||||
Images: &NoopImageService{},
|
||||
Clock: clock.New(),
|
||||
Historian: nil,
|
||||
MaxStateSaveConcurrency: 1,
|
||||
}
|
||||
return state.NewManager(cfg)
|
||||
},
|
||||
|
||||
@@ -212,13 +212,14 @@ func (ng *AlertNG) init() error {
|
||||
return err
|
||||
}
|
||||
cfg := state.ManagerCfg{
|
||||
Metrics: ng.Metrics.GetStateMetrics(),
|
||||
ExternalURL: appUrl,
|
||||
InstanceStore: ng.store,
|
||||
Images: ng.imageService,
|
||||
Clock: clk,
|
||||
Historian: history,
|
||||
DoNotSaveNormalState: ng.FeatureToggles.IsEnabled(featuremgmt.FlagAlertingNoNormalState),
|
||||
Metrics: ng.Metrics.GetStateMetrics(),
|
||||
ExternalURL: appUrl,
|
||||
InstanceStore: ng.store,
|
||||
Images: ng.imageService,
|
||||
Clock: clk,
|
||||
Historian: history,
|
||||
DoNotSaveNormalState: ng.FeatureToggles.IsEnabled(featuremgmt.FlagAlertingNoNormalState),
|
||||
MaxStateSaveConcurrency: ng.Cfg.UnifiedAlerting.MaxStateSaveConcurrency,
|
||||
}
|
||||
stateManager := state.NewManager(cfg)
|
||||
scheduler := schedule.NewScheduler(schedCfg, stateManager)
|
||||
|
||||
@@ -75,12 +75,13 @@ func TestProcessTicks(t *testing.T) {
|
||||
Tracer: testTracer,
|
||||
}
|
||||
managerCfg := state.ManagerCfg{
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: nil,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: mockedClock,
|
||||
Historian: &state.FakeHistorian{},
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: nil,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: mockedClock,
|
||||
Historian: &state.FakeHistorian{},
|
||||
MaxStateSaveConcurrency: 1,
|
||||
}
|
||||
st := state.NewManager(managerCfg)
|
||||
|
||||
@@ -818,12 +819,13 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor
|
||||
Tracer: testTracer,
|
||||
}
|
||||
managerCfg := state.ManagerCfg{
|
||||
Metrics: m.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: is,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: mockedClock,
|
||||
Historian: &state.FakeHistorian{},
|
||||
Metrics: m.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: is,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: mockedClock,
|
||||
Historian: &state.FakeHistorian{},
|
||||
MaxStateSaveConcurrency: 1,
|
||||
}
|
||||
st := state.NewManager(managerCfg)
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/grafana/dskit/concurrency"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
@@ -39,7 +40,8 @@ type Manager struct {
|
||||
historian Historian
|
||||
externalURL *url.URL
|
||||
|
||||
doNotSaveNormalState bool
|
||||
doNotSaveNormalState bool
|
||||
maxStateSaveConcurrency int
|
||||
}
|
||||
|
||||
type ManagerCfg struct {
|
||||
@@ -51,20 +53,23 @@ type ManagerCfg struct {
|
||||
Historian Historian
|
||||
// DoNotSaveNormalState controls whether eval.Normal state is persisted to the database and returned by get methods
|
||||
DoNotSaveNormalState bool
|
||||
// MaxStateSaveConcurrency controls the number of goroutines (per rule) that can save alert state in parallel.
|
||||
MaxStateSaveConcurrency int
|
||||
}
|
||||
|
||||
func NewManager(cfg ManagerCfg) *Manager {
|
||||
return &Manager{
|
||||
cache: newCache(),
|
||||
ResendDelay: ResendDelay, // TODO: make this configurable
|
||||
log: log.New("ngalert.state.manager"),
|
||||
metrics: cfg.Metrics,
|
||||
instanceStore: cfg.InstanceStore,
|
||||
images: cfg.Images,
|
||||
historian: cfg.Historian,
|
||||
clock: cfg.Clock,
|
||||
externalURL: cfg.ExternalURL,
|
||||
doNotSaveNormalState: cfg.DoNotSaveNormalState,
|
||||
cache: newCache(),
|
||||
ResendDelay: ResendDelay, // TODO: make this configurable
|
||||
log: log.New("ngalert.state.manager"),
|
||||
metrics: cfg.Metrics,
|
||||
instanceStore: cfg.InstanceStore,
|
||||
images: cfg.Images,
|
||||
historian: cfg.Historian,
|
||||
clock: cfg.Clock,
|
||||
externalURL: cfg.ExternalURL,
|
||||
doNotSaveNormalState: cfg.DoNotSaveNormalState,
|
||||
maxStateSaveConcurrency: cfg.MaxStateSaveConcurrency,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -350,17 +355,17 @@ func (st *Manager) saveAlertStates(ctx context.Context, logger log.Logger, state
|
||||
return
|
||||
}
|
||||
|
||||
logger.Debug("Saving alert states", "count", len(states))
|
||||
for _, s := range states {
|
||||
saveState := func(ctx context.Context, idx int) error {
|
||||
s := states[idx]
|
||||
// Do not save normal state to database and remove transition to Normal state but keep mapped states
|
||||
if st.doNotSaveNormalState && IsNormalStateWithNoReason(s.State) && !s.Changed() {
|
||||
continue
|
||||
return nil
|
||||
}
|
||||
|
||||
key, err := s.GetAlertInstanceKey()
|
||||
if err != nil {
|
||||
logger.Error("Failed to create a key for alert state to save it to database. The state will be ignored ", "cacheID", s.CacheID, "error", err, "labels", s.Labels.String())
|
||||
continue
|
||||
return nil
|
||||
}
|
||||
instance := ngModels.AlertInstance{
|
||||
AlertInstanceKey: key,
|
||||
@@ -375,9 +380,14 @@ func (st *Manager) saveAlertStates(ctx context.Context, logger log.Logger, state
|
||||
err = st.instanceStore.SaveAlertInstance(ctx, instance)
|
||||
if err != nil {
|
||||
logger.Error("Failed to save alert state", "labels", s.Labels.String(), "state", s.State, "error", err)
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
logger.Debug("Saving alert states done", "count", len(states))
|
||||
|
||||
logger.Debug("Saving alert states", "count", len(states), "max_state_save_concurrency", st.maxStateSaveConcurrency)
|
||||
_ = concurrency.ForEachJob(ctx, len(states), st.maxStateSaveConcurrency, saveState)
|
||||
logger.Debug("Saving alert states done", "count", len(states), "max_state_save_concurrency", st.maxStateSaveConcurrency)
|
||||
}
|
||||
|
||||
func (st *Manager) deleteAlertStates(ctx context.Context, logger log.Logger, states []StateTransition) {
|
||||
|
||||
@@ -22,7 +22,8 @@ func BenchmarkProcessEvalResults(b *testing.B) {
|
||||
metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry())
|
||||
hist := historian.NewAnnotationBackend(&as, nil, nil, metrics)
|
||||
cfg := state.ManagerCfg{
|
||||
Historian: hist,
|
||||
Historian: hist,
|
||||
MaxStateSaveConcurrency: 1,
|
||||
}
|
||||
sut := state.NewManager(cfg)
|
||||
now := time.Now().UTC()
|
||||
|
||||
@@ -114,7 +114,7 @@ func TestManager_saveAlertStates(t *testing.T) {
|
||||
|
||||
t.Run("should save all transitions if doNotSaveNormalState is false", func(t *testing.T) {
|
||||
st := &FakeInstanceStore{}
|
||||
m := Manager{instanceStore: st, doNotSaveNormalState: false}
|
||||
m := Manager{instanceStore: st, doNotSaveNormalState: false, maxStateSaveConcurrency: 1}
|
||||
m.saveAlertStates(context.Background(), &logtest.Fake{}, transitions...)
|
||||
|
||||
savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{}
|
||||
@@ -131,7 +131,7 @@ func TestManager_saveAlertStates(t *testing.T) {
|
||||
|
||||
t.Run("should not save Normal->Normal if doNotSaveNormalState is true", func(t *testing.T) {
|
||||
st := &FakeInstanceStore{}
|
||||
m := Manager{instanceStore: st, doNotSaveNormalState: true}
|
||||
m := Manager{instanceStore: st, doNotSaveNormalState: true, maxStateSaveConcurrency: 1}
|
||||
m.saveAlertStates(context.Background(), &logtest.Fake{}, transitions...)
|
||||
|
||||
savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{}
|
||||
|
||||
@@ -194,12 +194,13 @@ func TestWarmStateCache(t *testing.T) {
|
||||
}
|
||||
|
||||
cfg := state.ManagerCfg{
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: dbstore,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: clock.NewMock(),
|
||||
Historian: &state.FakeHistorian{},
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: dbstore,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: clock.NewMock(),
|
||||
Historian: &state.FakeHistorian{},
|
||||
MaxStateSaveConcurrency: 1,
|
||||
}
|
||||
st := state.NewManager(cfg)
|
||||
st.Warm(ctx, dbstore)
|
||||
@@ -227,12 +228,13 @@ func TestDashboardAnnotations(t *testing.T) {
|
||||
metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry())
|
||||
hist := historian.NewAnnotationBackend(fakeAnnoRepo, &dashboards.FakeDashboardService{}, nil, metrics)
|
||||
cfg := state.ManagerCfg{
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: dbstore,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: clock.New(),
|
||||
Historian: hist,
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: dbstore,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: clock.New(),
|
||||
Historian: hist,
|
||||
MaxStateSaveConcurrency: 1,
|
||||
}
|
||||
st := state.NewManager(cfg)
|
||||
|
||||
@@ -2256,12 +2258,13 @@ func TestProcessEvalResults(t *testing.T) {
|
||||
metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry())
|
||||
hist := historian.NewAnnotationBackend(fakeAnnoRepo, &dashboards.FakeDashboardService{}, nil, metrics)
|
||||
cfg := state.ManagerCfg{
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: &state.FakeInstanceStore{},
|
||||
Images: &state.NotAvailableImageService{},
|
||||
Clock: clock.New(),
|
||||
Historian: hist,
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: &state.FakeInstanceStore{},
|
||||
Images: &state.NotAvailableImageService{},
|
||||
Clock: clock.New(),
|
||||
Historian: hist,
|
||||
MaxStateSaveConcurrency: 1,
|
||||
}
|
||||
st := state.NewManager(cfg)
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
@@ -2291,12 +2294,13 @@ func TestProcessEvalResults(t *testing.T) {
|
||||
instanceStore := &state.FakeInstanceStore{}
|
||||
clk := clock.New()
|
||||
cfg := state.ManagerCfg{
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: instanceStore,
|
||||
Images: &state.NotAvailableImageService{},
|
||||
Clock: clk,
|
||||
Historian: &state.FakeHistorian{},
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: instanceStore,
|
||||
Images: &state.NotAvailableImageService{},
|
||||
Clock: clk,
|
||||
Historian: &state.FakeHistorian{},
|
||||
MaxStateSaveConcurrency: 1,
|
||||
}
|
||||
st := state.NewManager(cfg)
|
||||
rule := models.AlertRuleGen()()
|
||||
@@ -2432,12 +2436,13 @@ func TestStaleResultsHandler(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
ctx := context.Background()
|
||||
cfg := state.ManagerCfg{
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: dbstore,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: clock.New(),
|
||||
Historian: &state.FakeHistorian{},
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: dbstore,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: clock.New(),
|
||||
Historian: &state.FakeHistorian{},
|
||||
MaxStateSaveConcurrency: 1,
|
||||
}
|
||||
st := state.NewManager(cfg)
|
||||
st.Warm(ctx, dbstore)
|
||||
@@ -2511,12 +2516,13 @@ func TestStaleResults(t *testing.T) {
|
||||
store := &state.FakeInstanceStore{}
|
||||
|
||||
cfg := state.ManagerCfg{
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: store,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: clk,
|
||||
Historian: &state.FakeHistorian{},
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: store,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: clk,
|
||||
Historian: &state.FakeHistorian{},
|
||||
MaxStateSaveConcurrency: 1,
|
||||
}
|
||||
st := state.NewManager(cfg)
|
||||
|
||||
@@ -2678,12 +2684,13 @@ func TestDeleteStateByRuleUID(t *testing.T) {
|
||||
clk := clock.NewMock()
|
||||
clk.Set(time.Now())
|
||||
cfg := state.ManagerCfg{
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: dbstore,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: clk,
|
||||
Historian: &state.FakeHistorian{},
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: dbstore,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: clk,
|
||||
Historian: &state.FakeHistorian{},
|
||||
MaxStateSaveConcurrency: 1,
|
||||
}
|
||||
st := state.NewManager(cfg)
|
||||
st.Warm(ctx, dbstore)
|
||||
@@ -2817,12 +2824,13 @@ func TestResetStateByRuleUID(t *testing.T) {
|
||||
clk := clock.NewMock()
|
||||
clk.Set(time.Now())
|
||||
cfg := state.ManagerCfg{
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: dbstore,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: clk,
|
||||
Historian: fakeHistorian,
|
||||
Metrics: testMetrics.GetStateMetrics(),
|
||||
ExternalURL: nil,
|
||||
InstanceStore: dbstore,
|
||||
Images: &state.NoopImageService{},
|
||||
Clock: clk,
|
||||
Historian: fakeHistorian,
|
||||
MaxStateSaveConcurrency: 1,
|
||||
}
|
||||
st := state.NewManager(cfg)
|
||||
st.Warm(ctx, dbstore)
|
||||
|
||||
Reference in New Issue
Block a user