	Alerting: Delete state from the database on reset (#53919)
* make ResetStateByRuleUID return the removed states
* delete rule states when reset
* rule eval routine cleans up the state only when the rule is deleted
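The heart of the change is replacing the rule routine's plain context.CancelFunc with a stop function that carries a reason, so the routine can tell a deleted rule apart from an ordinary scheduler shutdown and clear its state (both in memory and in the database) only in the former case. The standalone Go sketch below illustrates that pattern under stated assumptions: cancelCauseCtx and withCancelCause are hypothetical stand-ins invented for this illustration, not Grafana's util.WithCancelCause, whose actual implementation may differ.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errRuleDeleted mirrors the sentinel error introduced by this commit.
var errRuleDeleted = errors.New("rule deleted")

// cancelCauseCtx wraps a context so that Err() reports the reason the rule
// routine was stopped (hypothetical stand-in for util.WithCancelCause).
type cancelCauseCtx struct {
	context.Context
	mu    sync.Mutex
	cause error
}

func (c *cancelCauseCtx) Err() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.cause != nil {
		return c.cause
	}
	return c.Context.Err()
}

// withCancelCause returns a context plus a stop function that records why the
// context was cancelled. Passing nil keeps the ordinary cancellation error.
func withCancelCause(parent context.Context) (context.Context, func(reason error)) {
	ctx, cancel := context.WithCancel(parent)
	cc := &cancelCauseCtx{Context: ctx}
	return cc, func(reason error) {
		cc.mu.Lock()
		if cc.cause == nil && reason != nil {
			cc.cause = reason
		}
		cc.mu.Unlock()
		cancel()
	}
}

func main() {
	ctx, stop := withCancelCause(context.Background())
	stop(errRuleDeleted)

	// The evaluation loop can distinguish "rule deleted" from a plain shutdown
	// and clear the rule's state only in the deletion case.
	fmt.Println(errors.Is(ctx.Err(), errRuleDeleted)) // true
}

With a helper like this, DeleteAlertRule can call stop(errRuleDeleted) while a normal shutdown simply cancels the parent context, and the evaluation loop only needs an errors.Is check on the context error before clearing state, as the diff below does.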
@@ -2,13 +2,17 @@ package schedule
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
 	"time"
 
 	"github.com/grafana/grafana/pkg/services/ngalert/models"
+	"github.com/grafana/grafana/pkg/util"
 )
 
+var errRuleDeleted = errors.New("rule deleted")
+
 type alertRuleInfoRegistry struct {
 	mu            sync.Mutex
 	alertRuleInfo map[models.AlertRuleKey]*alertRuleInfo
@@ -78,12 +82,12 @@ type alertRuleInfo struct {
 	evalCh   chan *evaluation
 	updateCh chan ruleVersion
 	ctx      context.Context
-	stop     context.CancelFunc
+	stop     func(reason error)
 }
 
 func newAlertRuleInfo(parent context.Context) *alertRuleInfo {
-	ctx, cancel := context.WithCancel(parent)
-	return &alertRuleInfo{evalCh: make(chan *evaluation), updateCh: make(chan ruleVersion), ctx: ctx, stop: cancel}
+	ctx, stop := util.WithCancelCause(parent)
+	return &alertRuleInfo{evalCh: make(chan *evaluation), updateCh: make(chan ruleVersion), ctx: ctx, stop: stop}
 }
 
 // eval signals the rule evaluation routine to perform the evaluation of the rule. Does nothing if the loop is stopped.

@@ -153,7 +153,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
 				resultCh <- evalResponse{result, dropped}
 			}()
 			runtime.Gosched()
-			r.stop()
+			r.stop(nil)
 			select {
 			case result := <-resultCh:
 				require.False(t, result.success)
@@ -166,12 +166,13 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
 	t.Run("when rule evaluation is stopped", func(t *testing.T) {
 		t.Run("Update should do nothing", func(t *testing.T) {
 			r := newAlertRuleInfo(context.Background())
-			r.stop()
+			r.stop(errRuleDeleted)
+			require.ErrorIs(t, r.ctx.Err(), errRuleDeleted)
 			require.False(t, r.update(ruleVersion(rand.Int63())))
 		})
 		t.Run("eval should do nothing", func(t *testing.T) {
 			r := newAlertRuleInfo(context.Background())
-			r.stop()
+			r.stop(nil)
 			rule := models.AlertRuleGen()()
 			success, dropped := r.eval(time.Now(), rule)
 			require.False(t, success)
@@ -179,8 +180,14 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
 		})
 		t.Run("stop should do nothing", func(t *testing.T) {
 			r := newAlertRuleInfo(context.Background())
-			r.stop()
-			r.stop()
+			r.stop(nil)
+			r.stop(nil)
 		})
+		t.Run("stop should do nothing if parent context stopped", func(t *testing.T) {
+			ctx, cancelFn := context.WithCancel(context.Background())
+			r := newAlertRuleInfo(ctx)
+			cancelFn()
+			r.stop(nil)
+		})
 	})
 	t.Run("should be thread-safe", func(t *testing.T) {
@@ -213,7 +220,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
 				case 2:
 					r.eval(time.Now(), models.AlertRuleGen()())
 				case 3:
-					r.stop()
+					r.stop(nil)
 				}
 			}
 			wg.Done()

@@ -2,6 +2,7 @@ package schedule
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"net/url"
 	"time"
@@ -168,7 +169,7 @@ func (sch *schedule) DeleteAlertRule(keys ...ngmodels.AlertRuleKey) {
 			continue
 		}
 		// stop rule evaluation
-		ruleInfo.stop()
+		ruleInfo.stop(errRuleDeleted)
 	}
 	// Our best bet at this point is that we update the metrics with what we hope to schedule in the next tick.
 	alertRules := sch.schedulableAlertRules.all()
@@ -286,7 +287,7 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
 }
 
 func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan ruleVersion) error {
-	logger := sch.log.New("uid", key.UID, "org", key.OrgID)
+	logger := sch.log.New(key.LogContext()...)
 	logger.Debug("alert rule routine started")
 
 	orgID := fmt.Sprint(key.OrgID)
@@ -295,9 +296,8 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
 	evalTotalFailures := sch.metrics.EvalFailures.WithLabelValues(orgID)
 
 	clearState := func() {
-		states := sch.stateManager.GetStatesForRuleUID(key.OrgID, key.UID)
+		states := sch.stateManager.ResetStateByRuleUID(grafanaCtx, key)
 		expiredAlerts := FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock)
-		sch.stateManager.RemoveByRuleUID(key.OrgID, key.UID)
 		if len(expiredAlerts.PostableAlerts) > 0 {
 			sch.alertsSender.Send(key, expiredAlerts)
 		}
@@ -317,7 +317,10 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
 		} else {
 			logger.Debug("alert rule evaluated", "results", results, "duration", dur)
 		}
-
+		if ctx.Err() != nil { // check if the context is not cancelled. The evaluation can be a long-running task.
+			logger.Debug("skip updating the state because the context has been cancelled")
+			return
+		}
 		processedStates := sch.stateManager.ProcessEvalResults(ctx, e.scheduledAt, e.rule, results, extraLabels)
 		alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL)
 		if len(alerts.PostableAlerts) > 0 {
@@ -396,7 +399,10 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
 			}
 		}()
 		case <-grafanaCtx.Done():
-			clearState()
+			// clean up the state only if the reason for stopping the evaluation loop is that the rule was deleted
+			if errors.Is(grafanaCtx.Err(), errRuleDeleted) {
+				clearState()
+			}
 			logger.Debug("stopping alert rule routine")
 			return nil
 		}

@@ -170,10 +170,15 @@ func TestSchedule_ruleRoutine(t *testing.T) {
 	}
 
 	t.Run("should exit", func(t *testing.T) {
-		t.Run("when context is cancelled", func(t *testing.T) {
+		t.Run("and not clear the state if parent context is cancelled", func(t *testing.T) {
 			stoppedChan := make(chan error)
 			sch, _, _, _ := createSchedule(make(chan time.Time), nil)
 
+			rule := models.AlertRuleGen()()
+			_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen()), nil)
+			expectedStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
+			require.NotEmpty(t, expectedStates)
+
 			ctx, cancel := context.WithCancel(context.Background())
 			go func() {
 				err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evaluation), make(chan ruleVersion))
@@ -183,6 +188,27 @@ func TestSchedule_ruleRoutine(t *testing.T) {
 			cancel()
 			err := waitForErrChannel(t, stoppedChan)
 			require.NoError(t, err)
+			require.Equal(t, len(expectedStates), len(sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)))
 		})
+		t.Run("and clean up the state if delete is cancellation reason ", func(t *testing.T) {
+			stoppedChan := make(chan error)
+			sch, _, _, _ := createSchedule(make(chan time.Time), nil)
+
+			rule := models.AlertRuleGen()()
+			_ = sch.stateManager.ProcessEvalResults(context.Background(), sch.clock.Now(), rule, eval.GenerateResults(rand.Intn(5)+1, eval.ResultGen()), nil)
+			require.NotEmpty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
+
+			ctx, cancel := util.WithCancelCause(context.Background())
+			go func() {
+				err := sch.ruleRoutine(ctx, rule.GetKey(), make(chan *evaluation), make(chan ruleVersion))
+				stoppedChan <- err
+			}()
+
+			cancel(errRuleDeleted)
+			err := waitForErrChannel(t, stoppedChan)
+			require.NoError(t, err)
+
+			require.Empty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
+		})
 	})
 
@@ -419,11 +445,11 @@ func TestSchedule_UpdateAlertRule(t *testing.T) {
 				t.Fatal("No message was received on update channel")
 			}
 		})
-		t.Run("should exit if it is closed", func(t *testing.T) {
+		t.Run("should exit if rule is being stopped", func(t *testing.T) {
 			sch := setupScheduler(t, nil, nil, nil, nil, nil)
 			key := models.GenerateRuleKey(rand.Int63())
 			info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
-			info.stop()
+			info.stop(nil)
 			sch.UpdateAlertRule(key, rand.Int63())
 		})
 	})
@@ -444,23 +470,7 @@ func TestSchedule_DeleteAlertRule(t *testing.T) {
 			key := rule.GetKey()
 			info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
 			sch.DeleteAlertRule(key)
-			require.False(t, info.update(ruleVersion(rand.Int63())))
-			success, dropped := info.eval(time.Now(), rule)
-			require.False(t, success)
-			require.Nilf(t, dropped, "expected no dropped evaluations but got one")
-			require.False(t, sch.registry.exists(key))
-		})
-		t.Run("should remove controller from registry", func(t *testing.T) {
-			sch := setupScheduler(t, nil, nil, nil, nil, nil)
-			rule := models.AlertRuleGen()()
-			key := rule.GetKey()
-			info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
-			info.stop()
-			sch.DeleteAlertRule(key)
-			require.False(t, info.update(ruleVersion(rand.Int63())))
-			success, dropped := info.eval(time.Now(), rule)
-			require.False(t, success)
-			require.Nilf(t, dropped, "expected no dropped evaluations but got one")
+			require.ErrorIs(t, info.ctx.Err(), errRuleDeleted)
 			require.False(t, sch.registry.exists(key))
 		})
 	})

@@ -180,11 +180,20 @@ func (c *cache) getStatesForRuleUID(orgID int64, alertRuleUID string) []*State {
 	return ruleStates
 }
 
-// removeByRuleUID deletes all entries in the state cache that match the given UID.
-func (c *cache) removeByRuleUID(orgID int64, uid string) {
+// removeByRuleUID deletes all entries in the state cache that match the given UID. Returns removed states
+func (c *cache) removeByRuleUID(orgID int64, uid string) []*State {
 	c.mtxStates.Lock()
 	defer c.mtxStates.Unlock()
+	statesMap := c.states[orgID][uid]
 	delete(c.states[orgID], uid)
+	if statesMap == nil {
+		return nil
+	}
+	states := make([]*State, 0, len(statesMap))
+	for _, state := range statesMap {
+		states = append(states, state)
+	}
+	return states
 }
 
 func (c *cache) reset() {

@@ -73,7 +73,7 @@ func (st *Manager) Close(ctx context.Context) {
 
 func (st *Manager) Warm(ctx context.Context) {
 	st.log.Info("warming cache for startup")
-	st.ResetCache()
+	st.ResetAllStates()
 
 	orgIds, err := st.instanceStore.FetchOrgIds(ctx)
 	if err != nil {
@@ -149,20 +149,30 @@ func (st *Manager) Get(orgID int64, alertRuleUID, stateId string) (*State, error
 	return st.cache.get(orgID, alertRuleUID, stateId)
 }
 
-// ResetCache is used to ensure a clean cache on startup.
-func (st *Manager) ResetCache() {
+// ResetAllStates is used to ensure a clean cache on startup.
+func (st *Manager) ResetAllStates() {
 	st.cache.reset()
 }
 
-// RemoveByRuleUID deletes all entries in the state manager that match the given rule UID.
-func (st *Manager) RemoveByRuleUID(orgID int64, ruleUID string) {
-	st.cache.removeByRuleUID(orgID, ruleUID)
+// ResetStateByRuleUID deletes all entries in the state manager that match the given rule UID.
+func (st *Manager) ResetStateByRuleUID(ctx context.Context, ruleKey ngModels.AlertRuleKey) []*State {
+	logger := st.log.New(ruleKey.LogContext()...)
+	logger.Debug("resetting state of the rule")
+	states := st.cache.removeByRuleUID(ruleKey.OrgID, ruleKey.UID)
+	if len(states) > 0 {
+		err := st.instanceStore.DeleteAlertInstancesByRule(ctx, ruleKey)
+		if err != nil {
+			logger.Error("failed to delete states that belong to a rule from database", ruleKey.LogContext()...)
+		}
+	}
+	logger.Info("rules state was reset", "deleted_states", len(states))
+	return states
 }
 
 // ProcessEvalResults updates the current states that belong to a rule with the evaluation results.
 // if extraLabels is not empty, those labels will be added to every state. The extraLabels take precedence over rule labels and result labels
 func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time, alertRule *ngModels.AlertRule, results eval.Results, extraLabels data.Labels) []*State {
-	logger := st.log.New(alertRule.GetKey().LogContext())
+	logger := st.log.New(alertRule.GetKey().LogContext()...)
 	logger.Debug("state manager processing evaluation results", "resultCount", len(results))
 	var states []*State
 	processedResults := make(map[string]*State, len(results))

@@ -15,6 +15,7 @@ type InstanceStore interface {
 	SaveAlertInstance(ctx context.Context, cmd *models.SaveAlertInstanceCommand) error
 	FetchOrgIds(ctx context.Context) ([]int64, error)
 	DeleteAlertInstance(ctx context.Context, orgID int64, ruleUID, labelsHash string) error
+	DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKey) error
 }
 
 // GetAlertInstance is a handler for retrieving an alert instance based on OrgId, AlertDefintionID, and
@@ -158,3 +159,10 @@ func (st DBstore) DeleteAlertInstance(ctx context.Context, orgID int64, ruleUID,
 		return nil
 	})
 }
+
+func (st DBstore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKey) error {
+	return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
+		_, err := sess.Exec("DELETE FROM alert_instance WHERE rule_org_id = ? AND rule_uid = ?", key.OrgID, key.UID)
+		return err
+	})
+}

@@ -404,6 +404,9 @@ func (f *FakeInstanceStore) FetchOrgIds(_ context.Context) ([]int64, error) { re
 func (f *FakeInstanceStore) DeleteAlertInstance(_ context.Context, _ int64, _, _ string) error {
 	return nil
 }
+func (f *FakeInstanceStore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKey) error {
+	return nil
+}
 
 func NewFakeAdminConfigStore(t *testing.T) *FakeAdminConfigStore {
 	t.Helper()