mirror of
https://github.com/grafana/grafana.git
synced 2024-11-25 10:20:29 -06:00
Alerting: Fix data races and improve testing (#81994)
* Alerting: fix race condition in (*ngalert/sender.ExternalAlertmanager).Run * Chore: Fix data races when accessing members of *ngalert/state.FakeInstanceStore * Chore: Fix data races in tests in ngalert/schedule and enable some parallel tests * Chore: fix linters * Chore: add TODO comment to remove loopvar once we move to Go 1.22
This commit is contained in:
parent
06b5875c3c
commit
9c29e1a783
@ -42,6 +42,8 @@ type evalAppliedInfo struct {
|
||||
}
|
||||
|
||||
func TestProcessTicks(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testTracer := tracing.InitializeTracerForTest()
|
||||
reg := prometheus.NewPedanticRegistry()
|
||||
testMetrics := metrics.NewNGAlert(reg)
|
||||
@ -59,7 +61,7 @@ func TestProcessTicks(t *testing.T) {
|
||||
|
||||
mockedClock := clock.NewMock()
|
||||
|
||||
notifier := &AlertsSenderMock{}
|
||||
notifier := NewSyncAlertsSenderMock()
|
||||
notifier.EXPECT().Send(mock.Anything, mock.Anything, mock.Anything).Return()
|
||||
|
||||
appUrl := &url.URL{
|
||||
@ -362,9 +364,11 @@ func TestProcessTicks(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
createSchedule := func(
|
||||
evalAppliedChan chan time.Time,
|
||||
senderMock *AlertsSenderMock,
|
||||
senderMock *SyncAlertsSenderMock,
|
||||
) (*schedule, *fakeRulesStore, *state.FakeInstanceStore, prometheus.Gatherer) {
|
||||
ruleStore := newFakeRulesStore()
|
||||
instanceStore := &state.FakeInstanceStore{}
|
||||
@ -381,9 +385,21 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
normalStates := []eval.State{eval.Normal, eval.Alerting, eval.Pending}
|
||||
allStates := [...]eval.State{eval.Normal, eval.Alerting, eval.Pending, eval.NoData, eval.Error}
|
||||
|
||||
type normalStatesSetup struct {
|
||||
sch *schedule
|
||||
instanceStore *state.FakeInstanceStore
|
||||
reg prometheus.Gatherer
|
||||
rule *models.AlertRule
|
||||
folderTitle string
|
||||
expectedTime time.Time
|
||||
}
|
||||
|
||||
for _, evalState := range normalStates {
|
||||
// TODO rewrite when we are able to mock/fake state manager
|
||||
t.Run(fmt.Sprintf("when rule evaluation happens (evaluation state %s)", evalState), func(t *testing.T) {
|
||||
// Make a local copy to allow parallel tests. TODO: remove when we move to Go 1.22:
|
||||
// https://go.dev/blog/loopvar-preview
|
||||
evalState := evalState
|
||||
|
||||
setupNormalStatesTest := func(t *testing.T) normalStatesSetup {
|
||||
evalChan := make(chan *evaluation)
|
||||
evalAppliedChan := make(chan time.Time)
|
||||
sch, ruleStore, instanceStore, reg := createSchedule(evalAppliedChan, nil)
|
||||
@ -408,38 +424,62 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
actualTime := waitForTimeChannel(t, evalAppliedChan)
|
||||
require.Equal(t, expectedTime, actualTime)
|
||||
|
||||
return normalStatesSetup{
|
||||
sch: sch,
|
||||
instanceStore: instanceStore,
|
||||
reg: reg,
|
||||
rule: rule,
|
||||
folderTitle: folderTitle,
|
||||
expectedTime: expectedTime,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO rewrite when we are able to mock/fake state manager
|
||||
t.Run(fmt.Sprintf("when rule evaluation happens (evaluation state %s)", evalState), func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("it should add extra labels", func(t *testing.T) {
|
||||
states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
|
||||
t.Parallel()
|
||||
tt := setupNormalStatesTest(t)
|
||||
|
||||
states := tt.sch.stateManager.GetStatesForRuleUID(tt.rule.OrgID, tt.rule.UID)
|
||||
for _, s := range states {
|
||||
assert.Equal(t, rule.UID, s.Labels[alertingModels.RuleUIDLabel])
|
||||
assert.Equal(t, rule.NamespaceUID, s.Labels[alertingModels.NamespaceUIDLabel])
|
||||
assert.Equal(t, rule.Title, s.Labels[prometheusModel.AlertNameLabel])
|
||||
assert.Equal(t, folderTitle, s.Labels[models.FolderTitleLabel])
|
||||
assert.Equal(t, tt.rule.UID, s.Labels[alertingModels.RuleUIDLabel])
|
||||
assert.Equal(t, tt.rule.NamespaceUID, s.Labels[alertingModels.NamespaceUIDLabel])
|
||||
assert.Equal(t, tt.rule.Title, s.Labels[prometheusModel.AlertNameLabel])
|
||||
assert.Equal(t, tt.folderTitle, s.Labels[models.FolderTitleLabel])
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("it should process evaluation results via state manager", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
tt := setupNormalStatesTest(t)
|
||||
|
||||
// TODO rewrite when we are able to mock/fake state manager
|
||||
states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
|
||||
states := tt.sch.stateManager.GetStatesForRuleUID(tt.rule.OrgID, tt.rule.UID)
|
||||
require.Len(t, states, 1)
|
||||
s := states[0]
|
||||
require.Equal(t, rule.UID, s.AlertRuleUID)
|
||||
require.Equal(t, tt.rule.UID, s.AlertRuleUID)
|
||||
require.Len(t, s.Results, 1)
|
||||
var expectedStatus = evalState
|
||||
if evalState == eval.Pending {
|
||||
expectedStatus = eval.Alerting
|
||||
}
|
||||
require.Equal(t, expectedStatus.String(), s.Results[0].EvaluationState.String())
|
||||
require.Equal(t, expectedTime, s.Results[0].EvaluationTime)
|
||||
require.Equal(t, tt.expectedTime, s.Results[0].EvaluationTime)
|
||||
})
|
||||
|
||||
t.Run("it should save alert instances to storage", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
tt := setupNormalStatesTest(t)
|
||||
|
||||
// TODO rewrite when we are able to mock/fake state manager
|
||||
states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
|
||||
states := tt.sch.stateManager.GetStatesForRuleUID(tt.rule.OrgID, tt.rule.UID)
|
||||
require.Len(t, states, 1)
|
||||
s := states[0]
|
||||
|
||||
var cmd *models.AlertInstance
|
||||
for _, op := range instanceStore.RecordedOps {
|
||||
for _, op := range tt.instanceStore.RecordedOps() {
|
||||
switch q := op.(type) {
|
||||
case models.AlertInstance:
|
||||
cmd = &q
|
||||
@ -451,14 +491,17 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
|
||||
require.NotNil(t, cmd)
|
||||
t.Logf("Saved alert instances: %v", cmd)
|
||||
require.Equal(t, rule.OrgID, cmd.RuleOrgID)
|
||||
require.Equal(t, expectedTime, cmd.LastEvalTime)
|
||||
require.Equal(t, rule.UID, cmd.RuleUID)
|
||||
require.Equal(t, tt.rule.OrgID, cmd.RuleOrgID)
|
||||
require.Equal(t, tt.expectedTime, cmd.LastEvalTime)
|
||||
require.Equal(t, tt.rule.UID, cmd.RuleUID)
|
||||
require.Equal(t, evalState.String(), string(cmd.CurrentState))
|
||||
require.Equal(t, s.Labels, data.Labels(cmd.Labels))
|
||||
})
|
||||
|
||||
t.Run("it reports metrics", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
tt := setupNormalStatesTest(t)
|
||||
|
||||
// duration metric has 0 values because of mocked clock that do not advance
|
||||
expectedMetric := fmt.Sprintf(
|
||||
`# HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule.
|
||||
@ -521,16 +564,20 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
|
||||
grafana_alerting_rule_send_alerts_duration_seconds_sum{org="%[1]d"} 0
|
||||
grafana_alerting_rule_send_alerts_duration_seconds_count{org="%[1]d"} 1
|
||||
`, rule.OrgID)
|
||||
`, tt.rule.OrgID)
|
||||
|
||||
err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric), "grafana_alerting_rule_evaluation_duration_seconds", "grafana_alerting_rule_evaluations_total", "grafana_alerting_rule_evaluation_failures_total", "grafana_alerting_rule_process_evaluation_duration_seconds", "grafana_alerting_rule_send_alerts_duration_seconds")
|
||||
err := testutil.GatherAndCompare(tt.reg, bytes.NewBufferString(expectedMetric), "grafana_alerting_rule_evaluation_duration_seconds", "grafana_alerting_rule_evaluations_total", "grafana_alerting_rule_evaluation_failures_total", "grafana_alerting_rule_process_evaluation_duration_seconds", "grafana_alerting_rule_send_alerts_duration_seconds")
|
||||
require.NoError(t, err)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("should exit", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("and not clear the state if parent context is cancelled", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
stoppedChan := make(chan error)
|
||||
sch, _, _, _ := createSchedule(make(chan time.Time), nil)
|
||||
|
||||
@ -550,7 +597,10 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(expectedStates), len(sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)))
|
||||
})
|
||||
|
||||
t.Run("and clean up the state if delete is cancellation reason ", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
stoppedChan := make(chan error)
|
||||
sch, _, _, _ := createSchedule(make(chan time.Time), nil)
|
||||
|
||||
@ -573,6 +623,8 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("when a message is sent to update channel", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
rule := models.AlertRuleGen(withQueryForState(t, eval.Normal))()
|
||||
folderTitle := "folderName"
|
||||
ruleFp := ruleWithFolder{rule, folderTitle}.Fingerprint()
|
||||
@ -581,10 +633,10 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
evalAppliedChan := make(chan time.Time)
|
||||
updateChan := make(chan ruleVersionAndPauseStatus)
|
||||
|
||||
sender := AlertsSenderMock{}
|
||||
sender := NewSyncAlertsSenderMock()
|
||||
sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
|
||||
|
||||
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, &sender)
|
||||
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender)
|
||||
ruleStore.PutRule(context.Background(), rule)
|
||||
sch.schedulableAlertRules.set([]*models.AlertRule{rule}, map[models.FolderKey]string{rule.GetFolderKey(): folderTitle})
|
||||
|
||||
@ -644,45 +696,64 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
updateChan <- ruleVersionAndPauseStatus{ruleFp + 1, false}
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return len(sender.Calls) > 0
|
||||
return len(sender.Calls()) > 0
|
||||
}, 5*time.Second, 100*time.Millisecond)
|
||||
|
||||
require.Empty(t, sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID))
|
||||
sender.AssertNumberOfCalls(t, "Send", 1)
|
||||
args, ok := sender.Calls[0].Arguments[2].(definitions.PostableAlerts)
|
||||
require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls[0].Arguments[2]))
|
||||
args, ok := sender.Calls()[0].Arguments[2].(definitions.PostableAlerts)
|
||||
require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls()[0].Arguments[2]))
|
||||
require.Len(t, args.PostableAlerts, expectedToBeSent)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("when evaluation fails", func(t *testing.T) {
|
||||
rule := models.AlertRuleGen(withQueryForState(t, eval.Error))()
|
||||
rule.ExecErrState = models.ErrorErrState
|
||||
t.Parallel()
|
||||
|
||||
evalChan := make(chan *evaluation)
|
||||
evalAppliedChan := make(chan time.Time)
|
||||
|
||||
sender := AlertsSenderMock{}
|
||||
sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
|
||||
|
||||
sch, ruleStore, _, reg := createSchedule(evalAppliedChan, &sender)
|
||||
sch.maxAttempts = 3
|
||||
ruleStore.PutRule(context.Background(), rule)
|
||||
|
||||
go func() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersionAndPauseStatus))
|
||||
}()
|
||||
|
||||
evalChan <- &evaluation{
|
||||
scheduledAt: sch.clock.Now(),
|
||||
rule: rule,
|
||||
type testSetup struct {
|
||||
rule *models.AlertRule
|
||||
sender *SyncAlertsSenderMock
|
||||
reg prometheus.Gatherer
|
||||
}
|
||||
|
||||
waitForTimeChannel(t, evalAppliedChan)
|
||||
setup := func(t *testing.T) testSetup {
|
||||
rule := models.AlertRuleGen(withQueryForState(t, eval.Error))()
|
||||
rule.ExecErrState = models.ErrorErrState
|
||||
|
||||
evalChan := make(chan *evaluation)
|
||||
evalAppliedChan := make(chan time.Time)
|
||||
|
||||
sender := NewSyncAlertsSenderMock()
|
||||
sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
|
||||
|
||||
sch, ruleStore, _, reg := createSchedule(evalAppliedChan, sender)
|
||||
sch.maxAttempts = 3
|
||||
ruleStore.PutRule(context.Background(), rule)
|
||||
|
||||
go func() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
_ = sch.ruleRoutine(ctx, rule.GetKey(), evalChan, make(chan ruleVersionAndPauseStatus))
|
||||
}()
|
||||
|
||||
evalChan <- &evaluation{
|
||||
scheduledAt: sch.clock.Now(),
|
||||
rule: rule,
|
||||
}
|
||||
|
||||
waitForTimeChannel(t, evalAppliedChan)
|
||||
|
||||
return testSetup{
|
||||
rule: rule,
|
||||
sender: sender,
|
||||
reg: reg,
|
||||
}
|
||||
}
|
||||
|
||||
t.Run("it should increase failure counter", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
tt := setup(t)
|
||||
|
||||
// duration metric has 0 values because of mocked clock that do not advance
|
||||
expectedMetric := fmt.Sprintf(
|
||||
`# HELP grafana_alerting_rule_evaluation_duration_seconds The time to evaluate a rule.
|
||||
@ -745,33 +816,40 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
grafana_alerting_rule_send_alerts_duration_seconds_bucket{org="%[1]d",le="+Inf"} 1
|
||||
grafana_alerting_rule_send_alerts_duration_seconds_sum{org="%[1]d"} 0
|
||||
grafana_alerting_rule_send_alerts_duration_seconds_count{org="%[1]d"} 1
|
||||
`, rule.OrgID)
|
||||
`, tt.rule.OrgID)
|
||||
|
||||
err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expectedMetric), "grafana_alerting_rule_evaluation_duration_seconds", "grafana_alerting_rule_evaluations_total", "grafana_alerting_rule_evaluation_failures_total", "grafana_alerting_rule_process_evaluation_duration_seconds", "grafana_alerting_rule_send_alerts_duration_seconds")
|
||||
err := testutil.GatherAndCompare(tt.reg, bytes.NewBufferString(expectedMetric), "grafana_alerting_rule_evaluation_duration_seconds", "grafana_alerting_rule_evaluations_total", "grafana_alerting_rule_evaluation_failures_total", "grafana_alerting_rule_process_evaluation_duration_seconds", "grafana_alerting_rule_send_alerts_duration_seconds")
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("it should send special alert DatasourceError", func(t *testing.T) {
|
||||
sender.AssertNumberOfCalls(t, "Send", 1)
|
||||
args, ok := sender.Calls[0].Arguments[2].(definitions.PostableAlerts)
|
||||
require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls[0].Arguments[2]))
|
||||
t.Parallel()
|
||||
tt := setup(t)
|
||||
|
||||
tt.sender.AssertNumberOfCalls(t, "Send", 1)
|
||||
args, ok := tt.sender.Calls()[0].Arguments[2].(definitions.PostableAlerts)
|
||||
require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", tt.sender.Calls()[0].Arguments[2]))
|
||||
assert.Len(t, args.PostableAlerts, 1)
|
||||
assert.Equal(t, state.ErrorAlertName, args.PostableAlerts[0].Labels[prometheusModel.AlertNameLabel])
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("when there are alerts that should be firing", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("it should call sender", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// eval.Alerting makes state manager to create notifications for alertmanagers
|
||||
rule := models.AlertRuleGen(withQueryForState(t, eval.Alerting))()
|
||||
|
||||
evalChan := make(chan *evaluation)
|
||||
evalAppliedChan := make(chan time.Time)
|
||||
|
||||
sender := AlertsSenderMock{}
|
||||
sender := NewSyncAlertsSenderMock()
|
||||
sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
|
||||
|
||||
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, &sender)
|
||||
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender)
|
||||
ruleStore.PutRule(context.Background(), rule)
|
||||
|
||||
go func() {
|
||||
@ -788,23 +866,25 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
waitForTimeChannel(t, evalAppliedChan)
|
||||
|
||||
sender.AssertNumberOfCalls(t, "Send", 1)
|
||||
args, ok := sender.Calls[0].Arguments[2].(definitions.PostableAlerts)
|
||||
require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls[0].Arguments[2]))
|
||||
args, ok := sender.Calls()[0].Arguments[2].(definitions.PostableAlerts)
|
||||
require.Truef(t, ok, fmt.Sprintf("expected argument of function was supposed to be 'definitions.PostableAlerts' but got %T", sender.Calls()[0].Arguments[2]))
|
||||
|
||||
require.Len(t, args.PostableAlerts, 1)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("when there are no alerts to send it should not call notifiers", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
rule := models.AlertRuleGen(withQueryForState(t, eval.Normal))()
|
||||
|
||||
evalChan := make(chan *evaluation)
|
||||
evalAppliedChan := make(chan time.Time)
|
||||
|
||||
sender := AlertsSenderMock{}
|
||||
sender := NewSyncAlertsSenderMock()
|
||||
sender.EXPECT().Send(mock.Anything, rule.GetKey(), mock.Anything).Return()
|
||||
|
||||
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, &sender)
|
||||
sch, ruleStore, _, _ := createSchedule(evalAppliedChan, sender)
|
||||
ruleStore.PutRule(context.Background(), rule)
|
||||
|
||||
go func() {
|
||||
@ -827,8 +907,14 @@ func TestSchedule_ruleRoutine(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSchedule_deleteAlertRule(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("when rule exists", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("it should stop evaluation loop and remove the controller from registry", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
sch := setupScheduler(t, nil, nil, nil, nil, nil)
|
||||
rule := models.AlertRuleGen()()
|
||||
key := rule.GetKey()
|
||||
@ -839,7 +925,11 @@ func TestSchedule_deleteAlertRule(t *testing.T) {
|
||||
})
|
||||
})
|
||||
t.Run("when rule does not exist", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("should exit", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
sch := setupScheduler(t, nil, nil, nil, nil, nil)
|
||||
key := models.GenerateRuleKey(rand.Int63())
|
||||
sch.deleteAlertRule(key)
|
||||
@ -847,7 +937,7 @@ func TestSchedule_deleteAlertRule(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStore, registry *prometheus.Registry, senderMock *AlertsSenderMock, evalMock eval.EvaluatorFactory) *schedule {
|
||||
func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStore, registry *prometheus.Registry, senderMock *SyncAlertsSenderMock, evalMock eval.EvaluatorFactory) *schedule {
|
||||
t.Helper()
|
||||
testTracer := tracing.InitializeTracerForTest()
|
||||
|
||||
@ -877,7 +967,7 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor
|
||||
}
|
||||
|
||||
if senderMock == nil {
|
||||
senderMock = &AlertsSenderMock{}
|
||||
senderMock = NewSyncAlertsSenderMock()
|
||||
senderMock.EXPECT().Send(mock.Anything, mock.Anything, mock.Anything).Return()
|
||||
}
|
||||
|
||||
|
@ -2,10 +2,14 @@ package schedule
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
definitions "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
// waitForTimeChannel blocks the execution until either the channel ch has some data or a timeout of 10 second expires.
|
||||
@ -81,3 +85,26 @@ func (f *fakeRulesStore) DeleteRule(rules ...*models.AlertRule) {
|
||||
func (f *fakeRulesStore) getNamespaceTitle(uid string) string {
|
||||
return "TEST-FOLDER-" + uid
|
||||
}
|
||||
|
||||
type SyncAlertsSenderMock struct {
|
||||
*AlertsSenderMock
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewSyncAlertsSenderMock() *SyncAlertsSenderMock {
|
||||
return &SyncAlertsSenderMock{
|
||||
AlertsSenderMock: new(AlertsSenderMock),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *SyncAlertsSenderMock) Send(ctx context.Context, key models.AlertRuleKey, alerts definitions.PostableAlerts) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.AlertsSenderMock.Send(ctx, key, alerts)
|
||||
}
|
||||
|
||||
func (m *SyncAlertsSenderMock) Calls() []mock.Call {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return slices.Clone(m.AlertsSenderMock.Calls)
|
||||
}
|
||||
|
@ -127,13 +127,14 @@ func (s *ExternalAlertmanager) ApplyConfig(orgId, id int64, alertmanagers []Exte
|
||||
}
|
||||
|
||||
func (s *ExternalAlertmanager) Run() {
|
||||
logger := s.logger
|
||||
s.wg.Add(2)
|
||||
|
||||
go func() {
|
||||
s.logger.Info("Initiating communication with a group of external Alertmanagers")
|
||||
logger.Info("Initiating communication with a group of external Alertmanagers")
|
||||
|
||||
if err := s.sdManager.Run(); err != nil {
|
||||
s.logger.Error("Failed to start the sender service discovery manager", "error", err)
|
||||
logger.Error("Failed to start the sender service discovery manager", "error", err)
|
||||
}
|
||||
s.wg.Done()
|
||||
}()
|
||||
|
@ -1380,7 +1380,7 @@ func TestProcessEvalResults(t *testing.T) {
|
||||
require.NotEmpty(t, states)
|
||||
|
||||
savedStates := make(map[string]models.AlertInstance)
|
||||
for _, op := range instanceStore.RecordedOps {
|
||||
for _, op := range instanceStore.RecordedOps() {
|
||||
switch q := op.(type) {
|
||||
case models.AlertInstance:
|
||||
cacheId, err := q.Labels.StringKey()
|
||||
@ -1669,7 +1669,7 @@ func TestStaleResults(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("should delete stale states from the database", func(t *testing.T) {
|
||||
for _, op := range store.RecordedOps {
|
||||
for _, op := range store.RecordedOps() {
|
||||
switch q := op.(type) {
|
||||
case state.FakeInstanceStoreOp:
|
||||
keys, ok := q.Args[1].([]models.AlertInstanceKey)
|
||||
|
@ -42,7 +42,7 @@ func TestAsyncStatePersister_Async(t *testing.T) {
|
||||
|
||||
// Check if the state was saved
|
||||
require.Eventually(t, func() bool {
|
||||
return len(store.RecordedOps) == 1
|
||||
return len(store.RecordedOps()) == 1
|
||||
}, time.Second*5, time.Second)
|
||||
})
|
||||
t.Run("It should save on context done", func(t *testing.T) {
|
||||
@ -71,7 +71,7 @@ func TestAsyncStatePersister_Async(t *testing.T) {
|
||||
|
||||
// Check if the context cancellation was handled correctly
|
||||
require.Eventually(t, func() bool {
|
||||
return len(store.RecordedOps) == 1
|
||||
return len(store.RecordedOps()) == 1
|
||||
}, time.Second*5, time.Second)
|
||||
})
|
||||
}
|
||||
|
@ -67,7 +67,7 @@ func TestSyncPersister_saveAlertStates(t *testing.T) {
|
||||
})
|
||||
syncStatePersister.Sync(context.Background(), span, transitions, nil)
|
||||
savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{}
|
||||
for _, op := range st.RecordedOps {
|
||||
for _, op := range st.RecordedOps() {
|
||||
saved := op.(ngmodels.AlertInstance)
|
||||
savedKeys[saved.AlertInstanceKey] = saved
|
||||
}
|
||||
@ -89,7 +89,7 @@ func TestSyncPersister_saveAlertStates(t *testing.T) {
|
||||
syncStatePersister.Sync(context.Background(), span, transitions, nil)
|
||||
|
||||
savedKeys := map[ngmodels.AlertInstanceKey]ngmodels.AlertInstance{}
|
||||
for _, op := range st.RecordedOps {
|
||||
for _, op := range st.RecordedOps() {
|
||||
saved := op.(ngmodels.AlertInstance)
|
||||
savedKeys[saved.AlertInstanceKey] = saved
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package state
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
@ -13,7 +14,7 @@ var _ InstanceStore = &FakeInstanceStore{}
|
||||
|
||||
type FakeInstanceStore struct {
|
||||
mtx sync.Mutex
|
||||
RecordedOps []any
|
||||
recordedOps []any
|
||||
}
|
||||
|
||||
type FakeInstanceStoreOp struct {
|
||||
@ -21,17 +22,23 @@ type FakeInstanceStoreOp struct {
|
||||
Args []any
|
||||
}
|
||||
|
||||
func (f *FakeInstanceStore) RecordedOps() []any {
|
||||
f.mtx.Lock()
|
||||
defer f.mtx.Unlock()
|
||||
return slices.Clone(f.recordedOps)
|
||||
}
|
||||
|
||||
func (f *FakeInstanceStore) ListAlertInstances(_ context.Context, q *models.ListAlertInstancesQuery) ([]*models.AlertInstance, error) {
|
||||
f.mtx.Lock()
|
||||
defer f.mtx.Unlock()
|
||||
f.RecordedOps = append(f.RecordedOps, *q)
|
||||
f.recordedOps = append(f.recordedOps, *q)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (f *FakeInstanceStore) SaveAlertInstance(_ context.Context, q models.AlertInstance) error {
|
||||
f.mtx.Lock()
|
||||
defer f.mtx.Unlock()
|
||||
f.RecordedOps = append(f.RecordedOps, q)
|
||||
f.recordedOps = append(f.recordedOps, q)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -40,7 +47,7 @@ func (f *FakeInstanceStore) FetchOrgIds(_ context.Context) ([]int64, error) { re
|
||||
func (f *FakeInstanceStore) DeleteAlertInstances(ctx context.Context, q ...models.AlertInstanceKey) error {
|
||||
f.mtx.Lock()
|
||||
defer f.mtx.Unlock()
|
||||
f.RecordedOps = append(f.RecordedOps, FakeInstanceStoreOp{
|
||||
f.recordedOps = append(f.recordedOps, FakeInstanceStoreOp{
|
||||
Name: "DeleteAlertInstances", Args: []any{
|
||||
ctx,
|
||||
q,
|
||||
@ -56,9 +63,9 @@ func (f *FakeInstanceStore) DeleteAlertInstancesByRule(ctx context.Context, key
|
||||
func (f *FakeInstanceStore) FullSync(ctx context.Context, instances []models.AlertInstance) error {
|
||||
f.mtx.Lock()
|
||||
defer f.mtx.Unlock()
|
||||
f.RecordedOps = []any{}
|
||||
f.recordedOps = []any{}
|
||||
for _, instance := range instances {
|
||||
f.RecordedOps = append(f.RecordedOps, instance)
|
||||
f.recordedOps = append(f.recordedOps, instance)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user