Mirror of https://github.com/grafana/grafana.git (synced 2025-02-25 18:55:37 -06:00)
Alerting: Add alert pausing feature (#60734)
* Add field in alert_rule model, add state to alert_instance model, and state to eval
* Remove paused state from eval package
* Skip paused alert rules in scheduler
* Add migration to add is_paused field to alert_rule table
* Convert to postable alerts only if not normal, pending, or paused
* Handle paused eval results in state manager
* Add Paused state to eval package
* Add paused alerts logic in scheduler
* Skip alert on scheduler
* Remove paused status from eval package
* Apply suggestions from code review
  Co-authored-by: George Robinson <george.robinson@grafana.com>
* Remove state
* Rethink schedule and manager for paused alerts
* Change return to continue
* Remove unused var
* Rethink alert pausing
* Paused alerts storing annotations
* Only add one state transition
* Revert boolean method renaming refactor
* Revert take image refactor
* Make registry errors public
* Revert method extraction for getting a folder title
* Revert variable renaming refactor
* Undo unnecessary changes
* Revert changes in test
* Remove IsPaused check in PatchPartialAlertRule function
* Use SetNormal to set state
* Fix test by returning to old behaviour on alert rule deletion
* Add test in schedule_unit_test.go to test ticks with paused alerts
* Add comment to clarify usage of context.Background()
* Add comment to clarify resetStateByRuleUID method usage
* Move rule get to a more limited scope
* Update pkg/services/ngalert/schedule/schedule.go
  Co-authored-by: George Robinson <george.robinson@grafana.com>
* Run gofmt on pkg/services/ngalert/schedule/schedule.go
* Remove defer cancel for context
* Update pkg/services/ngalert/models/instance_test.go
  Co-authored-by: Santiago <santiagohernandez.1997@gmail.com>
* Update pkg/services/ngalert/models/testing.go
  Co-authored-by: Santiago <santiagohernandez.1997@gmail.com>
* Update pkg/services/ngalert/schedule/schedule_unit_test.go
  Co-authored-by: Santiago <santiagohernandez.1997@gmail.com>
* Update pkg/services/ngalert/schedule/schedule_unit_test.go
  Co-authored-by: Santiago <santiagohernandez.1997@gmail.com>
* Update pkg/services/ngalert/models/instance_test.go
  Co-authored-by: Santiago <santiagohernandez.1997@gmail.com>
* Skip scheduler rule state clean up on paused alert rule
* Update pkg/services/ngalert/schedule/schedule.go
  Co-authored-by: Santiago <santiagohernandez.1997@gmail.com>
* Fix mock in test
* Add (hopefully) final suggestions
* Use error channel from recordAnnotationsSync to cancel context
* Run make gen-cue
* Place pause alert check in channel update after version check
* Reduce branching in update channel select
* Add if for error and move code inside if in state manager ResetStateByRuleUID
* Add reason to logs
* Update pkg/services/ngalert/schedule/schedule.go
  Co-authored-by: George Robinson <george.robinson@grafana.com>
* Do not delete alert rule routine, just exit on eval if is paused
* Reduce branching and create-close a channel to avoid deadlocks
* Separate state deletion and state reset (includes history saving)
* Add current pause state in rule route in scheduler
* Split clearState and bring errCh closer to RecordStatesAsync call
* Change rule to ruleMeta in RecordStatesAsync
* Copy state to be able to modify it
* Add timeout to context creation
* Shorten the timeout
* Use resetState if rule is paused and deleteState if rule is not paused
* Remove Empty state reason
* Save every rule change in historian
* Add tests for DeleteStateByRuleUID and ResetStateByRuleUID
* Remove useless line
* Remove outdated comment

Co-authored-by: George Robinson <george.robinson@grafana.com>
Co-authored-by: Santiago <santiagohernandez.1997@gmail.com>
Co-authored-by: Armand Grillet <2117580+armandgrillet@users.noreply.github.com>
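In short, the change threads a new IsPaused flag from the alert rule model through the scheduler into the state manager: a paused rule keeps its evaluation routine, but evaluation is skipped, and its existing states are reset with the reason "Paused". The sketch below condenses that flow for orientation only; it uses simplified, hypothetical names (alertRule, evaluateTick, stateReason) rather than the actual functions changed in the diff further down.

```go
package main

import "fmt"

// Hypothetical, condensed sketch of the pause flow in this PR; not the real
// Grafana APIs (those live in pkg/services/ngalert).

type alertRule struct {
	UID      string
	Version  int64
	IsPaused bool // backed by the new is_paused column (default false)
}

// stateReason mirrors the new StateReasonPaused / StateReasonUpdated constants.
func stateReason(isPaused bool) string {
	if isPaused {
		return "Paused"
	}
	return "Updated"
}

// evaluateTick decides what a scheduler tick does for one rule.
func evaluateTick(r alertRule, currentVersion int64) (newVersion int64, action string) {
	if r.Version != currentVersion {
		// Version changed (e.g. the rule was paused): reset existing states,
		// labelling them with a reason.
		return r.Version, "reset state (" + stateReason(r.IsPaused) + ")"
	}
	if r.IsPaused {
		return currentVersion, "skip evaluation" // paused rules are not evaluated
	}
	return currentVersion, "evaluate"
}

func main() {
	r := alertRule{UID: "demo", Version: 1}
	v, act := evaluateTick(r, 1)
	fmt.Println(act) // evaluate

	r.IsPaused = true
	r.Version++ // pausing bumps the rule version
	v, act = evaluateTick(r, v)
	fmt.Println(act) // reset state (Paused)

	_, act = evaluateTick(r, v)
	fmt.Println(act) // skip evaluation
}
```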
This commit is contained in:
parent 039b30b1ea
commit 531b439cf1
@@ -426,7 +426,7 @@ func (srv RulerSrv) updateAlertRulesInGroup(c *models.ReqContext, groupKey ngmod
            srv.scheduleService.UpdateAlertRule(ngmodels.AlertRuleKey{
                OrgID: c.SignedInUser.OrgID,
                UID:   rule.Existing.UID,
            }, rule.Existing.Version+1)
            }, rule.Existing.Version+1, rule.New.IsPaused)
        }

    if len(finalChanges.Delete) > 0 {
@@ -22,7 +22,7 @@ type RuleStore interface {
    DeleteAlertRulesByUID(ctx context.Context, orgID int64, ruleUID ...string) error

    // IncreaseVersionForAllRulesInNamespace Increases version for all rules that have specified namespace. Returns all rules that belong to the namespace
    IncreaseVersionForAllRulesInNamespace(ctx context.Context, orgID int64, namespaceUID string) ([]ngmodels.AlertRuleKeyWithVersion, error)
    IncreaseVersionForAllRulesInNamespace(ctx context.Context, orgID int64, namespaceUID string) ([]ngmodels.AlertRuleKeyWithVersionAndPauseStatus, error)

    Count(ctx context.Context, orgID int64) (int64, error)
}

@@ -109,6 +109,8 @@ const (
const (
    StateReasonMissingSeries = "MissingSeries"
    StateReasonError         = "Error"
    StateReasonPaused        = "Paused"
    StateReasonUpdated       = "Updated"
)

var (

@@ -156,6 +158,7 @@ type AlertRule struct {
    For         time.Duration
    Annotations map[string]string
    Labels      map[string]string
    IsPaused    bool
}

// GetDashboardUID returns the DashboardUID or "".

@@ -261,6 +264,11 @@ type AlertRuleKeyWithVersion struct {
    AlertRuleKey `xorm:"extends"`
}

type AlertRuleKeyWithVersionAndPauseStatus struct {
    IsPaused bool
    AlertRuleKeyWithVersion `xorm:"extends"`
}

// AlertRuleGroupKey is the identifier of a group of alerts
type AlertRuleGroupKey struct {
    OrgID int64

@@ -335,6 +343,7 @@ type AlertRuleVersion struct {
    For         time.Duration
    Annotations map[string]string
    Labels      map[string]string
    IsPaused    bool
}

// GetAlertRuleByUIDQuery is the query for retrieving/deleting an alert rule by UID and organisation ID.

@@ -34,7 +34,7 @@ const (
    InstanceStatePending InstanceStateType = "Pending"
    // InstanceStateNoData is for an alert with no data.
    InstanceStateNoData InstanceStateType = "NoData"
    // InstanceStateError is for a erroring alert.
    // InstanceStateError is for an erroring alert.
    InstanceStateError InstanceStateType = "Error"
)
pkg/services/ngalert/models/instance_test.go (new file, 105 lines)
@@ -0,0 +1,105 @@
package models

import (
    "errors"
    "fmt"
    "testing"

    "github.com/stretchr/testify/require"
)

func TestInstanceStateType_IsValid(t *testing.T) {
    testCases := []struct {
        instanceType     InstanceStateType
        expectedValidity bool
    }{
        {
            instanceType:     InstanceStateFiring,
            expectedValidity: true,
        },
        {
            instanceType:     InstanceStateNormal,
            expectedValidity: true,
        },
        {
            instanceType:     InstanceStatePending,
            expectedValidity: true,
        },
        {
            instanceType:     InstanceStateNoData,
            expectedValidity: true,
        },
        {
            instanceType:     InstanceStateError,
            expectedValidity: true,
        },
        {
            instanceType:     InstanceStateType("notAValidInstanceStateType"),
            expectedValidity: false,
        },
    }

    for _, tc := range testCases {
        t.Run(buildTestInstanceStateTypeIsValidName(tc.instanceType, tc.expectedValidity), func(t *testing.T) {
            require.Equal(t, tc.expectedValidity, tc.instanceType.IsValid())
        })
    }
}

func buildTestInstanceStateTypeIsValidName(instanceType InstanceStateType, expectedValidity bool) string {
    if expectedValidity {
        return fmt.Sprintf("%q should be valid", instanceType)
    }
    return fmt.Sprintf("%q should not be valid", instanceType)
}

func TestValidateAlertInstance(t *testing.T) {
    testCases := []struct {
        name         string
        orgId        int64
        uid          string
        currentState InstanceStateType
        err          error
    }{
        {
            name:         "fails if orgID is empty",
            orgId:        0,
            uid:          "validUid",
            currentState: InstanceStateNormal,
            err:          errors.New("alert instance is invalid due to missing alert rule organisation"),
        },
        {
            name:         "fails if uid is empty",
            orgId:        1,
            uid:          "",
            currentState: InstanceStateNormal,
            err:          errors.New("alert instance is invalid due to missing alert rule uid"),
        },
        {
            name:         "fails if current state is not valid",
            orgId:        1,
            uid:          "validUid",
            currentState: InstanceStateType("notAValidType"),
            err:          errors.New("alert instance is invalid because the state 'notAValidType' is invalid"),
        },
        {
            name:         "ok if validated fields are correct",
            orgId:        1,
            uid:          "validUid",
            currentState: InstanceStateNormal,
            err:          nil,
        },
    }

    for _, tc := range testCases {
        t.Run(tc.name, func(t *testing.T) {
            instance := AlertInstanceGen(func(instance *AlertInstance) {
                instance.AlertInstanceKey.RuleOrgID = tc.orgId
                instance.AlertInstanceKey.RuleUID = tc.uid
                instance.CurrentState = tc.currentState
            })

            require.Equal(t, tc.err, ValidateAlertInstance(*instance))
        })
    }
}
@@ -337,3 +337,46 @@ func CreateClassicConditionExpression(refID string, inputRefID string, reducer s
        }`, refID, inputRefID, operation, threshold, reducer, expr.OldDatasourceUID, expr.DatasourceType)),
    }
}

type AlertInstanceMutator func(*AlertInstance)

// AlertInstanceGen provides a factory function that generates a random AlertInstance.
// The mutators arguments allows changing fields of the resulting structure.
func AlertInstanceGen(mutators ...AlertInstanceMutator) *AlertInstance {
    var labels map[string]string = nil
    if rand.Int63()%2 == 0 {
        labels = GenerateAlertLabels(rand.Intn(5), "lbl-")
    }

    randState := func() InstanceStateType {
        s := [...]InstanceStateType{
            InstanceStateFiring,
            InstanceStateNormal,
            InstanceStatePending,
            InstanceStateNoData,
            InstanceStateError,
        }
        return s[rand.Intn(len(s))]
    }

    currentStateSince := time.Now().Add(-time.Duration(rand.Intn(100) + 1))

    instance := &AlertInstance{
        AlertInstanceKey: AlertInstanceKey{
            RuleOrgID:  rand.Int63n(1500),
            RuleUID:    util.GenerateShortUID(),
            LabelsHash: util.GenerateShortUID(),
        },
        Labels:            labels,
        CurrentState:      randState(),
        CurrentReason:     "TEST-REASON-" + util.GenerateShortUID(),
        CurrentStateSince: currentStateSince,
        CurrentStateEnd:   currentStateSince.Add(time.Duration(rand.Intn(100) + 200)),
        LastEvalTime:      time.Now().Add(-time.Duration(rand.Intn(100) + 50)),
    }

    for _, mutator := range mutators {
        mutator(instance)
    }
    return instance
}
@@ -310,7 +310,7 @@ func subscribeToFolderChanges(ctx context.Context, logger log.Logger, bus bus.Bu
    if len(updated) > 0 {
        logger.Info("Rules that belong to the folder have been updated successfully. Clearing their status", "folderUID", evt.UID, "updatedRules", len(updated))
        for _, key := range updated {
            scheduler.UpdateAlertRule(key.AlertRuleKey, key.Version)
            scheduler.UpdateAlertRule(key.AlertRuleKey, key.Version, key.IsPaused)
        }
    } else {
        logger.Debug("No alert rules found in the folder. nothing to update", "folderUID", evt.UID, "folder", evt.Title)

@@ -35,7 +35,7 @@ func Test_subscribeToFolderChanges(t *testing.T) {
    db.PutRule(context.Background(), rules...)

    scheduler := &schedule.FakeScheduleService{}
    scheduler.On("UpdateAlertRule", mock.Anything, mock.Anything).Return()
    scheduler.On("UpdateAlertRule", mock.Anything, mock.Anything, mock.Anything).Return()

    subscribeToFolderChanges(context.Background(), log.New("test"), bus, db, scheduler)

@@ -69,6 +69,6 @@ func Test_subscribeToFolderChanges(t *testing.T) {
    }, time.Second, 10*time.Millisecond, "scheduler was expected to be called %d times but called %d", len(rules), calledTimes)

    for _, rule := range rules {
        scheduler.AssertCalled(t, "UpdateAlertRule", rule.GetKey(), rule.Version)
        scheduler.AssertCalled(t, "UpdateAlertRule", rule.GetKey(), rule.Version, false)
    }
}
@@ -77,17 +77,21 @@ func (r *alertRuleInfoRegistry) keyMap() map[models.AlertRuleKey]struct{} {
}

type ruleVersion int64
type ruleVersionAndPauseStatus struct {
    Version  ruleVersion
    IsPaused bool
}

type alertRuleInfo struct {
    evalCh   chan *evaluation
    updateCh chan ruleVersion
    updateCh chan ruleVersionAndPauseStatus
    ctx      context.Context
    stop     func(reason error)
}

func newAlertRuleInfo(parent context.Context) *alertRuleInfo {
    ctx, stop := util.WithCancelCause(parent)
    return &alertRuleInfo{evalCh: make(chan *evaluation), updateCh: make(chan ruleVersion), ctx: ctx, stop: stop}
    return &alertRuleInfo{evalCh: make(chan *evaluation), updateCh: make(chan ruleVersionAndPauseStatus), ctx: ctx, stop: stop}
}

// eval signals the rule evaluation routine to perform the evaluation of the rule. Does nothing if the loop is stopped.

@@ -114,13 +118,13 @@ func (a *alertRuleInfo) eval(eval *evaluation) (bool, *evaluation) {
}

// update sends an instruction to the rule evaluation routine to update the scheduled rule to the specified version. The specified version must be later than the current version, otherwise no update will happen.
func (a *alertRuleInfo) update(lastVersion ruleVersion) bool {
func (a *alertRuleInfo) update(lastVersion ruleVersionAndPauseStatus) bool {
    // check if the channel is not empty.
    msg := lastVersion
    select {
    case v := <-a.updateCh:
        // if it has a version pick the greatest one.
        if v > msg {
        if v.Version > msg.Version {
            msg = v
        }
    case <-a.ctx.Done():
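The update method above relies on a small coalescing pattern: before publishing a new message it first tries to take any message already pending on the channel and keeps whichever carries the higher version, so racing updates collapse into the newest one. The snippet below is a self-contained illustration of that idea only; the names (update, send) are hypothetical, and it uses a 1-slot buffered channel for brevity where the scheduler uses an unbuffered channel plus a second select to deliver the winning message.

```go
package main

import "fmt"

// update stands in for ruleVersionAndPauseStatus: a version plus a pause flag.
type update struct {
	Version  int64
	IsPaused bool
}

// send coalesces a pending message with the new one and keeps the newest version.
func send(ch chan update, msg update) {
	select {
	case pending := <-ch: // something is already queued
		if pending.Version > msg.Version {
			msg = pending // keep the greater version
		}
	default: // nothing pending
	}
	ch <- msg
}

func main() {
	ch := make(chan update, 1)

	send(ch, update{Version: 3, IsPaused: false})
	send(ch, update{Version: 5, IsPaused: true})  // supersedes version 3
	send(ch, update{Version: 4, IsPaused: false}) // older, dropped

	fmt.Println(<-ch) // {5 true}
}
```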
@@ -27,7 +27,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
    r := newAlertRuleInfo(context.Background())
    resultCh := make(chan bool)
    go func() {
        resultCh <- r.update(ruleVersion(rand.Int63()))
        resultCh <- r.update(ruleVersionAndPauseStatus{ruleVersion(rand.Int63()), false})
    }()
    select {
    case <-r.updateCh:

@@ -45,19 +45,19 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
    wg.Add(1)
    go func() {
        wg.Done()
        r.update(version1)
        r.update(ruleVersionAndPauseStatus{version1, false})
        wg.Done()
    }()
    wg.Wait()
    wg.Add(2) // one when time1 is sent, another when go-routine for time2 has started
    go func() {
        wg.Done()
        r.update(version2)
        r.update(ruleVersionAndPauseStatus{version2, false})
    }()
    wg.Wait() // at this point tick 1 has already been dropped
    select {
    case version := <-r.updateCh:
        require.Equal(t, version2, version)
        require.Equal(t, ruleVersionAndPauseStatus{version2, false}, version)
    case <-time.After(5 * time.Second):
        t.Fatal("No message was received on eval channel")
    }

@@ -71,19 +71,19 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
    wg.Add(1)
    go func() {
        wg.Done()
        r.update(version2)
        r.update(ruleVersionAndPauseStatus{version2, false})
        wg.Done()
    }()
    wg.Wait()
    wg.Add(2) // one when time1 is sent, another when go-routine for time2 has started
    go func() {
        wg.Done()
        r.update(version1)
        r.update(ruleVersionAndPauseStatus{version1, false})
    }()
    wg.Wait() // at this point tick 1 has already been dropped
    select {
    case version := <-r.updateCh:
        require.Equal(t, version2, version)
        require.Equal(t, ruleVersionAndPauseStatus{version2, false}, version)
    case <-time.After(5 * time.Second):
        t.Fatal("No message was received on eval channel")
    }

@@ -185,7 +185,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
    r := newAlertRuleInfo(context.Background())
    r.stop(errRuleDeleted)
    require.ErrorIs(t, r.ctx.Err(), errRuleDeleted)
    require.False(t, r.update(ruleVersion(rand.Int63())))
    require.False(t, r.update(ruleVersionAndPauseStatus{ruleVersion(rand.Int63()), false}))
})
t.Run("eval should do nothing", func(t *testing.T) {
    r := newAlertRuleInfo(context.Background())

@@ -237,7 +237,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
    }
    switch rand.Intn(max) + 1 {
    case 1:
        r.update(ruleVersion(rand.Int63()))
        r.update(ruleVersionAndPauseStatus{ruleVersion(rand.Int63()), false})
    case 2:
        r.eval(&evaluation{
            scheduledAt: time.Now(),
@@ -37,7 +37,7 @@ type ScheduleService interface {
    // an error. The scheduler is terminated when this function returns.
    Run(context.Context) error
    // UpdateAlertRule notifies scheduler that a rule has been changed
    UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64)
    UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64, isPaused bool)
    // DeleteAlertRule notifies scheduler that rules have been deleted
    DeleteAlertRule(keys ...ngmodels.AlertRuleKey)
}

@@ -150,12 +150,12 @@ func (sch *schedule) Run(ctx context.Context) error {
}

// UpdateAlertRule looks for the active rule evaluation and commands it to update the rule
func (sch *schedule) UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64) {
func (sch *schedule) UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64, isPaused bool) {
    ruleInfo, err := sch.registry.get(key)
    if err != nil {
        return
    }
    ruleInfo.update(ruleVersion(lastVersion))
    ruleInfo.update(ruleVersionAndPauseStatus{ruleVersion(lastVersion), isPaused})
}

// DeleteAlertRule stops evaluation of the rule, deletes it from active rules, and cleans up state cache.

@@ -314,7 +314,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
    return readyToRun, registeredDefinitions
}

func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan ruleVersion) error {
func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan ruleVersionAndPauseStatus) error {
    grafanaCtx = ngmodels.WithRuleKey(grafanaCtx, key)
    logger := sch.log.FromContext(grafanaCtx)
    logger.Debug("Alert rule routine started")

@@ -324,14 +324,23 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
    evalDuration := sch.metrics.EvalDuration.WithLabelValues(orgID)
    evalTotalFailures := sch.metrics.EvalFailures.WithLabelValues(orgID)

    clearState := func() {
        states := sch.stateManager.ResetStateByRuleUID(grafanaCtx, key)
    notify := func(states []*state.State) {
        expiredAlerts := FromAlertsStateToStoppedAlert(states, sch.appURL, sch.clock)
        if len(expiredAlerts.PostableAlerts) > 0 {
            sch.alertsSender.Send(key, expiredAlerts)
        }
    }

    resetState := func(ctx context.Context, isPaused bool) {
        rule := sch.schedulableAlertRules.get(key)
        reason := ngmodels.StateReasonUpdated
        if isPaused {
            reason = ngmodels.StateReasonPaused
        }
        states := sch.stateManager.ResetStateByRuleUID(ctx, rule, reason)
        notify(states)
    }

    evaluate := func(ctx context.Context, attempt int64, e *evaluation, span tracing.Span) {
        logger := logger.New("version", e.rule.Version, "attempt", attempt, "now", e.scheduledAt)
        start := sch.clock.Now()

@@ -431,18 +440,19 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
    for {
        select {
        // used by external services (API) to notify that rule is updated.
        case lastVersion := <-updateCh:
        case ctx := <-updateCh:
            // sometimes it can happen when, for example, the rule evaluation took so long,
            // and there were two concurrent messages in updateCh and evalCh, and the eval's one got processed first.
            // therefore, at the time when message from updateCh is processed the current rule will have
            // at least the same version (or greater) and the state created for the new version of the rule.
            if currentRuleVersion >= int64(lastVersion) {
                logger.Info("Skip updating rule because its current version is actual", "version", currentRuleVersion, "newVersion", lastVersion)
            if currentRuleVersion >= int64(ctx.Version) {
                logger.Info("Skip updating rule because its current version is actual", "version", currentRuleVersion, "newVersion", ctx.Version)
                continue
            }
            logger.Info("Clearing the state of the rule because version has changed", "version", currentRuleVersion, "newVersion", lastVersion)

            logger.Info("Clearing the state of the rule because it was updated", "version", currentRuleVersion, "newVersion", ctx.Version, "isPaused", ctx.IsPaused)
            // clear the state. So the next evaluation will start from the scratch.
            clearState()
            resetState(grafanaCtx, ctx.IsPaused)
        // evalCh - used by the scheduler to signal that evaluation is needed.
        case ctx, ok := <-evalCh:
            if !ok {

@@ -462,14 +472,18 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR

            err := retryIfError(func(attempt int64) error {
                newVersion := ctx.rule.Version
                isPaused := ctx.rule.IsPaused
                // fetch latest alert rule version
                if currentRuleVersion != newVersion {
                    if currentRuleVersion > 0 { // do not clean up state if the eval loop has just started.
                        logger.Debug("Got a new version of alert rule. Clear up the state and refresh extra labels", "version", currentRuleVersion, "newVersion", newVersion)
                        clearState()
                        resetState(grafanaCtx, isPaused)
                    }
                    currentRuleVersion = newVersion
                }
                if isPaused {
                    return nil
                }
                tracingCtx, span := sch.tracer.Start(grafanaCtx, "alert rule execution")
                defer span.End()

@@ -489,7 +503,13 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
        case <-grafanaCtx.Done():
            // clean up the state only if the reason for stopping the evaluation loop is that the rule was deleted
            if errors.Is(grafanaCtx.Err(), errRuleDeleted) {
                clearState()
                // We do not want a context to be unbounded which could potentially cause a go routine running
                // indefinitely. 1 minute is an almost randomly chosen timeout, big enough to cover the majority of the
                // cases.
                ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
                defer cancelFunc()
                states := sch.stateManager.DeleteStateByRuleUID(ngmodels.WithRuleKey(ctx, key), key)
                notify(states)
            }
            logger.Debug("Stopping alert rule routine")
            return nil
@@ -36,8 +36,8 @@ func (_m *FakeScheduleService) Run(_a0 context.Context) error {
}

// UpdateAlertRule provides a mock function with given fields: key, lastVersion
func (_m *FakeScheduleService) UpdateAlertRule(key models.AlertRuleKey, lastVersion int64) {
    _m.Called(key, lastVersion)
func (_m *FakeScheduleService) UpdateAlertRule(key models.AlertRuleKey, lastVersion int64, isPaused bool) {
    _m.Called(key, lastVersion, isPaused)
}

// evalApplied provides a mock function with given fields: _a0, _a1
@@ -235,7 +235,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
    go func() {
        ctx, cancel := context.WithCancel(context.Background())
        t.Cleanup(cancel)
        _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion))
        _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersionAndPauseStatus))
    }()

    expectedTime := time.UnixMicro(rand.Int63())

@@ -347,7 +347,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {

    ctx, cancel := context.WithCancel(context.Background())
    go func() {
        err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evaluation), make(chan ruleVersion))
        err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evaluation), make(chan ruleVersionAndPauseStatus))
        stoppedChan <- err
    }()

@@ -366,7 +366,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {

    ctx, cancel := util.WithCancelCause(context.Background())
    go func() {
        err := sch.ruleRoutine(ctx, rule.GetKey(), make(chan *evaluation), make(chan ruleVersion))
        err := sch.ruleRoutine(ctx, rule.GetKey(), make(chan *evaluation), make(chan ruleVersionAndPauseStatus))
        stoppedChan <- err
    }()

@@ -383,13 +383,14 @@ func TestSchedule_ruleRoutine(t *testing.T) {

    evalChan := make(chan *evaluation)
    evalAppliedChan := make(chan time.Time)
    updateChan := make(chan ruleVersion)
    updateChan := make(chan ruleVersionAndPauseStatus)

    sender := AlertsSenderMock{}
    sender.EXPECT().Send(rule.GetKey(), mock.Anything).Return()

    sch, ruleStore, _, _ := createSchedule(evalAppliedChan, &sender)
    ruleStore.PutRule(context.Background(), rule)
    sch.schedulableAlertRules.set([]*models.AlertRule{rule}, map[string]string{rule.NamespaceUID: "folderName"})

    go func() {
        ctx, cancel := context.WithCancel(context.Background())

@@ -433,9 +434,9 @@ func TestSchedule_ruleRoutine(t *testing.T) {
    require.Greaterf(t, expectedToBeSent, 0, "State manager was expected to return at least one state that can be expired")

    t.Run("should do nothing if version in channel is the same", func(t *testing.T) {
        updateChan <- ruleVersion(rule.Version - 1)
        updateChan <- ruleVersion(rule.Version)
        updateChan <- ruleVersion(rule.Version) // second time just to make sure that previous messages were handled
        updateChan <- ruleVersionAndPauseStatus{ruleVersion(rule.Version - 1), false}
        updateChan <- ruleVersionAndPauseStatus{ruleVersion(rule.Version), false}
        updateChan <- ruleVersionAndPauseStatus{ruleVersion(rule.Version), false} // second time just to make sure that previous messages were handled

        actualStates := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
        require.Len(t, actualStates, len(states))

@@ -444,7 +445,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
    })

    t.Run("should clear the state and expire firing alerts if version in channel is greater", func(t *testing.T) {
        updateChan <- ruleVersion(rule.Version + rand.Int63n(1000) + 1)
        updateChan <- ruleVersionAndPauseStatus{ruleVersion(rule.Version + rand.Int63n(1000) + 1), false}

        require.Eventually(t, func() bool {
            return len(sender.Calls) > 0

@@ -474,7 +475,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
    go func() {
        ctx, cancel := context.WithCancel(context.Background())
        t.Cleanup(cancel)
        _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion))
        _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersionAndPauseStatus))
    }()

    evalChan <- &evaluation{

@@ -544,7 +545,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
    go func() {
        ctx, cancel := context.WithCancel(context.Background())
        t.Cleanup(cancel)
        _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion))
        _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersionAndPauseStatus))
    }()

    evalChan <- &evaluation{

@@ -577,7 +578,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
    go func() {
        ctx, cancel := context.WithCancel(context.Background())
        t.Cleanup(cancel)
        _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersion))
        _ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersionAndPauseStatus))
    }()

    evalChan <- &evaluation{

@@ -601,12 +602,12 @@ func TestSchedule_UpdateAlertRule(t *testing.T) {
    info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
    version := rand.Int63()
    go func() {
        sch.UpdateAlertRule(key, version)
        sch.UpdateAlertRule(key, version, false)
    }()

    select {
    case v := <-info.updateCh:
        require.Equal(t, ruleVersion(version), v)
        require.Equal(t, ruleVersionAndPauseStatus{ruleVersion(version), false}, v)
    case <-time.After(5 * time.Second):
        t.Fatal("No message was received on update channel")
    }

@@ -616,14 +617,14 @@ func TestSchedule_UpdateAlertRule(t *testing.T) {
        key := models.GenerateRuleKey(rand.Int63())
        info, _ := sch.registry.getOrCreateInfo(context.Background(), key)
        info.stop(nil)
        sch.UpdateAlertRule(key, rand.Int63())
        sch.UpdateAlertRule(key, rand.Int63(), false)
    })
})
t.Run("when rule does not exist", func(t *testing.T) {
    t.Run("should exit", func(t *testing.T) {
        sch := setupScheduler(t, nil, nil, nil, nil, nil)
        key := models.GenerateRuleKey(rand.Int63())
        sch.UpdateAlertRule(key, rand.Int63())
        sch.UpdateAlertRule(key, rand.Int63(), false)
    })
})
}
@@ -7,7 +7,6 @@ import (

    "github.com/benbjohnson/clock"
    "github.com/grafana/grafana-plugin-sdk-go/data"

    "github.com/grafana/grafana/pkg/infra/log"
    "github.com/grafana/grafana/pkg/services/ngalert/eval"
    "github.com/grafana/grafana/pkg/services/ngalert/metrics"
@@ -165,18 +164,61 @@ func (st *Manager) Get(orgID int64, alertRuleUID, stateId string) *State {
    return st.cache.get(orgID, alertRuleUID, stateId)
}

// ResetStateByRuleUID deletes all entries in the state manager that match the given rule UID.
func (st *Manager) ResetStateByRuleUID(ctx context.Context, ruleKey ngModels.AlertRuleKey) []*State {
// DeleteStateByRuleUID removes the rule instances from cache and instanceStore. A closed channel is returned to be able
// to gracefully handle the clear state step in scheduler in case we do not need to use the historian to save state
// history.
func (st *Manager) DeleteStateByRuleUID(ctx context.Context, ruleKey ngModels.AlertRuleKey) []*State {
    logger := st.log.New(ruleKey.LogContext()...)
    logger.Debug("Resetting state of the rule")

    states := st.cache.removeByRuleUID(ruleKey.OrgID, ruleKey.UID)
    if len(states) > 0 && st.instanceStore != nil {
    if len(states) == 0 {
        return states
    }
    if st.instanceStore != nil {
        err := st.instanceStore.DeleteAlertInstancesByRule(ctx, ruleKey)
        if err != nil {
            logger.Error("Failed to delete states that belong to a rule from database", "error", err)
        }
    }
    logger.Info("Rules state was reset", "states", len(states))

    return states
}

// ResetStateByRuleUID removes the rule instances from cache and instanceStore and saves state history. If the state
// history has to be saved, rule must not be nil.
func (st *Manager) ResetStateByRuleUID(ctx context.Context, rule *ngModels.AlertRule, reason string) []*State {
    ruleKey := rule.GetKey()
    states := st.DeleteStateByRuleUID(ctx, ruleKey)

    if rule == nil || st.historian == nil {
        return states
    }
    transitions := make([]StateTransition, 0, len(states))
    for _, s := range states {
        oldState := s.State
        oldReason := s.StateReason
        state := *s
        now := time.Now()
        state.SetNormal(reason, s.StartsAt, now)
        state.LastEvaluationTime = now
        state.Values = map[string]float64{}
        transitions = append(transitions, StateTransition{
            State:               &state,
            PreviousState:       oldState,
            PreviousStateReason: oldReason,
        })
    }

    ruleMeta := history_model.NewRuleMeta(rule, st.log)
    errCh := st.historian.RecordStatesAsync(ctx, ruleMeta, transitions)
    go func() {
        err := <-errCh
        if err != nil {
            st.log.FromContext(ctx).Error("Error updating historian state reset transitions", append(ruleKey.LogContext(), "reason", reason, "error", err)...)
        }
    }()
    return states
}
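To summarise the split that the comments above describe: deletion only clears cache and instance store, while reset also records Normal transitions (with a reason) to the history. The following is a rough, hedged sketch of the intended call pattern from the scheduler's side, using a hypothetical stand-in interface rather than the real *state.Manager; only the reason strings ("Updated", "Paused") come from the models package.

```go
package main

import "fmt"

// stateManager is a hypothetical stand-in for the two methods above.
type stateManager interface {
	DeleteStateByRuleUID(ruleUID string) int        // cache + DB only, no history
	ResetStateByRuleUID(ruleUID, reason string) int // also records state history
}

// onRuleDeleted mirrors the grafanaCtx.Done()/errRuleDeleted branch in ruleRoutine.
func onRuleDeleted(m stateManager, uid string) {
	n := m.DeleteStateByRuleUID(uid)
	fmt.Println("deleted", n, "states, no history entries")
}

// onRuleChanged mirrors the resetState closure: reason depends on the pause flag.
func onRuleChanged(m stateManager, uid string, isPaused bool) {
	reason := "Updated" // models.StateReasonUpdated
	if isPaused {
		reason = "Paused" // models.StateReasonPaused
	}
	n := m.ResetStateByRuleUID(uid, reason)
	fmt.Println("reset", n, "states with reason", reason)
}

type fakeManager struct{}

func (fakeManager) DeleteStateByRuleUID(string) int        { return 2 }
func (fakeManager) ResetStateByRuleUID(string, string) int { return 2 }

func main() {
	m := fakeManager{}
	onRuleChanged(m, "rule-uid", true) // reset 2 states with reason Paused
	onRuleDeleted(m, "rule-uid")       // deleted 2 states, no history entries
}
```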
@@ -240,7 +282,7 @@ func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRu
        logger.Debug("Ignoring set next state as result is pending")
    }

    // Set reason iff: result is different than state, reason is not Alerting or Normal
    // Set reason iff: result and state are different, reason is not Alerting or Normal
    currentState.StateReason = ""

    if currentState.State != result.State &&
@@ -2543,3 +2543,245 @@ func TestStaleResults(t *testing.T) {
        }
    })
}

func TestDeleteStateByRuleUID(t *testing.T) {
    interval := time.Minute
    ctx := context.Background()
    _, dbstore := tests.SetupTestEnv(t, 1)

    const mainOrgID int64 = 1
    rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrgID)

    labels1 := models.InstanceLabels{"test1": "testValue1"}
    _, hash1, _ := labels1.StringAndHash()
    labels2 := models.InstanceLabels{"test2": "testValue2"}
    _, hash2, _ := labels2.StringAndHash()
    instances := []models.AlertInstance{
        {
            AlertInstanceKey: models.AlertInstanceKey{
                RuleOrgID:  rule.OrgID,
                RuleUID:    rule.UID,
                LabelsHash: hash1,
            },
            CurrentState: models.InstanceStateNormal,
            Labels:       labels1,
        },
        {
            AlertInstanceKey: models.AlertInstanceKey{
                RuleOrgID:  rule.OrgID,
                RuleUID:    rule.UID,
                LabelsHash: hash2,
            },
            CurrentState: models.InstanceStateFiring,
            Labels:       labels2,
        },
    }

    _ = dbstore.SaveAlertInstances(ctx, instances...)

    testCases := []struct {
        desc          string
        instanceStore state.InstanceStore

        expectedStates map[string]*state.State

        startingStateCacheCount int
        finalStateCacheCount    int
        startingInstanceDBCount int
        finalInstanceDBCount    int
    }{
        {
            desc:          "all states/instances are removed from cache and DB",
            instanceStore: dbstore,
            expectedStates: map[string]*state.State{
                `[["test1","testValue1"]]`: {
                    AlertRuleUID:       rule.UID,
                    OrgID:              1,
                    CacheID:            `[["test1","testValue1"]]`,
                    Labels:             data.Labels{"test1": "testValue1"},
                    State:              eval.Normal,
                    EvaluationDuration: 0,
                    Annotations:        map[string]string{"testAnnoKey": "testAnnoValue"},
                },
                `[["test2","testValue2"]]`: {
                    AlertRuleUID:       rule.UID,
                    OrgID:              1,
                    CacheID:            `[["test2","testValue2"]]`,
                    Labels:             data.Labels{"test2": "testValue2"},
                    State:              eval.Alerting,
                    EvaluationDuration: 0,
                    Annotations:        map[string]string{"testAnnoKey": "testAnnoValue"},
                },
            },
            startingStateCacheCount: 2,
            finalStateCacheCount:    0,
            startingInstanceDBCount: 2,
            finalInstanceDBCount:    0,
        },
    }

    for _, tc := range testCases {
        t.Run(tc.desc, func(t *testing.T) {
            ctx := context.Background()
            cfg := state.ManagerCfg{
                Metrics:       testMetrics.GetStateMetrics(),
                ExternalURL:   nil,
                InstanceStore: dbstore,
                Images:        &state.NoopImageService{},
                Clock:         clock.New(),
                Historian:     &state.FakeHistorian{},
            }
            st := state.NewManager(cfg)
            st.Warm(ctx, dbstore)
            q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
            _ = dbstore.ListAlertInstances(ctx, q)
            existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)

            // We have loaded the expected number of entries from the db
            assert.Equal(t, tc.startingStateCacheCount, len(existingStatesForRule))
            assert.Equal(t, tc.startingInstanceDBCount, len(q.Result))

            states := st.DeleteStateByRuleUID(ctx, rule.GetKey())

            // Check that the deleted states are the same as the ones that were in cache
            assert.Equal(t, tc.startingStateCacheCount, len(states))
            for _, s := range states {
                assert.Equal(t, tc.expectedStates[s.CacheID], s)
            }

            q = &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
            _ = dbstore.ListAlertInstances(ctx, q)
            existingStatesForRule = st.GetStatesForRuleUID(rule.OrgID, rule.UID)

            // The expected number of state entries remains after states are deleted
            assert.Equal(t, tc.finalStateCacheCount, len(existingStatesForRule))
            assert.Equal(t, tc.finalInstanceDBCount, len(q.Result))
        })
    }
}

func TestResetStateByRuleUID(t *testing.T) {
    interval := time.Minute
    ctx := context.Background()
    _, dbstore := tests.SetupTestEnv(t, 1)

    const mainOrgID int64 = 1
    rule := tests.CreateTestAlertRule(t, ctx, dbstore, int64(interval.Seconds()), mainOrgID)

    labels1 := models.InstanceLabels{"test1": "testValue1"}
    _, hash1, _ := labels1.StringAndHash()
    labels2 := models.InstanceLabels{"test2": "testValue2"}
    _, hash2, _ := labels2.StringAndHash()
    instances := []models.AlertInstance{
        {
            AlertInstanceKey: models.AlertInstanceKey{
                RuleOrgID:  rule.OrgID,
                RuleUID:    rule.UID,
                LabelsHash: hash1,
            },
            CurrentState: models.InstanceStateNormal,
            Labels:       labels1,
        },
        {
            AlertInstanceKey: models.AlertInstanceKey{
                RuleOrgID:  rule.OrgID,
                RuleUID:    rule.UID,
                LabelsHash: hash2,
            },
            CurrentState: models.InstanceStateFiring,
            Labels:       labels2,
        },
    }

    _ = dbstore.SaveAlertInstances(ctx, instances...)

    testCases := []struct {
        desc          string
        instanceStore state.InstanceStore

        expectedStates map[string]*state.State

        startingStateCacheCount  int
        finalStateCacheCount     int
        startingInstanceDBCount  int
        finalInstanceDBCount     int
        newHistorianEntriesCount int
    }{
        {
            desc:          "all states/instances are removed from cache and DB and saved in historian",
            instanceStore: dbstore,
            expectedStates: map[string]*state.State{
                `[["test1","testValue1"]]`: {
                    AlertRuleUID:       rule.UID,
                    OrgID:              1,
                    CacheID:            `[["test1","testValue1"]]`,
                    Labels:             data.Labels{"test1": "testValue1"},
                    State:              eval.Normal,
                    EvaluationDuration: 0,
                    Annotations:        map[string]string{"testAnnoKey": "testAnnoValue"},
                },
                `[["test2","testValue2"]]`: {
                    AlertRuleUID:       rule.UID,
                    OrgID:              1,
                    CacheID:            `[["test2","testValue2"]]`,
                    Labels:             data.Labels{"test2": "testValue2"},
                    State:              eval.Alerting,
                    EvaluationDuration: 0,
                    Annotations:        map[string]string{"testAnnoKey": "testAnnoValue"},
                },
            },
            startingStateCacheCount:  2,
            finalStateCacheCount:     0,
            startingInstanceDBCount:  2,
            finalInstanceDBCount:     0,
            newHistorianEntriesCount: 2,
        },
    }

    for _, tc := range testCases {
        t.Run(tc.desc, func(t *testing.T) {
            ctx := context.Background()
            fakeHistorian := &state.FakeHistorian{StateTransitions: make([]state.StateTransition, 0)}
            cfg := state.ManagerCfg{
                Metrics:       testMetrics.GetStateMetrics(),
                ExternalURL:   nil,
                InstanceStore: dbstore,
                Images:        &state.NoopImageService{},
                Clock:         clock.New(),
                Historian:     fakeHistorian,
            }
            st := state.NewManager(cfg)
            st.Warm(ctx, dbstore)
            q := &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
            _ = dbstore.ListAlertInstances(ctx, q)
            existingStatesForRule := st.GetStatesForRuleUID(rule.OrgID, rule.UID)

            // We have loaded the expected number of entries from the db
            assert.Equal(t, tc.startingStateCacheCount, len(existingStatesForRule))
            assert.Equal(t, tc.startingInstanceDBCount, len(q.Result))

            states := st.ResetStateByRuleUID(ctx, rule, models.StateReasonPaused)

            // Check that the deleted states are the same as the ones that were in cache
            assert.Equal(t, tc.startingStateCacheCount, len(states))
            for _, s := range states {
                assert.Equal(t, tc.expectedStates[s.CacheID], s)
            }

            // Check if both entries have been added to the historian
            assert.Equal(t, tc.newHistorianEntriesCount, len(fakeHistorian.StateTransitions))
            for _, str := range fakeHistorian.StateTransitions {
                assert.Equal(t, tc.expectedStates[str.State.CacheID].State, str.PreviousState)
                assert.Equal(t, tc.expectedStates[str.State.CacheID].StateReason, str.PreviousStateReason)
            }

            q = &models.ListAlertInstancesQuery{RuleOrgID: rule.OrgID, RuleUID: rule.UID}
            _ = dbstore.ListAlertInstances(ctx, q)
            existingStatesForRule = st.GetStatesForRuleUID(rule.OrgID, rule.UID)

            // The expected number of state entries remains after states are deleted
            assert.Equal(t, tc.finalStateCacheCount, len(existingStatesForRule))
            assert.Equal(t, tc.finalInstanceDBCount, len(q.Result))
        })
    }
}
@@ -61,9 +61,12 @@ func (f *FakeRuleReader) ListAlertRules(_ context.Context, q *models.ListAlertRu
    return nil
}

type FakeHistorian struct{}
type FakeHistorian struct {
    StateTransitions []StateTransition
}

func (f *FakeHistorian) RecordStatesAsync(ctx context.Context, rule history_model.RuleMeta, states []StateTransition) <-chan error {
    f.StateTransitions = append(f.StateTransitions, states...)
    errCh := make(chan error)
    close(errCh)
    return errCh
@@ -68,8 +68,8 @@ func (st DBstore) DeleteAlertRulesByUID(ctx context.Context, orgID int64, ruleUI
}

// IncreaseVersionForAllRulesInNamespace Increases version for all rules that have specified namespace. Returns all rules that belong to the namespace
func (st DBstore) IncreaseVersionForAllRulesInNamespace(ctx context.Context, orgID int64, namespaceUID string) ([]ngmodels.AlertRuleKeyWithVersion, error) {
    var keys []ngmodels.AlertRuleKeyWithVersion
func (st DBstore) IncreaseVersionForAllRulesInNamespace(ctx context.Context, orgID int64, namespaceUID string) ([]ngmodels.AlertRuleKeyWithVersionAndPauseStatus, error) {
    var keys []ngmodels.AlertRuleKeyWithVersionAndPauseStatus
    err := st.SQLStore.WithTransactionalDbSession(ctx, func(sess *db.Session) error {
        now := TimeNow()
        _, err := sess.Exec("UPDATE alert_rule SET version = version + 1, updated = ? WHERE namespace_uid = ? AND org_id = ?", now, namespaceUID, orgID)
@@ -316,7 +316,7 @@ func (f *RuleStore) UpdateRuleGroup(ctx context.Context, orgID int64, namespaceU
    return nil
}

func (f *RuleStore) IncreaseVersionForAllRulesInNamespace(_ context.Context, orgID int64, namespaceUID string) ([]models.AlertRuleKeyWithVersion, error) {
func (f *RuleStore) IncreaseVersionForAllRulesInNamespace(_ context.Context, orgID int64, namespaceUID string) ([]models.AlertRuleKeyWithVersionAndPauseStatus, error) {
    f.mtx.Lock()
    defer f.mtx.Unlock()

@@ -325,15 +325,18 @@ func (f *RuleStore) IncreaseVersionForAllRulesInNamespace(_ context.Context, org
        Params: []interface{}{orgID, namespaceUID},
    })

    var result []models.AlertRuleKeyWithVersion
    var result []models.AlertRuleKeyWithVersionAndPauseStatus

    for _, rule := range f.Rules[orgID] {
        if rule.NamespaceUID == namespaceUID && rule.OrgID == orgID {
            rule.Version++
            rule.Updated = time.Now()
            result = append(result, models.AlertRuleKeyWithVersion{
                Version:      rule.Version,
                AlertRuleKey: rule.GetKey(),
            result = append(result, models.AlertRuleKeyWithVersionAndPauseStatus{
                IsPaused: rule.IsPaused,
                AlertRuleKeyWithVersion: models.AlertRuleKeyWithVersion{
                    Version:      rule.Version,
                    AlertRuleKey: rule.GetKey(),
                },
            })
        }
    }
@@ -260,6 +260,16 @@ func AddAlertRuleMigrations(mg *migrator.Migrator, defaultIntervalSeconds int64)
            Default:  "1",
        },
    ))

    mg.AddMigration("add is_paused column to alert_rule table", migrator.NewAddColumnMigration(
        alertRule,
        &migrator.Column{
            Name:     "is_paused",
            Type:     migrator.DB_Bool,
            Nullable: false,
            Default:  "false",
        },
    ))
}

func AddAlertRuleVersionMigrations(mg *migrator.Migrator) {

@@ -313,6 +323,16 @@ func AddAlertRuleVersionMigrations(mg *migrator.Migrator) {
            Default:  "1",
        },
    ))

    mg.AddMigration("add is_paused column to alert_rule_versions table", migrator.NewAddColumnMigration(
        alertRuleVersion,
        &migrator.Column{
            Name:     "is_paused",
            Type:     migrator.DB_Bool,
            Nullable: false,
            Default:  "false",
        },
    ))
}

func AddAlertmanagerConfigMigrations(mg *migrator.Migrator) {