mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: make StatePersister more configurable to support custom rule-level state persisters (#94590)
This commit is contained in:
parent
6787e2f108
commit
9d182986f1
@ -346,8 +346,8 @@ func (c *cache) removeByRuleUID(orgID int64, uid string) []*State {
|
|||||||
return states
|
return states
|
||||||
}
|
}
|
||||||
|
|
||||||
// asInstances returns the whole content of the cache as a slice of AlertInstance.
|
// GetAlertInstances returns the whole content of the cache as a slice of AlertInstance.
|
||||||
func (c *cache) asInstances(skipNormalState bool) []ngModels.AlertInstance {
|
func (c *cache) GetAlertInstances(skipNormalState bool) []ngModels.AlertInstance {
|
||||||
var states []ngModels.AlertInstance
|
var states []ngModels.AlertInstance
|
||||||
c.mtxStates.RLock()
|
c.mtxStates.RLock()
|
||||||
defer c.mtxStates.RUnlock()
|
defer c.mtxStates.RUnlock()
|
||||||
|
@ -31,8 +31,8 @@ type AlertInstanceManager interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type StatePersister interface {
|
type StatePersister interface {
|
||||||
Async(ctx context.Context, cache *cache)
|
Async(ctx context.Context, instancesProvider AlertInstancesProvider)
|
||||||
Sync(ctx context.Context, span trace.Span, states StateTransitions)
|
Sync(ctx context.Context, span trace.Span, ruleKey ngModels.AlertRuleKeyWithGroup, states StateTransitions)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sender is an optional callback intended for sending the states to an alertmanager.
|
// Sender is an optional callback intended for sending the states to an alertmanager.
|
||||||
@ -347,7 +347,7 @@ func (st *Manager) ProcessEvalResults(
|
|||||||
statesToSend = st.updateLastSentAt(allChanges, evaluatedAt)
|
statesToSend = st.updateLastSentAt(allChanges, evaluatedAt)
|
||||||
}
|
}
|
||||||
|
|
||||||
st.persister.Sync(ctx, span, allChanges)
|
st.persister.Sync(ctx, span, alertRule.GetKeyWithGroup(), allChanges)
|
||||||
if st.historian != nil {
|
if st.historian != nil {
|
||||||
st.historian.Record(ctx, history_model.NewRuleMeta(alertRule, logger), allChanges)
|
st.historian.Record(ctx, history_model.NewRuleMeta(alertRule, logger), allChanges)
|
||||||
}
|
}
|
||||||
|
@ -9,8 +9,13 @@ import (
|
|||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||||
|
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type AlertInstancesProvider interface {
|
||||||
|
GetAlertInstances(skipNormalState bool) []models.AlertInstance
|
||||||
|
}
|
||||||
|
|
||||||
type AsyncStatePersister struct {
|
type AsyncStatePersister struct {
|
||||||
log log.Logger
|
log log.Logger
|
||||||
// doNotSaveNormalState controls whether eval.Normal state is persisted to the database and returned by get methods.
|
// doNotSaveNormalState controls whether eval.Normal state is persisted to the database and returned by get methods.
|
||||||
@ -30,16 +35,16 @@ func NewAsyncStatePersister(log log.Logger, ticker *clock.Ticker, cfg ManagerCfg
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *AsyncStatePersister) Async(ctx context.Context, cache *cache) {
|
func (a *AsyncStatePersister) Async(ctx context.Context, instancesProvider AlertInstancesProvider) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-a.ticker.C:
|
case <-a.ticker.C:
|
||||||
if err := a.fullSync(ctx, cache); err != nil {
|
if err := a.fullSync(ctx, instancesProvider); err != nil {
|
||||||
a.log.Error("Failed to do a full state sync to database", "err", err)
|
a.log.Error("Failed to do a full state sync to database", "err", err)
|
||||||
}
|
}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
a.log.Info("Scheduler is shutting down, doing a final state sync.")
|
a.log.Info("Scheduler is shutting down, doing a final state sync.")
|
||||||
if err := a.fullSync(context.Background(), cache); err != nil {
|
if err := a.fullSync(context.Background(), instancesProvider); err != nil {
|
||||||
a.log.Error("Failed to do a full state sync to database", "err", err)
|
a.log.Error("Failed to do a full state sync to database", "err", err)
|
||||||
}
|
}
|
||||||
a.ticker.Stop()
|
a.ticker.Stop()
|
||||||
@ -49,10 +54,10 @@ func (a *AsyncStatePersister) Async(ctx context.Context, cache *cache) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *AsyncStatePersister) fullSync(ctx context.Context, cache *cache) error {
|
func (a *AsyncStatePersister) fullSync(ctx context.Context, instancesProvider AlertInstancesProvider) error {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
a.log.Debug("Full state sync start")
|
a.log.Debug("Full state sync start")
|
||||||
instances := cache.asInstances(a.doNotSaveNormalState)
|
instances := instancesProvider.GetAlertInstances(a.doNotSaveNormalState)
|
||||||
if err := a.store.FullSync(ctx, instances); err != nil {
|
if err := a.store.FullSync(ctx, instances); err != nil {
|
||||||
a.log.Error("Full state sync failed", "duration", time.Since(startTime), "instances", len(instances))
|
a.log.Error("Full state sync failed", "duration", time.Since(startTime), "instances", len(instances))
|
||||||
return err
|
return err
|
||||||
@ -64,6 +69,6 @@ func (a *AsyncStatePersister) fullSync(ctx context.Context, cache *cache) error
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *AsyncStatePersister) Sync(_ context.Context, _ trace.Span, _ StateTransitions) {
|
func (a *AsyncStatePersister) Sync(_ context.Context, _ trace.Span, _ models.AlertRuleKeyWithGroup, _ StateTransitions) {
|
||||||
a.log.Debug("Sync: No-Op")
|
a.log.Debug("Sync: No-Op")
|
||||||
}
|
}
|
||||||
|
@ -4,12 +4,15 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
|
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
type NoopPersister struct{}
|
type NoopPersister struct{}
|
||||||
|
|
||||||
func (n *NoopPersister) Async(_ context.Context, _ *cache) {}
|
func (n *NoopPersister) Async(_ context.Context, _ AlertInstancesProvider) {}
|
||||||
func (n *NoopPersister) Sync(_ context.Context, _ trace.Span, _ StateTransitions) {}
|
func (n *NoopPersister) Sync(_ context.Context, _ trace.Span, _ models.AlertRuleKeyWithGroup, _ StateTransitions) {
|
||||||
|
}
|
||||||
|
|
||||||
func NewNoopPersister() StatePersister {
|
func NewNoopPersister() StatePersister {
|
||||||
return &NoopPersister{}
|
return &NoopPersister{}
|
||||||
|
@ -30,12 +30,12 @@ func NewSyncStatePersisiter(log log.Logger, cfg ManagerCfg) StatePersister {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *SyncStatePersister) Async(_ context.Context, _ *cache) {
|
func (a *SyncStatePersister) Async(_ context.Context, _ AlertInstancesProvider) {
|
||||||
a.log.Debug("Async: No-Op")
|
a.log.Debug("Async: No-Op")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync persists the state transitions to the database. It deletes stale states and saves the current states.
|
// Sync persists the state transitions to the database. It deletes stale states and saves the current states.
|
||||||
func (a *SyncStatePersister) Sync(ctx context.Context, span trace.Span, allStates StateTransitions) {
|
func (a *SyncStatePersister) Sync(ctx context.Context, span trace.Span, _ ngModels.AlertRuleKeyWithGroup, allStates StateTransitions) {
|
||||||
staleStates := allStates.StaleStates()
|
staleStates := allStates.StaleStates()
|
||||||
if len(staleStates) > 0 {
|
if len(staleStates) > 0 {
|
||||||
a.deleteAlertStates(ctx, staleStates)
|
a.deleteAlertStates(ctx, staleStates)
|
||||||
|
@ -40,6 +40,7 @@ func TestSyncPersister_saveAlertStates(t *testing.T) {
|
|||||||
create(eval.NoData, ""),
|
create(eval.NoData, ""),
|
||||||
create(eval.Error, ""),
|
create(eval.Error, ""),
|
||||||
}
|
}
|
||||||
|
ruleKey := ngmodels.AlertRuleKeyWithGroup{}
|
||||||
|
|
||||||
transitionToKey := map[ngmodels.AlertInstanceKey]StateTransition{}
|
transitionToKey := map[ngmodels.AlertInstanceKey]StateTransition{}
|
||||||
transitions := make([]StateTransition, 0)
|
transitions := make([]StateTransition, 0)
|
||||||
@ -69,7 +70,7 @@ func TestSyncPersister_saveAlertStates(t *testing.T) {
|
|||||||
InstanceStore: st,
|
InstanceStore: st,
|
||||||
MaxStateSaveConcurrency: 1,
|
MaxStateSaveConcurrency: 1,
|
||||||
})
|
})
|
||||||
syncStatePersister.Sync(context.Background(), span, transitions)
|
syncStatePersister.Sync(context.Background(), span, ruleKey, transitions)
|
||||||
savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{}
|
savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{}
|
||||||
for _, op := range st.RecordedOps() {
|
for _, op := range st.RecordedOps() {
|
||||||
saved := op.(ngmodels.AlertInstance)
|
saved := op.(ngmodels.AlertInstance)
|
||||||
@ -90,7 +91,7 @@ func TestSyncPersister_saveAlertStates(t *testing.T) {
|
|||||||
InstanceStore: st,
|
InstanceStore: st,
|
||||||
MaxStateSaveConcurrency: 1,
|
MaxStateSaveConcurrency: 1,
|
||||||
})
|
})
|
||||||
syncStatePersister.Sync(context.Background(), span, transitions)
|
syncStatePersister.Sync(context.Background(), span, ruleKey, transitions)
|
||||||
|
|
||||||
savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{}
|
savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{}
|
||||||
for _, op := range st.RecordedOps() {
|
for _, op := range st.RecordedOps() {
|
||||||
@ -160,7 +161,7 @@ func TestSyncPersister_saveAlertStates(t *testing.T) {
|
|||||||
PreviousStateReason: util.GenerateShortUID(),
|
PreviousStateReason: util.GenerateShortUID(),
|
||||||
}
|
}
|
||||||
|
|
||||||
syncStatePersister.Sync(context.Background(), span, []StateTransition{transition})
|
syncStatePersister.Sync(context.Background(), span, ruleKey, []StateTransition{transition})
|
||||||
|
|
||||||
require.Len(t, st.RecordedOps(), 1)
|
require.Len(t, st.RecordedOps(), 1)
|
||||||
saved := st.RecordedOps()[0].(ngmodels.AlertInstance)
|
saved := st.RecordedOps()[0].(ngmodels.AlertInstance)
|
||||||
|
Loading…
Reference in New Issue
Block a user