mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
feat(alerting): add async state persister (#80763)
This commit is contained in:
parent
a7e9408433
commit
eb7e1216a1
@ -330,6 +330,37 @@ func (c *cache) removeByRuleUID(orgID int64, uid string) []*State {
|
||||
return states
|
||||
}
|
||||
|
||||
// asInstances returns the whole content of the cache as a slice of AlertInstance.
|
||||
func (c *cache) asInstances(skipNormalState bool) []ngModels.AlertInstance {
|
||||
var states []ngModels.AlertInstance
|
||||
c.mtxStates.RLock()
|
||||
defer c.mtxStates.RUnlock()
|
||||
for _, orgStates := range c.states {
|
||||
for _, v1 := range orgStates {
|
||||
for _, v2 := range v1.states {
|
||||
if skipNormalState && IsNormalStateWithNoReason(v2) {
|
||||
continue
|
||||
}
|
||||
key, err := v2.GetAlertInstanceKey()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
states = append(states, ngModels.AlertInstance{
|
||||
AlertInstanceKey: key,
|
||||
Labels: ngModels.InstanceLabels(v2.Labels),
|
||||
CurrentState: ngModels.InstanceStateType(v2.State.String()),
|
||||
CurrentReason: v2.StateReason,
|
||||
LastEvalTime: v2.LastEvaluationTime,
|
||||
CurrentStateSince: v2.StartsAt,
|
||||
CurrentStateEnd: v2.EndsAt,
|
||||
ResultFingerprint: v2.ResultFingerprint.String(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
return states
|
||||
}
|
||||
|
||||
// if duplicate labels exist, keep the value from the first set
|
||||
func mergeLabels(a, b data.Labels) data.Labels {
|
||||
newLbs := make(data.Labels, len(a)+len(b))
|
||||
|
@ -14,6 +14,7 @@ type InstanceStore interface {
|
||||
SaveAlertInstance(ctx context.Context, instance models.AlertInstance) error
|
||||
DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error
|
||||
DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKey) error
|
||||
FullSync(ctx context.Context, instances []models.AlertInstance) error
|
||||
}
|
||||
|
||||
// RuleReader represents the ability to fetch alert rules.
|
||||
|
61
pkg/services/ngalert/state/persister_async.go
Normal file
61
pkg/services/ngalert/state/persister_async.go
Normal file
@ -0,0 +1,61 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
)
|
||||
|
||||
type AsyncStatePersister struct {
|
||||
log log.Logger
|
||||
// doNotSaveNormalState controls whether eval.Normal state is persisted to the database and returned by get methods.
|
||||
doNotSaveNormalState bool
|
||||
store InstanceStore
|
||||
}
|
||||
|
||||
func NewAsyncStatePersister(log log.Logger, cfg ManagerCfg) StatePersister {
|
||||
return &AsyncStatePersister{
|
||||
log: log,
|
||||
store: cfg.InstanceStore,
|
||||
doNotSaveNormalState: cfg.DoNotSaveNormalState,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *AsyncStatePersister) Async(ctx context.Context, ticker *clock.Ticker, cache *cache) {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if err := a.fullSync(ctx, cache); err != nil {
|
||||
a.log.Error("Failed to do a full state sync to database", "err", err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
a.log.Info("Scheduler is shutting down, doing a final state sync.")
|
||||
if err := a.fullSync(context.Background(), cache); err != nil {
|
||||
a.log.Error("Failed to do a full state sync to database", "err", err)
|
||||
}
|
||||
ticker.Stop()
|
||||
a.log.Info("State async worker is shut down.")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *AsyncStatePersister) fullSync(ctx context.Context, cache *cache) error {
|
||||
startTime := time.Now()
|
||||
a.log.Info("Full state sync start")
|
||||
instances := cache.asInstances(a.doNotSaveNormalState)
|
||||
if err := a.store.FullSync(ctx, instances); err != nil {
|
||||
a.log.Error("Full state sync failed", "duration", time.Since(startTime), "instances", len(instances))
|
||||
return err
|
||||
}
|
||||
a.log.Info("Full state sync done", "duration", time.Since(startTime), "instances", len(instances))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *AsyncStatePersister) Sync(_ context.Context, _ trace.Span, _, _ []StateTransition) {
|
||||
a.log.Debug("Sync: No-Op")
|
||||
}
|
81
pkg/services/ngalert/state/persister_async_test.go
Normal file
81
pkg/services/ngalert/state/persister_async_test.go
Normal file
@ -0,0 +1,81 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
)
|
||||
|
||||
func TestAsyncStatePersister_Async(t *testing.T) {
|
||||
t.Run("It should save on tick", func(t *testing.T) {
|
||||
mockClock := clock.NewMock()
|
||||
store := &FakeInstanceStore{}
|
||||
logger := log.New("async.test")
|
||||
|
||||
persister := NewAsyncStatePersister(logger, ManagerCfg{
|
||||
InstanceStore: store,
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
defer func() {
|
||||
cancel()
|
||||
}()
|
||||
|
||||
ticker := mockClock.Ticker(1 * time.Second)
|
||||
|
||||
cache := newCache()
|
||||
|
||||
go persister.Async(ctx, ticker, cache)
|
||||
|
||||
cache.set(&State{
|
||||
OrgID: 1,
|
||||
State: eval.Alerting,
|
||||
AlertRuleUID: "1",
|
||||
})
|
||||
// Let one tick pass
|
||||
mockClock.Add(1 * time.Second)
|
||||
|
||||
// Check if the state was saved
|
||||
require.Eventually(t, func() bool {
|
||||
return len(store.RecordedOps) == 1
|
||||
}, time.Second*5, time.Second)
|
||||
})
|
||||
t.Run("It should save on context done", func(t *testing.T) {
|
||||
mockClock := clock.NewMock()
|
||||
store := &FakeInstanceStore{}
|
||||
logger := log.New("async.test")
|
||||
|
||||
persister := NewAsyncStatePersister(logger, ManagerCfg{
|
||||
InstanceStore: store,
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
ticker := mockClock.Ticker(1 * time.Second)
|
||||
|
||||
cache := newCache()
|
||||
|
||||
go persister.Async(ctx, ticker, cache)
|
||||
|
||||
cache.set(&State{
|
||||
OrgID: 1,
|
||||
State: eval.Alerting,
|
||||
AlertRuleUID: "1",
|
||||
})
|
||||
|
||||
// Now we cancel the context
|
||||
cancel()
|
||||
|
||||
// Check if the context cancellation was handled correctly
|
||||
require.Eventually(t, func() bool {
|
||||
return len(store.RecordedOps) == 1
|
||||
}, time.Second*5, time.Second)
|
||||
})
|
||||
}
|
@ -53,6 +53,16 @@ func (f *FakeInstanceStore) DeleteAlertInstancesByRule(ctx context.Context, key
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FakeInstanceStore) FullSync(ctx context.Context, instances []models.AlertInstance) error {
|
||||
f.mtx.Lock()
|
||||
defer f.mtx.Unlock()
|
||||
f.RecordedOps = []any{}
|
||||
for _, instance := range instances {
|
||||
f.RecordedOps = append(f.RecordedOps, instance)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type FakeRuleReader struct{}
|
||||
|
||||
func (f *FakeRuleReader) ListAlertRules(_ context.Context, q *models.ListAlertRulesQuery) (models.RulesGroup, error) {
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
)
|
||||
|
||||
// ListAlertInstances is a handler for retrieving alert instances within specific organisation
|
||||
@ -197,3 +198,36 @@ func (st DBstore) DeleteAlertInstancesByRule(ctx context.Context, key models.Ale
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (st DBstore) FullSync(ctx context.Context, instances []models.AlertInstance) error {
|
||||
if len(instances) == 0 {
|
||||
return nil
|
||||
}
|
||||
return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
// First we delete all records from the table
|
||||
if _, err := sess.Exec("DELETE FROM alert_instance"); err != nil {
|
||||
return fmt.Errorf("failed to delete alert_instance table: %w", err)
|
||||
}
|
||||
for _, alertInstance := range instances {
|
||||
if err := models.ValidateAlertInstance(alertInstance); err != nil {
|
||||
st.Logger.Warn("Failed to validate alert instance, skipping", "err", err, "rule_uid", alertInstance.RuleUID)
|
||||
continue
|
||||
}
|
||||
labelTupleJSON, err := alertInstance.Labels.StringKey()
|
||||
if err != nil {
|
||||
st.Logger.Warn("Failed to generate alert instance labels key, skipping", "err", err, "rule_uid", alertInstance.RuleUID)
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = sess.Exec("INSERT INTO alert_instance (rule_org_id, rule_uid, labels, labels_hash, current_state, current_reason, current_state_since, current_state_end, last_eval_time) VALUES (?,?,?,?,?,?,?,?,?)",
|
||||
alertInstance.RuleOrgID, alertInstance.RuleUID, labelTupleJSON, alertInstance.LabelsHash, alertInstance.CurrentState, alertInstance.CurrentReason, alertInstance.CurrentStateSince.Unix(), alertInstance.CurrentStateEnd.Unix(), alertInstance.LastEvalTime.Unix())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to insert into alert_instance table: %w", err)
|
||||
}
|
||||
}
|
||||
if err := sess.Commit(); err != nil {
|
||||
return fmt.Errorf("failed to commit alert_instance table: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
@ -294,3 +295,97 @@ func TestIntegrationAlertInstanceOperations(t *testing.T) {
|
||||
require.Equal(t, instance2.CurrentState, alerts[0].CurrentState)
|
||||
})
|
||||
}
|
||||
|
||||
func TestIntegrationFullSync(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
_, dbstore := tests.SetupTestEnv(t, baseIntervalSeconds)
|
||||
|
||||
orgID := int64(1)
|
||||
|
||||
ruleUIDs := []string{"a", "b", "c", "d"}
|
||||
|
||||
instances := make([]models.AlertInstance, len(ruleUIDs))
|
||||
for i, ruleUID := range ruleUIDs {
|
||||
instances[i] = generateTestAlertInstance(orgID, ruleUID)
|
||||
}
|
||||
|
||||
t.Run("Should do a proper full sync", func(t *testing.T) {
|
||||
err := dbstore.FullSync(ctx, instances)
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
|
||||
RuleOrgID: orgID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res, len(instances))
|
||||
for _, ruleUID := range ruleUIDs {
|
||||
found := false
|
||||
for _, instance := range res {
|
||||
if instance.RuleUID == ruleUID {
|
||||
found = true
|
||||
continue
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("Instance with RuleUID '%s' not found", ruleUID)
|
||||
}
|
||||
}
|
||||
})
|
||||
t.Run("Should remove non existing entries on sync", func(t *testing.T) {
|
||||
err := dbstore.FullSync(ctx, instances[1:])
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
|
||||
RuleOrgID: orgID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res, len(instances)-1)
|
||||
for _, instance := range res {
|
||||
if instance.RuleUID == "a" {
|
||||
t.Error("Instance with RuleUID 'a' should not be exist anymore")
|
||||
}
|
||||
}
|
||||
})
|
||||
t.Run("Should add new entries on sync", func(t *testing.T) {
|
||||
newRuleUID := "y"
|
||||
err := dbstore.FullSync(ctx, append(instances, generateTestAlertInstance(orgID, newRuleUID)))
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := dbstore.ListAlertInstances(ctx, &models.ListAlertInstancesQuery{
|
||||
RuleOrgID: orgID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res, len(instances)+1)
|
||||
for _, ruleUID := range append(ruleUIDs, newRuleUID) {
|
||||
found := false
|
||||
for _, instance := range res {
|
||||
if instance.RuleUID == ruleUID {
|
||||
found = true
|
||||
continue
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("Instance with RuleUID '%s' not found", ruleUID)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func generateTestAlertInstance(orgID int64, ruleID string) models.AlertInstance {
|
||||
return models.AlertInstance{
|
||||
AlertInstanceKey: models.AlertInstanceKey{
|
||||
RuleOrgID: orgID,
|
||||
RuleUID: ruleID,
|
||||
LabelsHash: "abc",
|
||||
},
|
||||
CurrentState: models.InstanceStateFiring,
|
||||
Labels: map[string]string{
|
||||
"hello": "world",
|
||||
},
|
||||
ResultFingerprint: "abc",
|
||||
CurrentStateEnd: time.Now(),
|
||||
CurrentStateSince: time.Now(),
|
||||
LastEvalTime: time.Now(),
|
||||
CurrentReason: "abc",
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user