Alerting: Tests for rule evaluation routine (#40646)

* add fake stores to record queries
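The fakes all follow the same record-and-assert pattern: every store method appends the query or command it receives to a mutex-guarded recordedOps slice, and tests later type-switch over that slice to pick out the operations they expect. A minimal, self-contained sketch of the pattern; the fakeStore and GetThingQuery names are illustrative stand-ins, not types from this change:

package main

import (
	"fmt"
	"sync"
)

// GetThingQuery stands in for a real store query type such as
// models.GetAlertRuleByUIDQuery.
type GetThingQuery struct{ UID string }

// fakeStore records every operation it receives so that a test can
// assert on exactly which queries were made.
type fakeStore struct {
	mtx         sync.Mutex
	recordedOps []interface{}
}

func (f *fakeStore) GetThing(q *GetThingQuery) error {
	f.mtx.Lock()
	defer f.mtx.Unlock()
	// record a copy of the query, mirroring how these fakes append *q
	f.recordedOps = append(f.recordedOps, *q)
	return nil
}

func main() {
	f := &fakeStore{}
	_ = f.GetThing(&GetThingQuery{UID: "abc"})
	// a test type-switches over recordedOps to find the queries it cares about
	for _, op := range f.recordedOps {
		switch q := op.(type) {
		case GetThingQuery:
			fmt.Println("recorded query for UID:", q.UID)
		}
	}
}

Recording a copy (*q) rather than the pointer means later mutations of the caller's query cannot retroactively change what the test observes.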
Yuriy Tseretyan 2021-10-26 13:22:07 -04:00 committed by GitHub
parent 49dee63453
commit 6709359148
2 changed files with 458 additions and 27 deletions


@@ -10,6 +10,11 @@ import (
"time"
"github.com/benbjohnson/clock"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/encryption/ossencryption"
apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
@@ -17,12 +22,10 @@ import (
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
"github.com/grafana/grafana/pkg/services/ngalert/sender"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/setting"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
func TestSendingToExternalAlertmanager(t *testing.T) {
@@ -33,7 +36,7 @@ func TestSendingToExternalAlertmanager(t *testing.T) {
fakeAdminConfigStore := newFakeAdminConfigStore(t)
// create alert rule with one second interval
alertRule := CreateTestAlertRule(t, fakeRuleStore, 1, 1)
alertRule := CreateTestAlertRule(t, fakeRuleStore, 1, 1, eval.Alerting)
// First, let's create an admin configuration that holds an alertmanager.
adminConfig := &models.AdminConfiguration{OrgID: 1, Alertmanagers: []string{fakeAM.server.URL}}
@@ -231,6 +234,325 @@ func TestSendingToExternalAlertmanager_WithMultipleOrgs(t *testing.T) {
}, 10*time.Second, 200*time.Millisecond, "Alertmanager for org 1 and 2 were never removed")
}
func TestSchedule_ruleRoutine(t *testing.T) {
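// createSchedule builds a scheduler backed by fresh fake stores and reports the time of every applied evaluation on evalAppliedChan.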
createSchedule := func(
evalAppliedChan chan time.Time,
) (*schedule, *fakeRuleStore, *fakeInstanceStore, *fakeAdminConfigStore) {
ruleStore := newFakeRuleStore(t)
instanceStore := &fakeInstanceStore{}
adminConfigStore := newFakeAdminConfigStore(t)
sch, _ := setupScheduler(t, ruleStore, instanceStore, adminConfigStore)
sch.evalAppliedFunc = func(key models.AlertRuleKey, t time.Time) {
evalAppliedChan <- t
}
return sch, ruleStore, instanceStore, adminConfigStore
}
// normal states do not include NoData and Error because it is currently not possible to write a sensible test for them
normalStates := []eval.State{eval.Normal, eval.Alerting, eval.Pending}
randomNormalState := func() eval.State {
// pick only supported cases
return normalStates[rand.Intn(len(normalStates))]
}
for _, evalState := range normalStates {
// TODO rewrite when we are able to mock/fake state manager
t.Run(fmt.Sprintf("when rule evaluation happens (evaluation state %s)", evalState), func(t *testing.T) {
evalChan := make(chan *evalContext)
evalAppliedChan := make(chan time.Time)
sch, ruleStore, instanceStore, _ := createSchedule(evalAppliedChan)
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), evalState)
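// run the rule routine in the background; the stop channel is closed during test cleanup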
go func() {
stop := make(chan struct{})
t.Cleanup(func() {
close(stop)
})
_ = sch.ruleRoutine(context.Background(), rule.GetKey(), evalChan, stop)
}()
expectedTime := time.UnixMicro(rand.Int63())
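// send one tick and wait until the scheduler reports the evaluation as applied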
evalChan <- &evalContext{
now: expectedTime,
version: rule.Version,
}
actualTime := waitForTimeChannel(t, evalAppliedChan)
require.Equal(t, expectedTime, actualTime)
t.Run("it should get rule from database when run the first time", func(t *testing.T) {
queries := make([]models.GetAlertRuleByUIDQuery, 0)
for _, op := range ruleStore.recordedOps {
switch q := op.(type) {
case models.GetAlertRuleByUIDQuery:
queries = append(queries, q)
}
}
require.NotEmptyf(t, queries, "Expected a %T request to rule store but nothing was recorded", models.GetAlertRuleByUIDQuery{})
require.Len(t, queries, 1, "Expected exactly one request of %T but got %d", models.GetAlertRuleByUIDQuery{}, len(queries))
require.Equal(t, rule.UID, queries[0].UID)
require.Equal(t, rule.OrgID, queries[0].OrgID)
})
t.Run("it should process evaluation results via state manager", func(t *testing.T) {
// TODO rewrite when we are able to mock/fake state manager
states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
require.Len(t, states, 1)
s := states[0]
t.Logf("State: %v", s)
require.Equal(t, rule.UID, s.AlertRuleUID)
require.Len(t, s.Results, 1)
var expectedStatus = evalState
if evalState == eval.Pending {
expectedStatus = eval.Alerting
}
require.Equal(t, expectedStatus.String(), s.Results[0].EvaluationState.String())
require.Equal(t, expectedTime, s.Results[0].EvaluationTime)
})
t.Run("it should save alert instances to storage", func(t *testing.T) {
// TODO rewrite when we are able to mock/fake state manager
states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
require.Len(t, states, 1)
s := states[0]
var cmd *models.SaveAlertInstanceCommand
for _, op := range instanceStore.recordedOps {
switch q := op.(type) {
case models.SaveAlertInstanceCommand:
cmd = &q
}
if cmd != nil {
break
}
}
require.NotNil(t, cmd)
t.Logf("Saved alert instance: %v", cmd)
require.Equal(t, rule.OrgID, cmd.RuleOrgID)
require.Equal(t, expectedTime, cmd.LastEvalTime)
require.Equal(t, rule.UID, cmd.RuleUID)
require.Equal(t, evalState.String(), string(cmd.State))
require.Equal(t, s.Labels, data.Labels(cmd.Labels))
})
t.Run("it reports metrics", func(t *testing.T) {
// TODO fix it when we update the way we use metrics
t.Skip()
})
})
}
t.Run("should exit", func(t *testing.T) {
t.Run("when we signal it to stop", func(t *testing.T) {
stopChan := make(chan struct{})
stoppedChan := make(chan error)
sch, _, _, _ := createSchedule(make(chan time.Time))
go func() {
err := sch.ruleRoutine(context.Background(), models.AlertRuleKey{}, make(chan *evalContext), stopChan)
stoppedChan <- err
}()
stopChan <- struct{}{}
err := waitForErrChannel(t, stoppedChan)
require.NoError(t, err)
})
t.Run("when context is cancelled", func(t *testing.T) {
stoppedChan := make(chan error)
sch, _, _, _ := createSchedule(make(chan time.Time))
ctx, cancel := context.WithCancel(context.Background())
go func() {
err := sch.ruleRoutine(ctx, models.AlertRuleKey{}, make(chan *evalContext), make(chan struct{}))
stoppedChan <- err
}()
cancel()
err := waitForErrChannel(t, stoppedChan)
require.ErrorIs(t, err, context.Canceled)
})
})
t.Run("should fetch rule from database only if new version is greater than current", func(t *testing.T) {
evalChan := make(chan *evalContext)
evalAppliedChan := make(chan time.Time)
sch, ruleStore, _, _ := createSchedule(evalAppliedChan)
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())
go func() {
stop := make(chan struct{})
t.Cleanup(func() {
close(stop)
})
_ = sch.ruleRoutine(context.Background(), rule.GetKey(), evalChan, stop)
}()
expectedTime := time.UnixMicro(rand.Int63())
evalChan <- &evalContext{
now: expectedTime,
version: rule.Version,
}
actualTime := waitForTimeChannel(t, evalAppliedChan)
require.Equal(t, expectedTime, actualTime)
// Now update the rule
newRule := *rule
newRule.Version++
ruleStore.putRule(&newRule)
// and call with new version
expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
evalChan <- &evalContext{
now: expectedTime,
version: newRule.Version,
}
actualTime = waitForTimeChannel(t, evalAppliedChan)
require.Equal(t, expectedTime, actualTime)
queries := make([]models.GetAlertRuleByUIDQuery, 0)
for _, op := range ruleStore.recordedOps {
switch q := op.(type) {
case models.GetAlertRuleByUIDQuery:
queries = append(queries, q)
}
}
require.Len(t, queries, 2, "Expected exactly two requests of %T", models.GetAlertRuleByUIDQuery{})
require.Equal(t, rule.UID, queries[0].UID)
require.Equal(t, rule.OrgID, queries[0].OrgID)
require.Equal(t, rule.UID, queries[1].UID)
require.Equal(t, rule.OrgID, queries[1].OrgID)
})
t.Run("should not fetch rule if version is equal or less than current", func(t *testing.T) {
evalChan := make(chan *evalContext)
evalAppliedChan := make(chan time.Time)
sch, ruleStore, _, _ := createSchedule(evalAppliedChan)
rule := CreateTestAlertRule(t, ruleStore, 10, rand.Int63(), randomNormalState())
go func() {
stop := make(chan struct{})
t.Cleanup(func() {
close(stop)
})
_ = sch.ruleRoutine(context.Background(), rule.GetKey(), evalChan, stop)
}()
expectedTime := time.UnixMicro(rand.Int63())
evalChan <- &evalContext{
now: expectedTime,
version: rule.Version,
}
actualTime := waitForTimeChannel(t, evalAppliedChan)
require.Equal(t, expectedTime, actualTime)
// try again with the same version
expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
evalChan <- &evalContext{
now: expectedTime,
version: rule.Version,
}
actualTime = waitForTimeChannel(t, evalAppliedChan)
require.Equal(t, expectedTime, actualTime)
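// finally, send a tick with a lower version; the rule should still not be re-fetched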
expectedTime = expectedTime.Add(time.Duration(rand.Intn(10)) * time.Second)
evalChan <- &evalContext{
now: expectedTime,
version: rule.Version - 1,
}
actualTime = waitForTimeChannel(t, evalAppliedChan)
require.Equal(t, expectedTime, actualTime)
queries := make([]models.GetAlertRuleByUIDQuery, 0)
for _, op := range ruleStore.recordedOps {
switch q := op.(type) {
case models.GetAlertRuleByUIDQuery:
queries = append(queries, q)
}
}
require.Len(t, queries, 1, "Expected exactly one request of %T", models.GetAlertRuleByUIDQuery{})
})
t.Run("when evaluation fails", func(t *testing.T) {
t.Run("it should increase failure counter", func(t *testing.T) {
t.Skip()
// TODO implement check for counter
})
t.Run("it should retry up to configured times", func(t *testing.T) {
// TODO figure out how to simulate failure
t.Skip()
})
})
t.Run("when there are alerts that should be firing", func(t *testing.T) {
t.Run("it should send to local alertmanager if configured for organization", func(t *testing.T) {
// TODO figure out how to simulate multiorg alertmanager
t.Skip()
})
t.Run("it should send to external alertmanager if configured for organization", func(t *testing.T) {
fakeAM := NewFakeExternalAlertmanager(t)
defer fakeAM.Close()
orgID := rand.Int63()
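// create a sender and point it at the fake external Alertmanager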
s, err := sender.New(nil)
require.NoError(t, err)
adminConfig := &models.AdminConfiguration{OrgID: orgID, Alertmanagers: []string{fakeAM.server.URL}}
err = s.ApplyConfig(adminConfig)
require.NoError(t, err)
s.Run()
defer s.Stop()
require.Eventuallyf(t, func() bool {
return len(s.Alertmanagers()) == 1
}, 20*time.Second, 200*time.Millisecond, "external Alertmanager was not discovered.")
evalChan := make(chan *evalContext)
evalAppliedChan := make(chan time.Time)
sch, ruleStore, _, _ := createSchedule(evalAppliedChan)
sch.senders[orgID] = s
// eval.Alerting makes the state manager create notifications for alertmanagers
rule := CreateTestAlertRule(t, ruleStore, 10, orgID, eval.Alerting)
go func() {
stop := make(chan struct{})
t.Cleanup(func() {
close(stop)
})
_ = sch.ruleRoutine(context.Background(), rule.GetKey(), evalChan, stop)
}()
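// a tick on the Alerting rule should result in a notification reaching the external Alertmanager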
evalChan <- &evalContext{
now: time.Now(),
version: rule.Version,
}
waitForTimeChannel(t, evalAppliedChan)
var count int
require.Eventuallyf(t, func() bool {
count = fakeAM.AlertsCount()
return count == 1 && fakeAM.AlertNamesCompare([]string{rule.Title})
}, 20*time.Second, 200*time.Millisecond, "Alertmanager never received an alert '%s'; received alerts count: %d", rule.Title, count)
})
})
t.Run("when there are no alerts to send it should not call notifiers", func(t *testing.T) {
// TODO needs some mocking/stubbing for Alertmanager and Sender to make sure it was not called
t.Skip()
})
}
func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, acs store.AdminConfigurationStore) (*schedule, *clock.Mock) {
t.Helper()
@@ -263,11 +585,46 @@ func setupScheduler(t *testing.T, rs store.RuleStore, is store.InstanceStore, ac
}
// CreateTestAlertRule creates a dummy alert rule to be used by the tests.
func CreateTestAlertRule(t *testing.T, dbstore *fakeRuleStore, intervalSeconds int64, orgID int64) *models.AlertRule {
func CreateTestAlertRule(t *testing.T, dbstore *fakeRuleStore, intervalSeconds int64, orgID int64, evalResult eval.State) *models.AlertRule {
t.Helper()
records := make([]interface{}, len(dbstore.recordedOps))
copy(records, dbstore.recordedOps)
defer func() {
// restore the recorded ops to their pre-helper state so queries made by this helper are invisible to assertions
dbstore.recordedOps = records
}()
d := rand.Intn(1000)
ruleGroup := fmt.Sprintf("ruleGroup-%d", d)
var expression string
var forDuration time.Duration
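// pick a math expression that deterministically produces the desired evaluation state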
switch evalResult {
case eval.Normal:
expression = `{
"datasourceUid": "-100",
"type":"math",
"expression":"2 + 1 < 1"
}`
case eval.Pending, eval.Alerting:
expression = `{
"datasourceUid": "-100",
"type":"math",
"expression":"2 + 2 > 1"
}`
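// a long "for" duration keeps the rule in Pending instead of transitioning straight to Alerting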
if evalResult == eval.Pending {
forDuration = 100 * time.Second
}
case eval.Error:
expression = `{
"datasourceUid": "-100",
"type":"math",
"expression":"$A"
}`
case eval.NoData:
// TODO Implement support for NoData
require.Fail(t, "Alert rule with desired evaluation result NoData is not supported yet")
}
err := dbstore.UpdateRuleGroup(store.UpdateRuleGroupCmd{
OrgID: orgID,
NamespaceUID: "namespace",
@@ -278,6 +635,7 @@ func CreateTestAlertRule(t *testing.T, dbstore *fakeRuleStore, intervalSeconds i
{
ApiRuleNode: &apimodels.ApiRuleNode{
Annotations: map[string]string{"testAnnoKey": "testAnnoValue"},
For: model.Duration(forDuration),
},
GrafanaManagedAlert: &apimodels.PostableGrafanaRule{
Title: fmt.Sprintf("an alert definition %d", d),
@@ -285,11 +643,7 @@ func CreateTestAlertRule(t *testing.T, dbstore *fakeRuleStore, intervalSeconds i
Data: []models.AlertQuery{
{
DatasourceUID: "-100",
Model: json.RawMessage(`{
"datasourceUid": "-100",
"type":"math",
"expression":"2 + 2 > 1"
}`),
Model: json.RawMessage(expression),
RelativeTimeRange: models.RelativeTimeRange{
From: models.Duration(5 * time.Hour),
To: models.Duration(3 * time.Hour),


@@ -21,15 +21,52 @@ import (
"github.com/stretchr/testify/require"
)
// waitForTimeChannel blocks the execution until either the channel ch has some data or a timeout of 10 seconds expires.
// Timeout will cause the test to fail.
// Returns the data from the channel.
func waitForTimeChannel(t *testing.T, ch chan time.Time) time.Time {
select {
case result := <-ch:
return result
case <-time.After(10 * time.Second):
t.Fatalf("Timeout waiting for data in the time channel")
return time.Time{}
}
}
// waitForErrChannel blocks the execution until either the channel ch has some data or a timeout of 10 seconds expires.
// Timeout will cause the test to fail.
// Returns the data from the channel.
func waitForErrChannel(t *testing.T, ch chan error) error {
timeout := 10 * time.Second
select {
case result := <-ch:
return result
case <-time.After(timeout):
t.Fatal("Timeout waiting for data in the error channel")
return nil
}
}
func newFakeRuleStore(t *testing.T) *fakeRuleStore {
return &fakeRuleStore{t: t, rules: map[int64]map[string]map[string][]*models.AlertRule{}}
}
// fakeRuleStore mocks the RuleStore of the scheduler.
type fakeRuleStore struct {
t *testing.T
mtx sync.Mutex
rules map[int64]map[string]map[string][]*models.AlertRule
t *testing.T
mtx sync.Mutex
rules map[int64]map[string]map[string][]*models.AlertRule
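// recordedOps stores every query and command the store receives so tests can assert on them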
recordedOps []interface{}
}
// putRule puts the rule into the rules map. Existing rules under the same org, rule group, and namespace are overwritten.
func (f *fakeRuleStore) putRule(r *models.AlertRule) {
f.mtx.Lock()
defer f.mtx.Unlock()
f.rules[r.OrgID][r.RuleGroup][r.NamespaceUID] = []*models.AlertRule{
r,
}
}
func (f *fakeRuleStore) DeleteAlertRuleByUID(_ int64, _ string) error { return nil }
@@ -43,7 +80,7 @@ func (f *fakeRuleStore) DeleteAlertInstancesByRuleUID(_ int64, _ string) error {
func (f *fakeRuleStore) GetAlertRuleByUID(q *models.GetAlertRuleByUIDQuery) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, *q)
rgs, ok := f.rules[q.OrgID]
if !ok {
return nil
@@ -67,7 +104,7 @@ func (f *fakeRuleStore) GetAlertRuleByUID(q *models.GetAlertRuleByUIDQuery) erro
func (f *fakeRuleStore) GetAlertRulesForScheduling(q *models.ListAlertRulesQuery) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, *q)
for _, rg := range f.rules {
for _, n := range rg {
for _, r := range n {
@@ -78,13 +115,22 @@ func (f *fakeRuleStore) GetAlertRulesForScheduling(q *models.ListAlertRulesQuery
return nil
}
func (f *fakeRuleStore) GetOrgAlertRules(_ *models.ListAlertRulesQuery) error { return nil }
func (f *fakeRuleStore) GetNamespaceAlertRules(_ *models.ListNamespaceAlertRulesQuery) error {
func (f *fakeRuleStore) GetOrgAlertRules(q *models.ListAlertRulesQuery) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, *q)
return nil
}
func (f *fakeRuleStore) GetNamespaceAlertRules(q *models.ListNamespaceAlertRulesQuery) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, *q)
return nil
}
func (f *fakeRuleStore) GetRuleGroupAlertRules(q *models.ListRuleGroupAlertRulesQuery) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, *q)
rgs, ok := f.rules[q.OrgID]
if !ok {
return nil
@@ -116,11 +162,23 @@ func (f *fakeRuleStore) GetNamespaces(_ context.Context, _ int64, _ *models2.Sig
func (f *fakeRuleStore) GetNamespaceByTitle(_ context.Context, _ string, _ int64, _ *models2.SignedInUser, _ bool) (*models2.Folder, error) {
return nil, nil
}
func (f *fakeRuleStore) GetOrgRuleGroups(_ *models.ListOrgRuleGroupsQuery) error { return nil }
func (f *fakeRuleStore) UpsertAlertRules(_ []store.UpsertRule) error { return nil }
func (f *fakeRuleStore) GetOrgRuleGroups(q *models.ListOrgRuleGroupsQuery) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, *q)
return nil
}
func (f *fakeRuleStore) UpsertAlertRules(q []store.UpsertRule) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, q)
return nil
}
func (f *fakeRuleStore) UpdateRuleGroup(cmd store.UpdateRuleGroupCmd) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, cmd)
rgs, ok := f.rules[cmd.OrgID]
if !ok {
f.rules[cmd.OrgID] = map[string]map[string][]*models.AlertRule{}
@@ -138,7 +196,7 @@ func (f *fakeRuleStore) UpdateRuleGroup(cmd store.UpdateRuleGroupCmd) error {
rules := []*models.AlertRule{}
for _, r := range cmd.RuleGroupConfig.Rules {
//TODO: Not sure why this is not being set properly, where is the code that sets this?
// TODO: Not sure why this is not being set properly, where is the code that sets this?
for i := range r.GrafanaManagedAlert.Data {
r.GrafanaManagedAlert.Data[i].DatasourceUID = "-100"
}
@@ -181,13 +239,32 @@ func (f *fakeRuleStore) UpdateRuleGroup(cmd store.UpdateRuleGroupCmd) error {
return nil
}
type fakeInstanceStore struct{}
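// fakeInstanceStore mocks the InstanceStore of the scheduler and records every operation it receives.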
type fakeInstanceStore struct {
mtx sync.Mutex
recordedOps []interface{}
}
func (f *fakeInstanceStore) GetAlertInstance(_ *models.GetAlertInstanceQuery) error { return nil }
func (f *fakeInstanceStore) ListAlertInstances(_ *models.ListAlertInstancesQuery) error { return nil }
func (f *fakeInstanceStore) SaveAlertInstance(_ *models.SaveAlertInstanceCommand) error { return nil }
func (f *fakeInstanceStore) FetchOrgIds() ([]int64, error) { return []int64{}, nil }
func (f *fakeInstanceStore) DeleteAlertInstance(_ int64, _, _ string) error { return nil }
func (f *fakeInstanceStore) GetAlertInstance(q *models.GetAlertInstanceQuery) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, *q)
return nil
}
func (f *fakeInstanceStore) ListAlertInstances(q *models.ListAlertInstancesQuery) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, *q)
return nil
}
func (f *fakeInstanceStore) SaveAlertInstance(q *models.SaveAlertInstanceCommand) error {
f.mtx.Lock()
defer f.mtx.Unlock()
f.recordedOps = append(f.recordedOps, *q)
return nil
}
func (f *fakeInstanceStore) FetchOrgIds() ([]int64, error) { return []int64{}, nil }
func (f *fakeInstanceStore) DeleteAlertInstance(_ int64, _, _ string) error { return nil }
func newFakeAdminConfigStore(t *testing.T) *fakeAdminConfigStore {
t.Helper()