Alerting: Add SaveAlertInstancesForRule instance store method (#94505)

Alerting: Add SaveAlertInstancesForRule method to the InstanceStore interface
This commit is contained in:
Alexander Akhmetov 2024-10-11 13:47:44 +02:00 committed by GitHub
parent e2672021bc
commit 0a4e6ff86b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 85 additions and 29 deletions

View File

@ -464,6 +464,11 @@ type AlertRuleKeyWithVersion struct {
AlertRuleKey `xorm:"extends"`
}
// AlertRuleKeyWithGroup is an AlertRuleKey extended with the name of the
// rule group the rule belongs to.
type AlertRuleKeyWithGroup struct {
	RuleGroup string
	AlertRuleKey `xorm:"extends"`
}
type AlertRuleKeyWithId struct {
AlertRuleKey
ID int64
@ -517,6 +522,11 @@ func (alertRule *AlertRule) GetKey() AlertRuleKey {
return AlertRuleKey{OrgID: alertRule.OrgID, UID: alertRule.UID}
}
// GetKeyWithGroup returns the alert rule identifier together with the name
// of the rule group the rule belongs to.
func (alertRule *AlertRule) GetKeyWithGroup() AlertRuleKeyWithGroup {
	return AlertRuleKeyWithGroup{AlertRuleKey: alertRule.GetKey(), RuleGroup: alertRule.RuleGroup}
}
// GetGroupKey returns the identifier of a group the rule belongs to
func (alertRule *AlertRule) GetGroupKey() AlertRuleGroupKey {
return AlertRuleGroupKey{OrgID: alertRule.OrgID, NamespaceUID: alertRule.NamespaceUID, RuleGroup: alertRule.RuleGroup}

View File

@ -884,3 +884,27 @@ func TestTimeRangeYAML(t *testing.T) {
require.NoError(t, err)
require.Equal(t, yamlRaw, string(serialized))
}
// TestAlertRuleGetKey verifies that GetKey builds the key from the rule's
// OrgID and UID.
func TestAlertRuleGetKey(t *testing.T) {
	t.Run("should return correct key", func(t *testing.T) {
		rule := RuleGen.GenerateRef()

		want := AlertRuleKey{OrgID: rule.OrgID, UID: rule.UID}
		require.Equal(t, want, rule.GetKey())
	})
}
// TestAlertRuleGetKeyWithGroup verifies that GetKeyWithGroup carries both
// the plain rule key and the rule's group name.
func TestAlertRuleGetKeyWithGroup(t *testing.T) {
	t.Run("should return correct key", func(t *testing.T) {
		rule := RuleGen.With(RuleMuts.WithUniqueGroupIndex()).GenerateRef()

		want := AlertRuleKeyWithGroup{
			AlertRuleKey: rule.GetKey(),
			RuleGroup:    rule.RuleGroup,
		}
		require.Equal(t, want, rule.GetKeyWithGroup())
	})
}

View File

@ -587,6 +587,14 @@ func GenerateRuleKey(orgID int64) AlertRuleKey {
}
}
// GenerateRuleKeyWithGroup generates a random alert rule key with group
func GenerateRuleKeyWithGroup(orgID int64) AlertRuleKeyWithGroup {
	key := GenerateRuleKey(orgID)
	return AlertRuleKeyWithGroup{
		AlertRuleKey: key,
		RuleGroup:    util.GenerateShortUID(),
	}
}
// GenerateGroupKey generates a random group key
func GenerateGroupKey(orgID int64) AlertRuleGroupKey {
return AlertRuleGroupKey{

View File

@ -86,7 +86,7 @@ func newRuleFactory(
}
return newAlertRule(
ctx,
rule.GetKey(),
rule.GetKeyWithGroup(),
appURL,
disableGrafanaFolder,
maxAttempts,
@ -112,7 +112,7 @@ type ruleProvider interface {
}
type alertRule struct {
key ngmodels.AlertRuleKey
key ngmodels.AlertRuleKeyWithGroup
evalCh chan *Evaluation
updateCh chan RuleVersionAndPauseStatus
@ -140,7 +140,7 @@ type alertRule struct {
func newAlertRule(
parent context.Context,
key ngmodels.AlertRuleKey,
key ngmodels.AlertRuleKeyWithGroup,
appURL *url.URL,
disableGrafanaFolder bool,
maxAttempts int64,
@ -155,7 +155,7 @@ func newAlertRule(
evalAppliedHook func(ngmodels.AlertRuleKey, time.Time),
stopAppliedHook func(ngmodels.AlertRuleKey),
) *alertRule {
ctx, stop := util.WithCancelCause(ngmodels.WithRuleKey(parent, key))
ctx, stop := util.WithCancelCause(ngmodels.WithRuleKey(parent, key.AlertRuleKey))
return &alertRule{
key: key,
evalCh: make(chan *Evaluation),
@ -194,7 +194,7 @@ func (a *alertRule) Status() ngmodels.RuleStatus {
//
// the second element contains a dropped message that was sent by a concurrent sender.
func (a *alertRule) Eval(eval *Evaluation) (bool, *Evaluation) {
if a.key != eval.rule.GetKey() {
if a.key != eval.rule.GetKeyWithGroup() {
// Make sure that rule has the same key. This should not happen
a.logger.Error("Invalid rule sent for evaluating. Skipping", "ruleKeyToEvaluate", eval.rule.GetKey().String())
return false, eval
@ -352,7 +352,7 @@ func (a *alertRule) Run() error {
// cases.
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
defer cancelFunc()
states := a.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, a.key), a.key, ngmodels.StateReasonRuleDeleted)
states := a.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, a.key.AlertRuleKey), a.key, ngmodels.StateReasonRuleDeleted)
a.expireAndSend(grafanaCtx, states)
}
a.logger.Debug("Stopping alert rule routine")
@ -467,7 +467,7 @@ func (a *alertRule) send(ctx context.Context, logger log.Logger, states state.St
if len(alerts.PostableAlerts) > 0 {
logger.Debug("Sending transitions to notifier", "transitions", len(alerts.PostableAlerts))
a.sender.Send(ctx, a.key, alerts)
a.sender.Send(ctx, a.key.AlertRuleKey, alerts)
}
return alerts
}
@ -476,12 +476,12 @@ func (a *alertRule) send(ctx context.Context, logger log.Logger, states state.St
func (a *alertRule) expireAndSend(ctx context.Context, states []state.StateTransition) {
expiredAlerts := state.FromAlertsStateToStoppedAlert(states, a.appURL, a.clock)
if len(expiredAlerts.PostableAlerts) > 0 {
a.sender.Send(ctx, a.key, expiredAlerts)
a.sender.Send(ctx, a.key.AlertRuleKey, expiredAlerts)
}
}
func (a *alertRule) resetState(ctx context.Context, isPaused bool) {
rule := a.ruleProvider.get(a.key)
rule := a.ruleProvider.get(a.key.AlertRuleKey)
reason := ngmodels.StateReasonUpdated
if isPaused {
reason = ngmodels.StateReasonPaused
@ -496,7 +496,7 @@ func (a *alertRule) evalApplied(now time.Time) {
return
}
a.evalAppliedHook(a.key, now)
a.evalAppliedHook(a.key.AlertRuleKey, now)
}
// stopApplied is only used on tests.
@ -505,7 +505,7 @@ func (a *alertRule) stopApplied() {
return
}
a.stopAppliedHook(a.key)
a.stopAppliedHook(a.key.AlertRuleKey)
}
func SchedulerUserFor(orgID int64) *user.SignedInUser {

View File

@ -39,7 +39,7 @@ func TestAlertRule(t *testing.T) {
t.Run("when rule evaluation is not stopped", func(t *testing.T) {
t.Run("update should send to updateCh", func(t *testing.T) {
r := blankRuleForTests(context.Background(), models.GenerateRuleKey(1))
r := blankRuleForTests(context.Background(), models.GenerateRuleKeyWithGroup(1))
resultCh := make(chan bool)
go func() {
resultCh <- r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false})
@ -52,7 +52,7 @@ func TestAlertRule(t *testing.T) {
}
})
t.Run("update should drop any concurrent sending to updateCh", func(t *testing.T) {
r := blankRuleForTests(context.Background(), models.GenerateRuleKey(1))
r := blankRuleForTests(context.Background(), models.GenerateRuleKeyWithGroup(1))
version1 := RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}
version2 := RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}
@ -79,7 +79,7 @@ func TestAlertRule(t *testing.T) {
})
t.Run("eval should send to evalCh", func(t *testing.T) {
ruleSpec := gen.GenerateRef()
r := blankRuleForTests(context.Background(), ruleSpec.GetKey())
r := blankRuleForTests(context.Background(), ruleSpec.GetKeyWithGroup())
expected := time.Now()
resultCh := make(chan evalResponse)
data := &Evaluation{
@ -103,7 +103,7 @@ func TestAlertRule(t *testing.T) {
})
t.Run("eval should drop any concurrent sending to evalCh", func(t *testing.T) {
ruleSpec := gen.GenerateRef()
r := blankRuleForTests(context.Background(), ruleSpec.GetKey())
r := blankRuleForTests(context.Background(), ruleSpec.GetKeyWithGroup())
time1 := time.UnixMilli(rand.Int63n(math.MaxInt64))
time2 := time.UnixMilli(rand.Int63n(math.MaxInt64))
resultCh1 := make(chan evalResponse)
@ -150,7 +150,7 @@ func TestAlertRule(t *testing.T) {
})
t.Run("eval should exit when context is cancelled", func(t *testing.T) {
ruleSpec := gen.GenerateRef()
r := blankRuleForTests(context.Background(), ruleSpec.GetKey())
r := blankRuleForTests(context.Background(), ruleSpec.GetKeyWithGroup())
resultCh := make(chan evalResponse)
data := &Evaluation{
scheduledAt: time.Now(),
@ -174,14 +174,14 @@ func TestAlertRule(t *testing.T) {
})
t.Run("when rule evaluation is stopped", func(t *testing.T) {
t.Run("Update should do nothing", func(t *testing.T) {
r := blankRuleForTests(context.Background(), models.GenerateRuleKey(1))
r := blankRuleForTests(context.Background(), models.GenerateRuleKeyWithGroup(1))
r.Stop(errRuleDeleted)
require.ErrorIs(t, r.ctx.Err(), errRuleDeleted)
require.False(t, r.Update(RuleVersionAndPauseStatus{fingerprint(rand.Uint64()), false}))
})
t.Run("eval should do nothing", func(t *testing.T) {
ruleSpec := gen.GenerateRef()
r := blankRuleForTests(context.Background(), ruleSpec.GetKey())
r := blankRuleForTests(context.Background(), ruleSpec.GetKeyWithGroup())
r.Stop(nil)
data := &Evaluation{
scheduledAt: time.Now(),
@ -193,19 +193,19 @@ func TestAlertRule(t *testing.T) {
require.Nilf(t, dropped, "expected no dropped evaluations but got one")
})
t.Run("calling stop multiple times should not panic", func(t *testing.T) {
r := blankRuleForTests(context.Background(), models.GenerateRuleKey(1))
r := blankRuleForTests(context.Background(), models.GenerateRuleKeyWithGroup(1))
r.Stop(nil)
r.Stop(nil)
})
t.Run("stop should not panic if parent context stopped", func(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background())
r := blankRuleForTests(ctx, models.GenerateRuleKey(1))
r := blankRuleForTests(ctx, models.GenerateRuleKeyWithGroup(1))
cancelFn()
r.Stop(nil)
})
})
t.Run("should be thread-safe", func(t *testing.T) {
r := blankRuleForTests(context.Background(), models.GenerateRuleKey(1))
r := blankRuleForTests(context.Background(), models.GenerateRuleKeyWithGroup(1))
wg := sync.WaitGroup{}
go func() {
for {
@ -249,7 +249,7 @@ func TestAlertRule(t *testing.T) {
})
t.Run("Run should exit if idle when Stop is called", func(t *testing.T) {
rule := blankRuleForTests(context.Background(), models.GenerateRuleKey(1))
rule := blankRuleForTests(context.Background(), models.GenerateRuleKeyWithGroup(1))
runResult := make(chan error)
go func() {
runResult <- rule.Run()
@ -266,7 +266,7 @@ func TestAlertRule(t *testing.T) {
})
}
func blankRuleForTests(ctx context.Context, key models.AlertRuleKey) *alertRule {
func blankRuleForTests(ctx context.Context, key models.AlertRuleKeyWithGroup) *alertRule {
return newAlertRule(ctx, key, nil, false, 0, nil, nil, nil, nil, nil, nil, log.NewNopLogger(), nil, nil, nil)
}

View File

@ -240,7 +240,7 @@ func (st *Manager) Get(orgID int64, alertRuleUID string, stateId data.Fingerprin
// DeleteStateByRuleUID removes the rule instances from cache and instanceStore. A closed channel is returned to be able
// to gracefully handle the clear state step in scheduler in case we do not need to use the historian to save state
// history.
func (st *Manager) DeleteStateByRuleUID(ctx context.Context, ruleKey ngModels.AlertRuleKey, reason string) []StateTransition {
func (st *Manager) DeleteStateByRuleUID(ctx context.Context, ruleKey ngModels.AlertRuleKeyWithGroup, reason string) []StateTransition {
logger := st.log.FromContext(ctx)
logger.Debug("Resetting state of the rule")
@ -290,7 +290,7 @@ func (st *Manager) DeleteStateByRuleUID(ctx context.Context, ruleKey ngModels.Al
// ResetStateByRuleUID removes the rule instances from cache and instanceStore and saves state history. If the state
// history has to be saved, rule must not be nil.
func (st *Manager) ResetStateByRuleUID(ctx context.Context, rule *ngModels.AlertRule, reason string) []StateTransition {
ruleKey := rule.GetKey()
ruleKey := rule.GetKeyWithGroup()
transitions := st.DeleteStateByRuleUID(ctx, ruleKey, reason)
if rule == nil || st.historian == nil || len(transitions) == 0 {

View File

@ -2083,7 +2083,7 @@ func TestDeleteStateByRuleUID(t *testing.T) {
assert.Equal(t, tc.startingInstanceDBCount, len(alerts))
expectedReason := util.GenerateShortUID()
transitions := st.DeleteStateByRuleUID(ctx, rule.GetKey(), expectedReason)
transitions := st.DeleteStateByRuleUID(ctx, rule.GetKeyWithGroup(), expectedReason)
// Check that the deleted states are the same as the ones that were in cache
assert.Equal(t, tc.startingStateCacheCount, len(transitions))

View File

@ -13,7 +13,9 @@ type InstanceStore interface {
ListAlertInstances(ctx context.Context, cmd *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error)
SaveAlertInstance(ctx context.Context, instance models.AlertInstance) error
DeleteAlertInstances(ctx context.Context, keys ...models.AlertInstanceKey) error
DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKey) error
// SaveAlertInstancesForRule overwrites the state for the given rule.
SaveAlertInstancesForRule(ctx context.Context, key models.AlertRuleKeyWithGroup, instances []models.AlertInstance) error
DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKeyWithGroup) error
FullSync(ctx context.Context, instances []models.AlertInstance) error
}

View File

@ -56,7 +56,11 @@ func (f *FakeInstanceStore) DeleteAlertInstances(ctx context.Context, q ...model
return nil
}
func (f *FakeInstanceStore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKey) error {
func (f *FakeInstanceStore) SaveAlertInstancesForRule(ctx context.Context, key models.AlertRuleKeyWithGroup, instances []models.AlertInstance) error {
return nil
}
func (f *FakeInstanceStore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKeyWithGroup) error {
return nil
}

View File

@ -2,6 +2,7 @@ package store
import (
"context"
"errors"
"fmt"
"sort"
"strings"
@ -32,6 +33,7 @@ func (st DBstore) ListAlertInstances(ctx context.Context, cmd *models.ListAlertI
if cmd.RuleUID != "" {
addToQuery(` AND rule_uid = ?`, cmd.RuleUID)
}
if st.FeatureToggles.IsEnabled(ctx, featuremgmt.FlagAlertingNoNormalState) {
s.WriteString(fmt.Sprintf(" AND NOT (current_state = '%s' AND current_reason = '')", models.InstanceStateNormal))
}
@ -206,7 +208,13 @@ func (st DBstore) DeleteAlertInstances(ctx context.Context, keys ...models.Alert
return err
}
func (st DBstore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKey) error {
// SaveAlertInstancesForRule is not implemented for instance database store.
func (st DBstore) SaveAlertInstancesForRule(ctx context.Context, key models.AlertRuleKeyWithGroup, instances []models.AlertInstance) error {
st.Logger.Error("SaveAlertInstancesForRule is not implemented for instance database store.")
return errors.New("method SaveAlertInstancesForRule is not implemented for instance database store")
}
func (st DBstore) DeleteAlertInstancesByRule(ctx context.Context, key models.AlertRuleKeyWithGroup) error {
return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *db.Session) error {
_, err := sess.Exec("DELETE FROM alert_instance WHERE rule_org_id = ? AND rule_uid = ?", key.OrgID, key.UID)
return err